diff --git a/Makefile b/Makefile index 944db81..1903505 100644 --- a/Makefile +++ b/Makefile @@ -2,16 +2,16 @@ up: docker compose up -d test: - go test -timeout=30m ./... + go test -timeout=10m ./... test_v: go test -v ./... test_short: - go test ./... -short + go test -short ./... test_race: - go test ./... -short -race + go test -short -race ./... test_stress: diff --git a/docker-compose.yml b/docker-compose.yml index 5dd22e1..084ef8f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: mysql: image: mysql:8.0 restart: unless-stopped - command: [ "--max_connections=50000" ] + command: [ "--max_connections=5000" ] ports: - 3306:3306 environment: @@ -15,7 +15,7 @@ services: postgres: image: postgres:15.3 restart: unless-stopped - command: postgres -c 'max_connections=50000' + command: postgres -c 'max_connections=5000' ports: - 5432:5432 environment: diff --git a/go.mod b/go.mod index b5bf2e3..70e3259 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.24.1 require ( github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 github.com/go-sql-driver/mysql v1.4.1 - github.com/jackc/pgx/v5 v5.7.2 + github.com/jackc/pgx/v5 v5.7.5 github.com/lib/pq v1.10.9 github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.9.1 @@ -27,9 +27,9 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/sony/gobreaker v1.0.0 // indirect - golang.org/x/crypto v0.35.0 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/text v0.22.0 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.13.0 // indirect + golang.org/x/text v0.24.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8d98a92..89b1408 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= -github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= +github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -48,16 +48,16 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index 472c2fc..68e7e77 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -753,7 +753,7 @@ func TestDefaultPostgreSQLSchema_planner_mis_estimate_regression(t *testing.T) { require.NoError(t, err) messagesCount := 100_000 - if testing.Short() { + if testing.Short() || os.Getenv("CI") == "true" { messagesCount = 1_000 } tests.AddSimpleMessagesParallel(t, messagesCount, pub, topicName, 50) diff --git a/pkg/sql/queue_schema_adapter_postgresql_test.go b/pkg/sql/queue_schema_adapter_postgresql_test.go index a0e72a0..20f151b 100644 --- a/pkg/sql/queue_schema_adapter_postgresql_test.go +++ b/pkg/sql/queue_schema_adapter_postgresql_test.go @@ -61,7 +61,7 @@ func TestPostgreSQLQueueSchemaAdapter(t *testing.T) { case msg := <-messages: receivedMessages = append(receivedMessages, msg) msg.Ack() - case <-time.After(100 * time.Millisecond): + case <-time.After(5 * time.Second): t.Errorf("expected to receive message") break } diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index 643354b..4088cd0 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -180,7 +180,7 @@ func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, e ` + nextOffsetQuery.Query + ` ) - SELECT "offset", transaction_id::text, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + ` + SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + ` WHERE ( @@ -269,23 +269,59 @@ func (x *XID8) Scan(src interface{}) error { return errors.New("cannot scan nil value into XID8") } + // We want to support scanning from various types (different drivers, like lib/pq, pgx, etc.) switch v := src.(type) { + case int64: + if v < 0 { + return fmt.Errorf("cannot convert negative int64 %d to XID8", v) + } + *x = XID8(uint64(v)) + return nil + + case uint64: + *x = XID8(v) + return nil + + case int32: + if v < 0 { + return fmt.Errorf("cannot convert negative int32 %d to XID8", v) + } + *x = XID8(uint64(v)) + return nil + + case uint32: + *x = XID8(v) + return nil + + // pgx case string: + if v == "" { + *x = 0 + return nil + } val, err := strconv.ParseUint(v, 10, 64) if err != nil { - return err + return fmt.Errorf("cannot parse string %q as uint64: %w", v, err) } *x = XID8(val) return nil + + // lib/pq case []byte: + if len(v) == 0 { + *x = 0 + return nil + } + val, err := strconv.ParseUint(string(v), 10, 64) if err != nil { - return err + return fmt.Errorf("cannot parse bytes %q as uint64: %w", string(v), err) } *x = XID8(val) return nil + default: - return errors.New("unsupported Scan value type for XID8") + return fmt.Errorf("cannot scan %T into XID8", src) } } diff --git a/pkg/sql/xid8_test.go b/pkg/sql/xid8_test.go new file mode 100644 index 0000000..3551dc0 --- /dev/null +++ b/pkg/sql/xid8_test.go @@ -0,0 +1,242 @@ +package sql + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestXID8_Scan(t *testing.T) { + tests := []struct { + name string + input interface{} + expected XID8 + wantErr bool + errMsg string + }{ + // Nil input + { + name: "nil input", + input: nil, + wantErr: true, + errMsg: "cannot scan nil value into XID8", + }, + + // Integer types (pgx style) + { + name: "int64 positive", + input: int64(12345), + expected: XID8(12345), + }, + { + name: "int64 zero", + input: int64(0), + expected: XID8(0), + }, + { + name: "int64 max safe value", + input: int64(math.MaxInt64), + expected: XID8(math.MaxInt64), + }, + { + name: "int64 negative", + input: int64(-1), + wantErr: true, + errMsg: "cannot convert negative int64", + }, + { + name: "int64 large negative", + input: int64(-9223372036854775808), // math.MinInt64 + wantErr: true, + errMsg: "cannot convert negative int64", + }, + + // uint64 types + { + name: "uint64 positive", + input: uint64(12345), + expected: XID8(12345), + }, + { + name: "uint64 zero", + input: uint64(0), + expected: XID8(0), + }, + { + name: "uint64 max value", + input: uint64(math.MaxUint64), + expected: XID8(math.MaxUint64), + }, + + // int32 types + { + name: "int32 positive", + input: int32(12345), + expected: XID8(12345), + }, + { + name: "int32 zero", + input: int32(0), + expected: XID8(0), + }, + { + name: "int32 max value", + input: int32(math.MaxInt32), + expected: XID8(math.MaxInt32), + }, + { + name: "int32 negative", + input: int32(-1), + wantErr: true, + errMsg: "cannot convert negative int32", + }, + + // uint32 types + { + name: "uint32 positive", + input: uint32(12345), + expected: XID8(12345), + }, + { + name: "uint32 zero", + input: uint32(0), + expected: XID8(0), + }, + { + name: "uint32 max value", + input: uint32(math.MaxUint32), + expected: XID8(math.MaxUint32), + }, + // String types + { + name: "string positive number", + input: "12345", + expected: XID8(12345), + }, + { + name: "string zero", + input: "0", + expected: XID8(0), + }, + { + name: "string large number", + input: "18446744073709551615", // math.MaxUint64 + expected: XID8(math.MaxUint64), + }, + { + name: "string with leading zeros", + input: "00012345", + expected: XID8(12345), + }, + { + name: "empty string", + input: "", + expected: XID8(0), + }, + { + name: "string with negative number", + input: "-12345", + wantErr: true, + errMsg: "cannot parse string", + }, + { + name: "string too large for uint64", + input: "18446744073709551616", // MaxUint64 + 1 + wantErr: true, + errMsg: "cannot parse string", + }, + // Byte slice types + { + name: "bytes number", + input: []byte{51, 55, 57, 51, 50}, + expected: XID8(37932), // "37932" in bytes + }, + { + name: "bytes as string number", + input: []byte("12345"), + expected: XID8(12345), + }, + { + name: "bytes zero", + input: []byte("0"), + expected: XID8(0), + }, + { + name: "bytes large number", + input: []byte("18446744073709551615"), + expected: XID8(math.MaxUint64), + }, + { + name: "empty bytes", + input: []byte{}, + expected: XID8(0), + }, + { + name: "empty bytes nil", + input: []byte(nil), + expected: XID8(0), + }, + { + name: "bytes invalid string", + input: []byte("abc123"), + wantErr: true, + errMsg: "cannot parse bytes", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var x XID8 + err := x.Scan(tt.input) + + if tt.wantErr { + require.Error(t, err) + if tt.errMsg != "" { + assert.Contains(t, err.Error(), tt.errMsg) + } + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, x) + } + }) + } +} + +func TestXID8_Value(t *testing.T) { + tests := []struct { + name string + input XID8 + expected uint64 + }{ + { + name: "zero value", + input: XID8(0), + expected: 0, + }, + { + name: "small positive value", + input: XID8(12345), + expected: 12345, + }, + { + name: "max uint64 value", + input: XID8(math.MaxUint64), + expected: math.MaxUint64, + }, + { + name: "real postgres xid8 value", + input: XID8(732406), + expected: 732406, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + value, err := tt.input.Value() + require.NoError(t, err) + assert.Equal(t, tt.expected, value) + }) + } +}