diff --git a/pkg/sql/delayed_mysql.go b/pkg/sql/delayed_mysql.go index efa31b0..4174b71 100644 --- a/pkg/sql/delayed_mysql.go +++ b/pkg/sql/delayed_mysql.go @@ -19,6 +19,9 @@ type DelayedMySQLPublisherConfig struct { OverridePublisherConfig func(config *PublisherConfig) error Logger watermill.LoggerAdapter + + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedMySQLPublisherConfig) setDefaults() { @@ -36,7 +39,7 @@ func NewDelayedMySQLPublisher(db ContextExecutor, config DelayedMySQLPublisherCo SchemaAdapter: delayedMySQLSchemaAdapter{ MySQLQueueSchema: MySQLQueueSchema{}, }, - AutoInitializeSchema: true, + AutoInitializeSchema: !config.DisableInitializeSchema, } if config.OverridePublisherConfig != nil { @@ -75,6 +78,9 @@ type DelayedMySQLSubscriberConfig struct { AllowNoDelay bool Logger watermill.LoggerAdapter + + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedMySQLSubscriberConfig) setDefaults() { @@ -107,7 +113,7 @@ func NewDelayedMySQLSubscriber(db Beginner, config DelayedMySQLSubscriberConfig) OffsetsAdapter: MySQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: true, + InitializeSchema: !config.DisableInitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_postgresql.go b/pkg/sql/delayed_postgresql.go index 9428969..fd657af 100644 --- a/pkg/sql/delayed_postgresql.go +++ b/pkg/sql/delayed_postgresql.go @@ -17,6 +17,9 @@ type DelayedPostgreSQLPublisherConfig struct { OverridePublisherConfig func(config *PublisherConfig) error Logger watermill.LoggerAdapter + + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedPostgreSQLPublisherConfig) setDefaults() { @@ -32,7 +35,7 @@ func NewDelayedPostgreSQLPublisher(db ContextExecutor, config DelayedPostgreSQLP publisherConfig := PublisherConfig{ SchemaAdapter: PostgreSQLQueueSchema{}, - AutoInitializeSchema: true, + AutoInitializeSchema: !config.DisableInitializeSchema, } if config.OverridePublisherConfig != nil { @@ -71,6 +74,9 @@ type DelayedPostgreSQLSubscriberConfig struct { AllowNoDelay bool Logger watermill.LoggerAdapter + + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() { @@ -103,7 +109,7 @@ func NewDelayedPostgreSQLSubscriber(db Beginner, config DelayedPostgreSQLSubscri OffsetsAdapter: PostgreSQLQueueOffsetsAdapter{ DeleteOnAck: config.DeleteOnAck, }, - InitializeSchema: true, + InitializeSchema: !config.DisableInitializeSchema, } if config.OverrideSubscriberConfig != nil { diff --git a/pkg/sql/delayed_requeuer.go b/pkg/sql/delayed_requeuer.go index 86c6ff9..34d1f83 100644 --- a/pkg/sql/delayed_requeuer.go +++ b/pkg/sql/delayed_requeuer.go @@ -48,6 +48,9 @@ type DelayedRequeuerConfig struct { DelayOnError *middleware.DelayOnError Logger watermill.LoggerAdapter + + // DisableInitializeSchema option disables auto initializing schema + DisableInitializeSchema bool } func (c *DelayedRequeuerConfig) setDefaults() { @@ -99,15 +102,17 @@ func NewPostgreSQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeue } publisher, err := NewDelayedPostgreSQLPublisher(config.DB, DelayedPostgreSQLPublisherConfig{ - Logger: config.Logger, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedPostgreSQLSubscriber(config.DB, DelayedPostgreSQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, + DeleteOnAck: true, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err @@ -146,15 +151,17 @@ func NewMySQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeuer, er } publisher, err := NewDelayedMySQLPublisher(config.DB, DelayedMySQLPublisherConfig{ - Logger: config.Logger, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err } subscriber, err := NewDelayedMySQLSubscriber(config.DB, DelayedMySQLSubscriberConfig{ - DeleteOnAck: true, - Logger: config.Logger, + DeleteOnAck: true, + Logger: config.Logger, + DisableInitializeSchema: config.DisableInitializeSchema, }) if err != nil { return nil, err