diff --git a/pkg/sql/delayed_mysql.go b/pkg/sql/delayed_mysql.go index a83f290..efa31b0 100644 --- a/pkg/sql/delayed_mysql.go +++ b/pkg/sql/delayed_mysql.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/components/delay" @@ -87,7 +88,7 @@ func (c *DelayedMySQLSubscriberConfig) setDefaults() { func NewDelayedMySQLSubscriber(db Beginner, config DelayedMySQLSubscriberConfig) (message.Subscriber, error) { config.setDefaults() - where := "delayed_until <= NOW()" + where := "delayed_until <= UTC_TIMESTAMP()" if config.AllowNoDelay { where += " OR delayed_until IS NULL" @@ -138,7 +139,7 @@ func (a delayedMySQLSchemaAdapter) SchemaInitializingQueries(params SchemaInitia ` + "`acked`" + ` BOOLEAN NOT NULL DEFAULT FALSE, ` + "`created_at`" + ` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, ` + "`delayed_until`" + ` TIMESTAMP NULL DEFAULT NULL, - INDEX ` + "`delayed_until_idx`" + ` (` + "`delayed_until`" + `) + INDEX ` + "`idx_acked_delayed`" + ` (` + "`acked`" + `, ` + "`delayed_until`" + `) ); ` @@ -186,11 +187,11 @@ func delayedMySQLInsertArgs(msgs message.Messages) ([]any, error) { if delayedUntilStr == "" { args = append(args, nil) } else { - // Convert ISO 8601 to MySQL TIMESTAMP format: "2025-10-22T09:58:00Z" -> "2025-10-22 09:58:00" - delayedUntilStr = strings.Replace(delayedUntilStr, "T", " ", 1) - delayedUntilStr = strings.TrimSuffix(delayedUntilStr, "Z") - - args = append(args, delayedUntilStr) + delayedUntil, err := time.Parse(time.RFC3339, delayedUntilStr) + if err != nil { + return nil, fmt.Errorf("could not parse delayed_until timestamp %s: %w", delayedUntilStr, err) + } + args = append(args, delayedUntil) } } diff --git a/pkg/sql/delayed_requeuer.go b/pkg/sql/delayed_requeuer.go index b2b481d..86c6ff9 100644 --- a/pkg/sql/delayed_requeuer.go +++ b/pkg/sql/delayed_requeuer.go @@ -136,3 +136,50 @@ func NewPostgreSQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeue requeuer: requeuer, }, nil } + +// NewMySQLDelayedRequeuer creates a new DelayedRequeuer that uses MySQL as a storage. +func NewMySQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeuer, error) { + config.setDefaults() + err := config.Validate() + if err != nil { + return nil, err + } + + publisher, err := NewDelayedMySQLPublisher(config.DB, DelayedMySQLPublisherConfig{ + Logger: config.Logger, + }) + if err != nil { + return nil, err + } + + subscriber, err := NewDelayedMySQLSubscriber(config.DB, DelayedMySQLSubscriberConfig{ + DeleteOnAck: true, + Logger: config.Logger, + }) + if err != nil { + return nil, err + } + + poisonQueue, err := middleware.PoisonQueue(publisher, config.RequeueTopic) + if err != nil { + return nil, err + } + + requeuer, err := requeuer.NewRequeuer(requeuer.Config{ + Subscriber: subscriber, + SubscribeTopic: config.RequeueTopic, + Publisher: config.Publisher, + GeneratePublishTopic: config.GeneratePublishTopic, + }, config.Logger) + if err != nil { + return nil, err + } + + return &DelayedRequeuer{ + middleware: []message.HandlerMiddleware{ + poisonQueue, + config.DelayOnError.Middleware, + }, + requeuer: requeuer, + }, nil +} diff --git a/pkg/sql/delayed_requeuer_test.go b/pkg/sql/delayed_requeuer_test.go index da279d0..3f8b6c4 100644 --- a/pkg/sql/delayed_requeuer_test.go +++ b/pkg/sql/delayed_requeuer_test.go @@ -14,7 +14,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -func TestDelayedRequeuer(t *testing.T) { +func TestPostgreSQLDelayedRequeuer(t *testing.T) { t.Parallel() db := newPostgreSQL(t) @@ -81,3 +81,72 @@ func TestDelayedRequeuer(t *testing.T) { assert.Equal(t, []string{"1", "3"}, receivedMessages) }, 1*time.Second, 100*time.Millisecond) } + +func TestMySQLDelayedRequeuer(t *testing.T) { + t.Parallel() + + db := newMySQL(t) + schemaAdapter := sql.DefaultMySQLSchema{} + offsetsAdapter := sql.DefaultMySQLOffsetsAdapter{} + publisher, subscriber := newPubSub(t, db, "test", schemaAdapter, offsetsAdapter) + + topic := watermill.NewUUID() + + err := subscriber.(message.SubscribeInitializer).SubscribeInitialize(topic) + require.NoError(t, err) + + delayedRequeuer, err := sql.NewMySQLDelayedRequeuer(sql.DelayedRequeuerConfig{ + DB: db, + RequeueTopic: watermill.NewUUID(), + Publisher: publisher, + Logger: logger, + }) + require.NoError(t, err) + + router := message.NewDefaultRouter(logger) + router.AddMiddleware(delayedRequeuer.Middleware()...) + + var receivedMessages []string + + router.AddNoPublisherHandler( + "test", + topic, + subscriber, + func(msg *message.Message) error { + payload := string(msg.Payload) + // MySQL and PostgreSQL format JSON with spaces, so we need to check both variants + if payload == `{"error":true}` || payload == `{"error": true}` { + return fmt.Errorf("error") + } + + receivedMessages = append(receivedMessages, msg.UUID) + + return nil + }, + ) + + go func() { + err := router.Run(context.Background()) + require.NoError(t, err) + }() + + <-router.Running() + + go func() { + err := delayedRequeuer.Run(context.Background()) + require.NoError(t, err) + }() + + err = publisher.Publish(topic, message.NewMessage("1", []byte(`{}`))) + require.NoError(t, err) + + err = publisher.Publish(topic, message.NewMessage("2", []byte(`{"error":true}`))) + require.NoError(t, err) + + err = publisher.Publish(topic, message.NewMessage("3", []byte(`{}`))) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, []string{"1", "3"}, receivedMessages) + }, 1*time.Second, 100*time.Millisecond) +}