From a69fc191280dec01235be4d7e7816343527d8d27 Mon Sep 17 00:00:00 2001 From: "adrian.zajkowski" Date: Mon, 23 Mar 2026 11:49:45 +0100 Subject: [PATCH 1/3] feat: add InitializeSchema option to delayed publisher/subscriber configs This allows disabling automatic schema initialization for environments where the database user doesn't have CREATE TABLE permissions. --- pkg/sql/delayed_mysql.go | 10 ++++++++-- pkg/sql/delayed_postgresql.go | 10 ++++++++-- pkg/sql/delayed_requeuer.go | 19 +++++++++++++------ 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/pkg/sql/delayed_mysql.go b/pkg/sql/delayed_mysql.go index efa31b0..89e6ce7 100644 --- a/pkg/sql/delayed_mysql.go +++ b/pkg/sql/delayed_mysql.go @@ -19,6 +19,9 @@ type DelayedMySQLPublisherConfig struct { OverridePublisherConfig func(config *PublisherConfig) error Logger watermill.LoggerAdapter + + // InitializeSchema option enables initializing schema on making subscription. + InitializeSchema bool } func (c *DelayedMySQLPublisherConfig) setDefaults() { @@ -36,7 +39,7 @@ func NewDelayedMySQLPublisher(db ContextExecutor, config DelayedMySQLPublisherCo SchemaAdapter: delayedMySQLSchemaAdapter{ MySQLQueueSchema: MySQLQueueSchema{}, }, - AutoInitializeSchema: true, + AutoInitializeSchema: config.InitializeSchema, } if config.OverridePublisherConfig != nil { @@ -75,6 +78,9 @@ type DelayedMySQLSubscriberConfig struct { AllowNoDelay bool Logger watermill.LoggerAdapter + + // InitializeSchema option enables initializing schema on making subscription. + InitializeSchema bool } func (c *DelayedMySQLSubscriberConfig) setDefaults() { @@ -107,7 +113,7 @@ func NewDelayedMySQLSubscriber(db Beginner, config DelayedMySQLSubscriberConfig) OffsetsAdapter: MySQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: true, + InitializeSchema: config.InitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_postgresql.go b/pkg/sql/delayed_postgresql.go index 9428969..3107930 100644 --- a/pkg/sql/delayed_postgresql.go +++ b/pkg/sql/delayed_postgresql.go @@ -17,6 +17,9 @@ type DelayedPostgreSQLPublisherConfig struct { OverridePublisherConfig func(config *PublisherConfig) error Logger watermill.LoggerAdapter + + // InitializeSchema option enables initializing schema on making subscription. + InitializeSchema bool } func (c *DelayedPostgreSQLPublisherConfig) setDefaults() { @@ -32,7 +35,7 @@ func NewDelayedPostgreSQLPublisher(db ContextExecutor, config DelayedPostgreSQLP publisherConfig := PublisherConfig{ SchemaAdapter: PostgreSQLQueueSchema{}, - AutoInitializeSchema: true, + AutoInitializeSchema: config.InitializeSchema, } if config.OverridePublisherConfig != nil { @@ -71,6 +74,9 @@ type DelayedPostgreSQLSubscriberConfig struct { AllowNoDelay bool Logger watermill.LoggerAdapter + + // InitializeSchema option enables initializing schema on making subscription. + InitializeSchema bool } func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() { @@ -103,7 +109,7 @@ func NewDelayedPostgreSQLSubscriber(db Beginner, config DelayedPostgreSQLSubscri OffsetsAdapter: PostgreSQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: true, + InitializeSchema: config.InitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_requeuer.go b/pkg/sql/delayed_requeuer.go index 86c6ff9..273eca3 100644 --- a/pkg/sql/delayed_requeuer.go +++ b/pkg/sql/delayed_requeuer.go @@ -48,6 +48,9 @@ type DelayedRequeuerConfig struct { DelayOnError *middleware.DelayOnError Logger watermill.LoggerAdapter + + // InitializeSchema option enables initializing schema on making subscription. + InitializeSchema bool } func (c *DelayedRequeuerConfig) setDefaults() { @@ -99,15 +102,17 @@ func NewPostgreSQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeue } publisher, err := NewDelayedPostgreSQLPublisher(config.DB, DelayedPostgreSQLPublisherConfig{ - Logger: config.Logger, + Logger: config.Logger, + InitializeSchema: config.InitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedPostgreSQLSubscriber(config.DB, DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, + DeleteOnAck: true, + Logger: config.Logger, + InitializeSchema: config.InitializeSchema, }) if err != nil { return nil, err @@ -146,15 +151,17 @@ func NewMySQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeuer, er } publisher, err := NewDelayedMySQLPublisher(config.DB, DelayedMySQLPublisherConfig{ - Logger: config.Logger, + Logger: config.Logger, + InitializeSchema: config.InitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedMySQLSubscriber(config.DB, DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, + DeleteOnAck: true, + Logger: config.Logger, + InitializeSchema: config.InitializeSchema, }) if err != nil { return nil, err From 931a6db46dfc9eb86cdcca570e71bcce48834bfc Mon Sep 17 00:00:00 2001 From: "adrian.zajkowski" Date: Mon, 23 Mar 2026 12:03:12 +0100 Subject: [PATCH 2/3] corrected tests --- pkg/sql/delayed_mysql_test.go | 23 ++++++++++++++--------- pkg/sql/delayed_postgresql_test.go | 23 ++++++++++++++--------- pkg/sql/delayed_requeuer_test.go | 18 ++++++++++-------- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/pkg/sql/delayed_mysql_test.go b/pkg/sql/delayed_mysql_test.go index ce3f6e1..d9d3c3c 100644 --- a/pkg/sql/delayed_mysql_test.go +++ b/pkg/sql/delayed_mysql_test.go @@ -25,13 +25,15 @@ func TestDelayedMySQL(t *testing.T) { return delay.For(time.Second), nil }, }, - Logger: logger, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, + DeleteOnAck: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -71,7 +73,8 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { DelayPublisherConfig: delay.PublisherConfig{ AllowNoDelay: true, }, - Logger: logger, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -79,8 +82,9 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, + DeleteOnAck: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -105,9 +109,10 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - AllowNoDelay: true, - Logger: logger, + DeleteOnAck: true, + AllowNoDelay: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) diff --git a/pkg/sql/delayed_postgresql_test.go b/pkg/sql/delayed_postgresql_test.go index 0bd46b5..448d33c 100644 --- a/pkg/sql/delayed_postgresql_test.go +++ b/pkg/sql/delayed_postgresql_test.go @@ -25,13 +25,15 @@ func TestDelayedPostgreSQL(t *testing.T) { return delay.For(time.Second), nil }, }, - Logger: logger, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, + DeleteOnAck: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -71,7 +73,8 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { DelayPublisherConfig: delay.PublisherConfig{ AllowNoDelay: true, }, - Logger: logger, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -79,8 +82,9 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, + DeleteOnAck: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -105,9 +109,10 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - AllowNoDelay: true, - Logger: logger, + DeleteOnAck: true, + AllowNoDelay: true, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) diff --git a/pkg/sql/delayed_requeuer_test.go b/pkg/sql/delayed_requeuer_test.go index 3f8b6c4..208380c 100644 --- a/pkg/sql/delayed_requeuer_test.go +++ b/pkg/sql/delayed_requeuer_test.go @@ -28,10 +28,11 @@ func TestPostgreSQLDelayedRequeuer(t *testing.T) { require.NoError(t, err) delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{ - DB: db, - RequeueTopic: watermill.NewUUID(), - Publisher: publisher, - Logger: logger, + DB: db, + RequeueTopic: watermill.NewUUID(), + Publisher: publisher, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) @@ -96,10 +97,11 @@ func TestMySQLDelayedRequeuer(t *testing.T) { require.NoError(t, err) delayedRequeuer, err := sql.NewMySQLDelayedRequeuer(sql.DelayedRequeuerConfig{ - DB: db, - RequeueTopic: watermill.NewUUID(), - Publisher: publisher, - Logger: logger, + DB: db, + RequeueTopic: watermill.NewUUID(), + Publisher: publisher, + Logger: logger, + InitializeSchema: true, }) require.NoError(t, err) From 745299de91a8305990189d87ab4133105a9ea7f3 Mon Sep 17 00:00:00 2001 From: "adrian.zajkowski" Date: Mon, 23 Mar 2026 12:44:56 +0100 Subject: [PATCH 3/3] changed from InitializeSchema to DisableInitializeSchema --- pkg/sql/delayed_mysql.go | 12 ++++++------ pkg/sql/delayed_mysql_test.go | 23 +++++++++-------------- pkg/sql/delayed_postgresql.go | 12 ++++++------ pkg/sql/delayed_postgresql_test.go | 23 +++++++++-------------- pkg/sql/delayed_requeuer.go | 24 ++++++++++++------------ pkg/sql/delayed_requeuer_test.go | 18 ++++++++---------- 6 files changed, 50 insertions(+), 62 deletions(-) diff --git a/pkg/sql/delayed_mysql.go b/pkg/sql/delayed_mysql.go index 89e6ce7..4174b71 100644 --- a/pkg/sql/delayed_mysql.go +++ b/pkg/sql/delayed_mysql.go @@ -20,8 +20,8 @@ type DelayedMySQLPublisherConfig struct { Logger watermill.LoggerAdapter - // InitializeSchema option enables initializing schema on making subscription. - InitializeSchema bool + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedMySQLPublisherConfig) setDefaults() { @@ -39,7 +39,7 @@ func NewDelayedMySQLPublisher(db ContextExecutor, config DelayedMySQLPublisherCo SchemaAdapter: delayedMySQLSchemaAdapter{ MySQLQueueSchema: MySQLQueueSchema{}, }, - AutoInitializeSchema: config.InitializeSchema, + AutoInitializeSchema: !config.DisableInitializeSchema, } if config.OverridePublisherConfig != nil { @@ -79,8 +79,8 @@ type DelayedMySQLSubscriberConfig struct { Logger watermill.LoggerAdapter - // InitializeSchema option enables initializing schema on making subscription. - InitializeSchema bool + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedMySQLSubscriberConfig) setDefaults() { @@ -113,7 +113,7 @@ func NewDelayedMySQLSubscriber(db Beginner, config DelayedMySQLSubscriberConfig) OffsetsAdapter: MySQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: config.InitializeSchema, + InitializeSchema: !config.DisableInitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_mysql_test.go b/pkg/sql/delayed_mysql_test.go index d9d3c3c..ce3f6e1 100644 --- a/pkg/sql/delayed_mysql_test.go +++ b/pkg/sql/delayed_mysql_test.go @@ -25,15 +25,13 @@ func TestDelayedMySQL(t *testing.T) { return delay.For(time.Second), nil }, }, - Logger: logger, - InitializeSchema: true, + Logger: logger, }) require.NoError(t, err) sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + Logger: logger, }) require.NoError(t, err) @@ -73,8 +71,7 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { DelayPublisherConfig: delay.PublisherConfig{ AllowNoDelay: true, }, - Logger: logger, - InitializeSchema: true, + Logger: logger, }) require.NoError(t, err) @@ -82,9 +79,8 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + Logger: logger, }) require.NoError(t, err) @@ -109,10 +105,9 @@ func TestDelayedMySQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedMySQLSubscriber(db, sql.DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - AllowNoDelay: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + AllowNoDelay: true, + Logger: logger, }) require.NoError(t, err) diff --git a/pkg/sql/delayed_postgresql.go b/pkg/sql/delayed_postgresql.go index 3107930..fd657af 100644 --- a/pkg/sql/delayed_postgresql.go +++ b/pkg/sql/delayed_postgresql.go @@ -18,8 +18,8 @@ type DelayedPostgreSQLPublisherConfig struct { Logger watermill.LoggerAdapter - // InitializeSchema option enables initializing schema on making subscription. - InitializeSchema bool + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedPostgreSQLPublisherConfig) setDefaults() { @@ -35,7 +35,7 @@ func NewDelayedPostgreSQLPublisher(db ContextExecutor, config DelayedPostgreSQLP publisherConfig := PublisherConfig{ SchemaAdapter: PostgreSQLQueueSchema{}, - AutoInitializeSchema: config.InitializeSchema, + AutoInitializeSchema: !config.DisableInitializeSchema, } if config.OverridePublisherConfig != nil { @@ -75,8 +75,8 @@ type DelayedPostgreSQLSubscriberConfig struct { Logger watermill.LoggerAdapter - // InitializeSchema option enables initializing schema on making subscription. - InitializeSchema bool + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() { @@ -109,7 +109,7 @@ func NewDelayedPostgreSQLSubscriber(db Beginner, config DelayedPostgreSQLSubscri OffsetsAdapter: PostgreSQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: config.InitializeSchema, + InitializeSchema: !config.DisableInitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_postgresql_test.go b/pkg/sql/delayed_postgresql_test.go index 448d33c..0bd46b5 100644 --- a/pkg/sql/delayed_postgresql_test.go +++ b/pkg/sql/delayed_postgresql_test.go @@ -25,15 +25,13 @@ func TestDelayedPostgreSQL(t *testing.T) { return delay.For(time.Second), nil }, }, - Logger: logger, - InitializeSchema: true, + Logger: logger, }) require.NoError(t, err) sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + Logger: logger, }) require.NoError(t, err) @@ -73,8 +71,7 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { DelayPublisherConfig: delay.PublisherConfig{ AllowNoDelay: true, }, - Logger: logger, - InitializeSchema: true, + Logger: logger, }) require.NoError(t, err) @@ -82,9 +79,8 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + Logger: logger, }) require.NoError(t, err) @@ -109,10 +105,9 @@ func TestDelayedPostgreSQL_NoDelay(t *testing.T) { t.Parallel() sub, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - AllowNoDelay: true, - Logger: logger, - InitializeSchema: true, + DeleteOnAck: true, + AllowNoDelay: true, + Logger: logger, }) require.NoError(t, err) diff --git a/pkg/sql/delayed_requeuer.go b/pkg/sql/delayed_requeuer.go index 273eca3..34d1f83 100644 --- a/pkg/sql/delayed_requeuer.go +++ b/pkg/sql/delayed_requeuer.go @@ -49,8 +49,8 @@ type DelayedRequeuerConfig struct { Logger watermill.LoggerAdapter - // InitializeSchema option enables initializing schema on making subscription. - InitializeSchema bool + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedRequeuerConfig) setDefaults() { @@ -102,17 +102,17 @@ func NewPostgreSQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeue } publisher, err := NewDelayedPostgreSQLPublisher(config.DB, DelayedPostgreSQLPublisherConfig{ - Logger: config.Logger, - InitializeSchema: config.InitializeSchema, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedPostgreSQLSubscriber(config.DB, DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, - InitializeSchema: config.InitializeSchema, + DeleteOnAck: true, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err @@ -151,17 +151,17 @@ func NewMySQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeuer, er } publisher, err := NewDelayedMySQLPublisher(config.DB, DelayedMySQLPublisherConfig{ - Logger: config.Logger, - InitializeSchema: config.InitializeSchema, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedMySQLSubscriber(config.DB, DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, - InitializeSchema: config.InitializeSchema, + DeleteOnAck: true, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err diff --git a/pkg/sql/delayed_requeuer_test.go b/pkg/sql/delayed_requeuer_test.go index 208380c..3f8b6c4 100644 --- a/pkg/sql/delayed_requeuer_test.go +++ b/pkg/sql/delayed_requeuer_test.go @@ -28,11 +28,10 @@ func TestPostgreSQLDelayedRequeuer(t *testing.T) { require.NoError(t, err) delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{ - DB: db, - RequeueTopic: watermill.NewUUID(), - Publisher: publisher, - Logger: logger, - InitializeSchema: true, + DB: db, + RequeueTopic: watermill.NewUUID(), + Publisher: publisher, + Logger: logger, }) require.NoError(t, err) @@ -97,11 +96,10 @@ func TestMySQLDelayedRequeuer(t *testing.T) { require.NoError(t, err) delayedRequeuer, err := sql.NewMySQLDelayedRequeuer(sql.DelayedRequeuerConfig{ - DB: db, - RequeueTopic: watermill.NewUUID(), - Publisher: publisher, - Logger: logger, - InitializeSchema: true, + DB: db, + RequeueTopic: watermill.NewUUID(), + Publisher: publisher, + Logger: logger, }) require.NoError(t, err)