diff --git a/admin/api/api.go b/admin/api/api.go index 49b2c80b..a1dbc721 100644 --- a/admin/api/api.go +++ b/admin/api/api.go @@ -2,16 +2,17 @@ package api import ( "encoding/json" + "fmt" "net/http" "net/http/pprof" - "strconv" + "github.com/getkin/kin-openapi/openapi3" + "github.com/go-playground/form" "github.com/gorilla/mux" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" dberrs "github.com/webhookx-io/webhookx/db/errs" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/pkg/declarative" "github.com/webhookx-io/webhookx/pkg/errs" @@ -58,27 +59,26 @@ func (api *API) param(r *http.Request, variable string) string { return mux.Vars(r)[variable] } -// query returns the url query value if it exists. -func (api *API) query(r *http.Request, name string) string { - return r.URL.Query().Get(name) -} - func (api *API) json(code int, w http.ResponseWriter, data interface{}) { response.JSON(w, code, data) } -func (api *API) bindQuery(r *http.Request, q *query.Query) { - page, _ := strconv.Atoi(r.URL.Query().Get("page_no")) - if page <= 0 { - page = 1 - } - pagesize, _ := strconv.Atoi(r.URL.Query().Get("page_size")) - if pagesize <= 0 { - pagesize = 20 +func (api *API) lookupOperation(path string, method string) *openapi3.Operation { + operation := openapi.Spec.Paths.Find(path).GetOperation(method) + if operation == nil { + panic(fmt.Errorf("operation %s.%s not found", path, method)) } + return operation +} - q.Page(uint64(page), uint64(pagesize)) +func (api *API) bindQuery(r *http.Request, value interface{}) error { + decoder := form.NewDecoder() + err := decoder.Decode(value, r.URL.Query()) + if err != nil { + return errs.NewValidateError(err) + } + return nil } func (api *API) error(code int, w http.ResponseWriter, err error) { diff --git a/admin/api/attempts.go b/admin/api/attempts.go index ba7f04c5..bf9ad108 100644 --- a/admin/api/attempts.go +++ b/admin/api/attempts.go @@ -3,26 +3,28 @@ package api import ( "net/http" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" - - "github.com/webhookx-io/webhookx/db/query" - "github.com/webhookx-io/webhookx/utils" ) func (api *API) PageAttempt(w http.ResponseWriter, r *http.Request) { - var q query.AttemptQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) - if r.URL.Query().Get("event_id") != "" { - q.EventId = utils.Pointer(r.URL.Query().Get("event_id")) + parameters := api.lookupOperation("/workspaces/{ws_id}/attempts", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return } - if r.URL.Query().Get("endpoint_id") != "" { - q.EndpointId = utils.Pointer(r.URL.Query().Get("endpoint_id")) + + var params AttemptListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return } - list, total, err := api.db.AttemptsWS.Page(r.Context(), &q) + + query := params.Query() + page, err := api.db.AttemptsWS.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetAttempt(w http.ResponseWriter, r *http.Request) { diff --git a/admin/api/endpoints.go b/admin/api/endpoints.go index 356c4d8b..09385ea9 100644 --- a/admin/api/endpoints.go +++ b/admin/api/endpoints.go @@ -4,20 +4,30 @@ import ( "net/http" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/contextx" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" ) func (api *API) PageEndpoint(w http.ResponseWriter, r *http.Request) { - var q query.EndpointQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) - list, total, err := api.db.EndpointsWS.Page(r.Context(), &q) + parameters := api.lookupOperation("/workspaces/{ws_id}/endpoints", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return + } + + var params EndpointListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return + } + + query := params.Query() + page, err := api.db.EndpointsWS.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetEndpoint(w http.ResponseWriter, r *http.Request) { diff --git a/admin/api/events.go b/admin/api/events.go index afdaa494..03c1547f 100644 --- a/admin/api/events.go +++ b/admin/api/events.go @@ -6,22 +6,31 @@ import ( "time" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/contextx" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/services/eventbus" "github.com/webhookx-io/webhookx/utils" ) func (api *API) PageEvent(w http.ResponseWriter, r *http.Request) { - var q query.EventQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) + parameters := api.lookupOperation("/workspaces/{ws_id}/events", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return + } + + var params EventListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return + } - list, total, err := api.db.EventsWS.Page(r.Context(), &q) + query := params.Query() + page, err := api.db.EventsWS.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetEvent(w http.ResponseWriter, r *http.Request) { diff --git a/admin/api/models.go b/admin/api/models.go index fce20eda..b5c3d373 100644 --- a/admin/api/models.go +++ b/admin/api/models.go @@ -1,13 +1,26 @@ package api +import "github.com/webhookx-io/webhookx/db/dao" + type Pagination[T any] struct { Total int64 `json:"total"` Data []T `json:"data"` } -func NewPagination[T any](total int64, data []T) *Pagination[T] { - return &Pagination[T]{ - Total: total, - Data: data, +type PaginationCursor[T any] struct { + Next *string `json:"next"` + Data []T `json:"data"` +} + +func NewPagination[T any](q *dao.Query, cursor dao.CursorResult[T]) interface{} { + if q.CursorModel { + return PaginationCursor[T]{ + Next: cursor.Cursor, + Data: cursor.Data, + } + } + return Pagination[T]{ + Total: cursor.Total, + Data: cursor.Data, } } diff --git a/admin/api/params.go b/admin/api/params.go new file mode 100644 index 00000000..70f3c9bd --- /dev/null +++ b/admin/api/params.go @@ -0,0 +1,374 @@ +package api + +import ( + "encoding/json" + "strings" + "time" + + "github.com/webhookx-io/webhookx/db/dao" +) + +const ( + DefaultPageSize = 20 +) + +type ListParams struct { + // Deprecated + PageNo int `form:"page_no"` + // Deprecated + PageSize int `form:"page_size"` + + // limit parameter + Limit *int `form:"limit"` + // after parameter + After *string `form:"after"` + // before parameter + Before *string `form:"before"` + + // sort parameter + Sort string `form:"sort"` +} + +func (p *ListParams) Query() *dao.Query { + var query dao.Query + + pageNo := max(p.PageNo, 1) + pageSize := p.PageSize + if pageSize <= 0 { + pageSize = DefaultPageSize + } + query.Page(pageNo, pageSize) + + sort := p.Sort + if sort == "" { + sort = "id.desc" + } + field, order, _ := strings.Cut(sort, ".") + query.Order(field, order) + + if p.Limit != nil { + query.Page(1, *p.Limit) + } + if p.After != nil { + switch order { + case "asc": + query.Where("id", dao.GreaterThan, *p.After) + case "desc": + query.Where("id", dao.LessThan, *p.After) + } + } + if p.Before != nil { + switch order { + case "asc": + query.Where("id", dao.LessThan, *p.Before) + case "desc": + query.Where("id", dao.GreaterThan, *p.Before) + } + } + if p.Limit != nil || p.After != nil || p.Before != nil { + query.CursorModel = true + } + return &query +} + +type EndpointListParams struct { + ListParams + + Name *string `form:"name"` + Enabled *bool `form:"enabled"` + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + Metadata map[string]string `form:"metadata"` +} + +func (p *EndpointListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.Name != nil { + query.Where("name", dao.Equal, *p.Name) + } + if p.Enabled != nil { + query.Where("enabled", dao.Equal, *p.Enabled) + } + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if len(p.Metadata) > 0 { + b, _ := json.Marshal(p.Metadata) + query.Where("metadata", dao.JsonContain, string(b)) + } + return query +} + +type SourceListParams struct { + ListParams + + Name *string `form:"name"` + Enabled *bool `form:"enabled"` + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + Metadata map[string]string `form:"metadata"` +} + +func (p *SourceListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.Name != nil { + query.Where("name", dao.Equal, *p.Name) + } + if p.Enabled != nil { + query.Where("enabled", dao.Equal, *p.Enabled) + } + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if len(p.Metadata) > 0 { + b, _ := json.Marshal(p.Metadata) + query.Where("metadata", dao.JsonContain, string(b)) + } + return query +} + +type PluginListParams struct { + ListParams + + Name *string `form:"name"` + Enabled *bool `form:"enabled"` + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + Metadata map[string]string `form:"metadata"` + EndpointId *string `form:"endpoint_id"` + SourceId *string `form:"source_id"` +} + +func (p *PluginListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.Name != nil { + query.Where("name", dao.Equal, *p.Name) + } + if p.Enabled != nil { + query.Where("enabled", dao.Equal, *p.Enabled) + } + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if len(p.Metadata) > 0 { + b, _ := json.Marshal(p.Metadata) + query.Where("metadata", dao.JsonContain, string(b)) + } + if p.EndpointId != nil { + query.Where("endpoint_id", dao.Equal, *p.EndpointId) + } + if p.SourceId != nil { + query.Where("source_id", dao.Equal, *p.SourceId) + } + return query +} + +type AttemptListParams struct { + ListParams + + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + EventId *string `form:"event_id"` + EndpointId *string `form:"endpoint_id"` + Status *string `form:"status"` + AttemptedAt *int64 `form:"attempted_at"` + AttemptedAtGT *int64 `form:"attempted_at[gt]"` + AttemptedAtGTE *int64 `form:"attempted_at[gte]"` + AttemptedAtLT *int64 `form:"attempted_at[lt]"` + AttemptedAtLTE *int64 `form:"attempted_at[lte]"` +} + +func (p *AttemptListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if p.EventId != nil { + query.Where("event_id", dao.Equal, *p.EventId) + } + if p.EndpointId != nil { + query.Where("endpoint_id", dao.Equal, *p.EndpointId) + } + if p.Status != nil { + query.Where("status", dao.Equal, *p.Status) + } + if p.AttemptedAt != nil { + query.Where("attempted_at", dao.Equal, time.UnixMilli(*p.AttemptedAt)) + } + if p.AttemptedAtGT != nil { + query.Where("attempted_at", dao.GreaterThan, time.UnixMilli(*p.AttemptedAtGT)) + } + if p.AttemptedAtGTE != nil { + query.Where("attempted_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.AttemptedAtGTE)) + } + if p.AttemptedAtLT != nil { + query.Where("attempted_at", dao.LessThan, time.UnixMilli(*p.AttemptedAtLT)) + } + if p.AttemptedAtLTE != nil { + query.Where("attempted_at", dao.LessThanOrEqual, time.UnixMilli(*p.AttemptedAtLTE)) + } + return query +} + +type WorkspaceListParams struct { + ListParams + + Name *string `form:"name"` + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + Metadata map[string]string `form:"metadata"` +} + +func (p *WorkspaceListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.Name != nil { + query.Where("name", dao.Equal, *p.Name) + } + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if len(p.Metadata) > 0 { + b, _ := json.Marshal(p.Metadata) + query.Where("metadata", dao.JsonContain, string(b)) + } + return query +} + +type EventListParams struct { + ListParams + + CreatedAt *int64 `form:"created_at"` + CreatedAtGT *int64 `form:"created_at[gt]"` + CreatedAtGTE *int64 `form:"created_at[gte]"` + CreatedAtLT *int64 `form:"created_at[lt]"` + CreatedAtLTE *int64 `form:"created_at[lte]"` + EventType *string `form:"event_type"` + UniqueId *string `form:"unique_id"` + IngestedAt *int64 `form:"ingested_at"` + IngestedAtGT *int64 `form:"ingested_at[gt]"` + IngestedAtGTE *int64 `form:"ingested_at[gte]"` + IngestedAtLT *int64 `form:"ingested_at[lt]"` + IngestedAtLTE *int64 `form:"ingested_at[lte]"` +} + +func (p *EventListParams) Query() *dao.Query { + query := p.ListParams.Query() + + if p.CreatedAt != nil { + query.Where("created_at", dao.Equal, time.UnixMilli(*p.CreatedAt)) + } + if p.CreatedAtGT != nil { + query.Where("created_at", dao.GreaterThan, time.UnixMilli(*p.CreatedAtGT)) + } + if p.CreatedAtGTE != nil { + query.Where("created_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.CreatedAtGTE)) + } + if p.CreatedAtLT != nil { + query.Where("created_at", dao.LessThan, time.UnixMilli(*p.CreatedAtLT)) + } + if p.CreatedAtLTE != nil { + query.Where("created_at", dao.LessThanOrEqual, time.UnixMilli(*p.CreatedAtLTE)) + } + if p.EventType != nil { + query.Where("event_type", dao.Equal, *p.EventType) + } + if p.UniqueId != nil { + query.Where("unique_id", dao.Equal, *p.UniqueId) + } + if p.IngestedAt != nil { + query.Where("ingested_at", dao.Equal, time.UnixMilli(*p.IngestedAt)) + } + if p.IngestedAtGT != nil { + query.Where("ingested_at", dao.GreaterThan, time.UnixMilli(*p.IngestedAtGT)) + } + if p.IngestedAtGTE != nil { + query.Where("ingested_at", dao.GreaterThanOrEqual, time.UnixMilli(*p.IngestedAtGTE)) + } + if p.IngestedAtLT != nil { + query.Where("ingested_at", dao.LessThan, time.UnixMilli(*p.IngestedAtLT)) + } + if p.IngestedAtLTE != nil { + query.Where("ingested_at", dao.LessThanOrEqual, time.UnixMilli(*p.IngestedAtLTE)) + } + return query +} diff --git a/admin/api/params_test.go b/admin/api/params_test.go new file mode 100644 index 00000000..83dcc52e --- /dev/null +++ b/admin/api/params_test.go @@ -0,0 +1,210 @@ +package api + +import ( + "encoding/json" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/db/dao" + "github.com/webhookx-io/webhookx/utils" +) + +var _ = Describe("params to query", Ordered, func() { + Context("EndpointListParams", func() { + It("filters", func() { + metadata := map[string]string{"foo": "bar"} + metadataJson, _ := json.Marshal(metadata) + params := EndpointListParams{ + Name: utils.Pointer("test-endpoint"), + Enabled: utils.Pointer(true), + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(2000)), + CreatedAtGTE: utils.Pointer(int64(3000)), + CreatedAtLT: utils.Pointer(int64(4000)), + CreatedAtLTE: utils.Pointer(int64(5000)), + Metadata: metadata, + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"name", dao.Equal, "test-endpoint"}, + {"enabled", dao.Equal, true}, + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(2000)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(3000)}, + {"created_at", dao.LessThan, time.UnixMilli(4000)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(5000)}, + {"metadata", dao.JsonContain, string(metadataJson)}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) + + Context("SourceListParams", func() { + It("filters", func() { + metadata := map[string]string{"foo": "bar"} + metadataJson, _ := json.Marshal(metadata) + params := SourceListParams{ + Name: utils.Pointer("test-source"), + Enabled: utils.Pointer(false), + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(2000)), + CreatedAtGTE: utils.Pointer(int64(3000)), + CreatedAtLT: utils.Pointer(int64(4000)), + CreatedAtLTE: utils.Pointer(int64(5000)), + Metadata: metadata, + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"name", dao.Equal, "test-source"}, + {"enabled", dao.Equal, false}, + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(2000)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(3000)}, + {"created_at", dao.LessThan, time.UnixMilli(4000)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(5000)}, + {"metadata", dao.JsonContain, string(metadataJson)}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) + + Context("PluginListParams", func() { + It("filters", func() { + metadata := map[string]string{"foo": "bar"} + metadataJson, _ := json.Marshal(metadata) + params := PluginListParams{ + Name: utils.Pointer("test-plugin"), + Enabled: utils.Pointer(true), + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(2000)), + CreatedAtGTE: utils.Pointer(int64(3000)), + CreatedAtLT: utils.Pointer(int64(4000)), + CreatedAtLTE: utils.Pointer(int64(5000)), + Metadata: metadata, + EndpointId: utils.Pointer("ep_123"), + SourceId: utils.Pointer("src_123"), + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"name", dao.Equal, "test-plugin"}, + {"enabled", dao.Equal, true}, + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(2000)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(3000)}, + {"created_at", dao.LessThan, time.UnixMilli(4000)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(5000)}, + {"metadata", dao.JsonContain, string(metadataJson)}, + {"endpoint_id", dao.Equal, "ep_123"}, + {"source_id", dao.Equal, "src_123"}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) + + Context("AttemptListParams", func() { + It("filters", func() { + params := AttemptListParams{ + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(1100)), + CreatedAtGTE: utils.Pointer(int64(1200)), + CreatedAtLT: utils.Pointer(int64(1300)), + CreatedAtLTE: utils.Pointer(int64(1400)), + EventId: utils.Pointer("evt_123"), + EndpointId: utils.Pointer("ep_123"), + Status: utils.Pointer("SUCCESS"), + AttemptedAt: utils.Pointer(int64(2000)), + AttemptedAtGT: utils.Pointer(int64(2100)), + AttemptedAtGTE: utils.Pointer(int64(2200)), + AttemptedAtLT: utils.Pointer(int64(2300)), + AttemptedAtLTE: utils.Pointer(int64(2400)), + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(1100)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(1200)}, + {"created_at", dao.LessThan, time.UnixMilli(1300)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(1400)}, + {"event_id", dao.Equal, "evt_123"}, + {"endpoint_id", dao.Equal, "ep_123"}, + {"status", dao.Equal, "SUCCESS"}, + {"attempted_at", dao.Equal, time.UnixMilli(2000)}, + {"attempted_at", dao.GreaterThan, time.UnixMilli(2100)}, + {"attempted_at", dao.GreaterThanOrEqual, time.UnixMilli(2200)}, + {"attempted_at", dao.LessThan, time.UnixMilli(2300)}, + {"attempted_at", dao.LessThanOrEqual, time.UnixMilli(2400)}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) + + Context("WorkspaceListParams", func() { + It("filters", func() { + metadata := map[string]string{"foo": "bar"} + metadataJson, _ := json.Marshal(metadata) + params := WorkspaceListParams{ + Name: utils.Pointer("test-workspace"), + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(2000)), + CreatedAtGTE: utils.Pointer(int64(3000)), + CreatedAtLT: utils.Pointer(int64(4000)), + CreatedAtLTE: utils.Pointer(int64(5000)), + Metadata: metadata, + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"name", dao.Equal, "test-workspace"}, + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(2000)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(3000)}, + {"created_at", dao.LessThan, time.UnixMilli(4000)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(5000)}, + {"metadata", dao.JsonContain, string(metadataJson)}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) + + Context("EventListParams", func() { + It("filters", func() { + params := EventListParams{ + CreatedAt: utils.Pointer(int64(1000)), + CreatedAtGT: utils.Pointer(int64(1100)), + CreatedAtGTE: utils.Pointer(int64(1200)), + CreatedAtLT: utils.Pointer(int64(1300)), + CreatedAtLTE: utils.Pointer(int64(1400)), + EventType: utils.Pointer("user.created"), + UniqueId: utils.Pointer("uid_123"), + IngestedAt: utils.Pointer(int64(2000)), + IngestedAtGT: utils.Pointer(int64(2100)), + IngestedAtGTE: utils.Pointer(int64(2200)), + IngestedAtLT: utils.Pointer(int64(2300)), + IngestedAtLTE: utils.Pointer(int64(2400)), + } + query := params.Query() + expectedWheres := []dao.Condition{ + {"created_at", dao.Equal, time.UnixMilli(1000)}, + {"created_at", dao.GreaterThan, time.UnixMilli(1100)}, + {"created_at", dao.GreaterThanOrEqual, time.UnixMilli(1200)}, + {"created_at", dao.LessThan, time.UnixMilli(1300)}, + {"created_at", dao.LessThanOrEqual, time.UnixMilli(1400)}, + {"event_type", dao.Equal, "user.created"}, + {"unique_id", dao.Equal, "uid_123"}, + {"ingested_at", dao.Equal, time.UnixMilli(2000)}, + {"ingested_at", dao.GreaterThan, time.UnixMilli(2100)}, + {"ingested_at", dao.GreaterThanOrEqual, time.UnixMilli(2200)}, + {"ingested_at", dao.LessThan, time.UnixMilli(2300)}, + {"ingested_at", dao.LessThanOrEqual, time.UnixMilli(2400)}, + } + assert.EqualValues(GinkgoT(), expectedWheres, query.Wheres) + }) + }) +}) + +func Test(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Params") +} diff --git a/admin/api/plugins.go b/admin/api/plugins.go index aeaef30b..f3d6eac3 100644 --- a/admin/api/plugins.go +++ b/admin/api/plugins.go @@ -5,20 +5,30 @@ import ( "net/http" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/errs" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" ) func (api *API) PagePlugin(w http.ResponseWriter, r *http.Request) { - var q query.PluginQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) - list, total, err := api.db.PluginsWS.Page(r.Context(), &q) + parameters := api.lookupOperation("/workspaces/{ws_id}/plugins", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return + } + + var params PluginListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return + } + + query := params.Query() + page, err := api.db.PluginsWS.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetPlugin(w http.ResponseWriter, r *http.Request) { diff --git a/admin/api/sources.go b/admin/api/sources.go index b98a2584..e813af64 100644 --- a/admin/api/sources.go +++ b/admin/api/sources.go @@ -4,20 +4,30 @@ import ( "net/http" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/contextx" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" ) func (api *API) PageSource(w http.ResponseWriter, r *http.Request) { - var q query.SourceQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) - list, total, err := api.db.SourcesWS.Page(r.Context(), &q) + parameters := api.lookupOperation("/workspaces/{ws_id}/sources", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return + } + + var params SourceListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return + } + + query := params.Query() + page, err := api.db.SourcesWS.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetSource(w http.ResponseWriter, r *http.Request) { diff --git a/admin/api/workspaces.go b/admin/api/workspaces.go index f719b775..986093d9 100644 --- a/admin/api/workspaces.go +++ b/admin/api/workspaces.go @@ -5,19 +5,29 @@ import ( "net/http" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" ) func (api *API) PageWorkspace(w http.ResponseWriter, r *http.Request) { - var q query.WorkspaceQuery - q.Order("id", query.DESC) - api.bindQuery(r, &q.Query) - list, total, err := api.db.Workspaces.Page(r.Context(), &q) + parameters := api.lookupOperation("/workspaces", http.MethodGet).Parameters + if err := openapi.ValidateParameters(r, parameters); err != nil { + api.error(400, w, err) + return + } + + var params WorkspaceListParams + if err := api.bindQuery(r, ¶ms); err != nil { + api.error(400, w, err) + return + } + + query := params.Query() + page, err := api.db.Workspaces.Cursor(r.Context(), query) api.assert(err) - api.json(200, w, NewPagination(total, list)) + api.json(200, w, NewPagination(query, page)) } func (api *API) GetWorkspace(w http.ResponseWriter, r *http.Request) { diff --git a/app/app.go b/app/app.go index 0169ae96..790ec345 100644 --- a/app/app.go +++ b/app/app.go @@ -19,9 +19,8 @@ import ( "github.com/webhookx-io/webhookx/config/modules" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" - "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/migrator" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/mcache" "github.com/webhookx-io/webhookx/pkg/accesslog" @@ -29,6 +28,7 @@ import ( "github.com/webhookx-io/webhookx/pkg/license" "github.com/webhookx-io/webhookx/pkg/log" "github.com/webhookx-io/webhookx/pkg/metrics" + "github.com/webhookx-io/webhookx/pkg/openapi" "github.com/webhookx-io/webhookx/pkg/ratelimiter" "github.com/webhookx-io/webhookx/pkg/reports" "github.com/webhookx-io/webhookx/pkg/secret" @@ -55,7 +55,7 @@ import ( func init() { plugins.LoadPlugins() - entities.LoadOpenAPI(webhookx.OpenAPI) + openapi.LoadOpenAPI(webhookx.OpenAPI) } type Application struct { @@ -369,7 +369,7 @@ func (app *Application) getService(name string) services.Service { func (app *Application) buildPluginIterator(version string) (*plugins.Iterator, error) { app.log.Debugw("building plugin iterator", "version", version) - list, err := app.db.Plugins.List(context.TODO(), &query.PluginQuery{}) + list, err := app.db.Plugins.List(context.TODO(), &dao.Query{}) if err != nil { return nil, fmt.Errorf("failed to query plugins from database: %v", err) } diff --git a/db/dao/attempt_dao.go b/db/dao/attempt_dao.go index 87876fd8..880a6172 100644 --- a/db/dao/attempt_dao.go +++ b/db/dao/attempt_dao.go @@ -85,3 +85,29 @@ func (dao *attemptDao) ListUnqueuedForUpdate(ctx context.Context, maxScheduledAt err = dao.UnsafeDB(ctx).SelectContext(ctx, &list, sql, maxScheduledAt, limit) return } + +type AttemptQuery struct { + Query + + IDs []string + EventId *string + EndpointId *string + Status *string +} + +func (q *AttemptQuery) ToQuery() *Query { + query := q.clone() + if q.IDs != nil { + query.Where("id", Equal, q.IDs) + } + if q.EventId != nil { + query.Where("event_id", Equal, *q.EventId) + } + if q.EndpointId != nil { + query.Where("endpoint_id", Equal, *q.EndpointId) + } + if q.Status != nil { + query.Where("status", Equal, *q.Status) + } + return &query +} diff --git a/db/dao/dao.go b/db/dao/dao.go index afcff2ce..e5855e6d 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -11,7 +11,6 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "github.com/webhookx-io/webhookx/db/errs" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/db/transaction" "github.com/webhookx-io/webhookx/pkg/contextx" "github.com/webhookx-io/webhookx/pkg/tracing" @@ -37,9 +36,10 @@ type DAO[T any] struct { log *zap.SugaredLogger db *sqlx.DB - workspace bool - opts Options - columns []string + workspace bool + opts Options + columns []string + conditionColumns map[string]bool } type Options struct { @@ -78,16 +78,17 @@ func NewDAO[T any](db *sqlx.DB, opts Options, funcs ...OptionFunc) *DAO[T] { } dao := DAO[T]{ - log: zap.S().Named("dao"), - db: db, - workspace: opts.Workspace, - opts: opts, + log: zap.S().Named("dao"), + db: db, + workspace: opts.Workspace, + opts: opts, + conditionColumns: make(map[string]bool), } EachField(new(T), func(f reflect.StructField, _ reflect.Value, column string) { - if column == "created_at" || column == "updated_at" { - return + dao.conditionColumns[column] = true + if !(column == "created_at" || column == "updated_at") { + dao.columns = append(dao.columns, column) } - dao.columns = append(dao.columns, column) }) return &dao } @@ -134,10 +135,7 @@ func (dao *DAO[T]) Get(ctx context.Context, id string) (entity *T, err error) { func (dao *DAO[T]) Select(ctx context.Context, field string, value string) (entity *T, err error) { builder := psql.Select("*").From(dao.opts.Table).Where(sq.Eq{field: value}) - if dao.workspace { - wid := contextx.GetWorkspaceID(ctx) - builder = builder.Where(sq.Eq{"ws_id": wid}) - } + builder = dao.workspaceFilter(ctx, builder) statement, args := builder.MustSql() dao.debugSQL(statement, args) entity = new(T) @@ -150,10 +148,7 @@ func (dao *DAO[T]) Select(ctx context.Context, field string, value string) (enti func (dao *DAO[T]) selectByField(ctx context.Context, field string, value string) (entity *T, err error) { builder := psql.Select("*").From(dao.opts.Table).Where(sq.Eq{field: value}) - if dao.workspace { - wid := contextx.GetWorkspaceID(ctx) - builder = builder.Where(sq.Eq{"ws_id": wid}) - } + builder = dao.workspaceFilter(ctx, builder) statement, args := builder.MustSql() dao.debugSQL(statement, args) entity = new(T) @@ -189,56 +184,80 @@ func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) { return true, nil } -func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total int64, err error) { - ctx, span := dao.trace(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table)) - defer span.End() +func appendWhere(allowedColumns map[string]bool, builder sq.SelectBuilder, conditions []Condition) sq.SelectBuilder { + for _, condition := range conditions { + if allowedColumns[condition.Column] { + switch condition.Op { + case Equal: + builder = builder.Where(sq.Eq{condition.Column: condition.Value}) + case JsonContain: + builder = builder.Where(condition.Column+" @> ?", condition.Value) + case GreaterThan: + builder = builder.Where(sq.Gt{condition.Column: condition.Value}) + case GreaterThanOrEqual: + builder = builder.Where(sq.GtOrEq{condition.Column: condition.Value}) + case LessThan: + builder = builder.Where(sq.Lt{condition.Column: condition.Value}) + case LessThanOrEqual: + builder = builder.Where(sq.LtOrEq{condition.Column: condition.Value}) + } + } + } + return builder +} - total, err = dao.Count(ctx, q.WhereMap()) - if err != nil { - return +func appendOrder(builder sq.SelectBuilder, orders []Order) sq.SelectBuilder { + for _, order := range orders { + builder = builder.OrderBy(order.SQL()) } - list, err = dao.List(ctx, q) - return + return builder +} + +func (dao *DAO[T]) workspaceFilter(ctx context.Context, builder sq.SelectBuilder) sq.SelectBuilder { + if dao.workspace { + wid := contextx.GetWorkspaceID(ctx) + builder = builder.Where("ws_id = ?", wid) + } + return builder } -func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (total int64, err error) { +func (dao *DAO[T]) Count(ctx context.Context, query *Query) (total int64, err error) { + if query == nil { + panic("query is nil") + } + ctx, span := dao.trace(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table)) defer span.End() builder := psql.Select("COUNT(*)").From(dao.opts.Table) - if len(where) > 0 { - builder = builder.Where(where) - } - if dao.workspace { - wid := contextx.GetWorkspaceID(ctx) - builder = builder.Where(sq.Eq{"ws_id": wid}) - } + builder = appendWhere(dao.conditionColumns, builder, query.Wheres) + builder = dao.workspaceFilter(ctx, builder) statement, args := builder.MustSql() dao.debugSQL(statement, args) err = dao.DB(ctx).GetContext(ctx, &total, statement, args...) return } -func (dao *DAO[T]) List(ctx context.Context, q query.Queryer) (list []*T, err error) { +func (dao *DAO[T]) List(ctx context.Context, query *Query) (list []*T, err error) { + if query == nil { + panic("query is nil") + } + ctx, span := dao.trace(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table)) defer span.End() builder := psql.Select("*").From(dao.opts.Table) - where := q.WhereMap() - if len(where) > 0 { - builder = builder.Where(where) - } - if dao.workspace { - wid := contextx.GetWorkspaceID(ctx) - builder = builder.Where(sq.Eq{"ws_id": wid}) - } - if q.Limit() != 0 { - builder = builder.Offset(uint64(q.Offset())) - builder = builder.Limit(uint64(q.Limit())) + builder = appendWhere(dao.conditionColumns, builder, query.Wheres) + builder = dao.workspaceFilter(ctx, builder) + builder = appendOrder(builder, query.Orders) + + if query.Limit != 0 { + builder = builder.Limit(uint64(query.Limit)) } - for _, order := range q.Orders() { - builder = builder.OrderBy(order.Column + " " + order.Sort) + if query.Offset != 0 { + builder = builder.Offset(uint64(query.Offset)) } + statement, args := builder.MustSql() dao.debugSQL(statement, args) list = make([]*T, 0) @@ -246,6 +265,63 @@ func (dao *DAO[T]) List(ctx context.Context, q query.Queryer) (list []*T, err er return } +func (dao *DAO[T]) Cursor(ctx context.Context, query *Query) (res CursorResult[*T], err error) { + if query == nil { + panic("query is nil") + } + if query.Limit <= 0 { + panic("query.limit must be positive") + } + + var spanName string + if query.CursorModel { + spanName = fmt.Sprintf("dao.%s.cursor", dao.opts.Table) + } else { + spanName = fmt.Sprintf("dao.%s.page", dao.opts.Table) + } + ctx, span := dao.trace(ctx, spanName) + defer span.End() + + builder := psql.Select("*").From(dao.opts.Table) + builder = appendWhere(dao.conditionColumns, builder, query.Wheres) + builder = dao.workspaceFilter(ctx, builder) + builder = appendOrder(builder, query.Orders) + builder = builder.Limit(uint64(query.Limit + 1)) + if query.Offset > 0 { + builder = builder.Offset(uint64(query.Offset)) + } + + statement, args := builder.MustSql() + dao.debugSQL(statement, args) + + res.Data = make([]*T, 0) + err = dao.UnsafeDB(ctx).SelectContext(ctx, &res.Data, statement, args...) + if err != nil { + return + } + + if len(res.Data) > query.Limit { + res.HasMore = true + res.Data = res.Data[:query.Limit] + last := res.Data[len(res.Data)-1] + id := reflect.ValueOf(*last).FieldByName("ID") + if id.IsValid() { + var s = id.String() + res.Cursor = &s + } + } + + if !query.CursorModel { + totoal, err := dao.Count(ctx, query) + if err != nil { + return res, err + } + res.Total = totoal + } + + return +} + func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error { ctx, span := dao.trace(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table)) defer span.End() diff --git a/db/dao/daos.go b/db/dao/daos.go index 78eceef6..3d5eb4c4 100644 --- a/db/dao/daos.go +++ b/db/dao/daos.go @@ -5,7 +5,6 @@ import ( "time" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" ) type BaseDAO[T any] interface { @@ -15,9 +14,9 @@ type BaseDAO[T any] interface { Update(ctx context.Context, entity *T) error Upsert(ctx context.Context, fields []string, entity *T) error Delete(ctx context.Context, id string) (bool, error) - Page(ctx context.Context, q query.Queryer) ([]*T, int64, error) - List(ctx context.Context, q query.Queryer) ([]*T, error) - Count(ctx context.Context, conditions map[string]interface{}) (int64, error) + Count(ctx context.Context, query *Query) (int64, error) + List(ctx context.Context, query *Query) ([]*T, error) + Cursor(ctx context.Context, query *Query) (CursorResult[*T], error) BatchInsert(ctx context.Context, entities []*T) error } diff --git a/db/dao/endpoint_dao.go b/db/dao/endpoint_dao.go index ac4488ea..0468ee68 100644 --- a/db/dao/endpoint_dao.go +++ b/db/dao/endpoint_dao.go @@ -21,3 +21,20 @@ func NewEndpointDAO(db *sqlx.DB, fns ...OptionFunc) EndpointDAO { DAO: NewDAO[entities.Endpoint](db, opts, fns...), } } + +type EndpointQuery struct { + Query + Enabled *bool + WorkspaceId *string +} + +func (q *EndpointQuery) ToQuery() *Query { + query := q.clone() + if q.Enabled != nil { + query.Where("enabled", Equal, *q.Enabled) + } + if q.WorkspaceId != nil { + query.Where("ws_id", Equal, *q.WorkspaceId) + } + return &query +} diff --git a/db/dao/event_dao.go b/db/dao/event_dao.go index 040aadbd..a501ff2b 100644 --- a/db/dao/event_dao.go +++ b/db/dao/event_dao.go @@ -65,3 +65,8 @@ func (dao *eventDao) BatchInsertIgnoreConflict(ctx context.Context, events []*en } return inserteds, rows.Err() } + + +type EventQuery struct { + Query +} diff --git a/db/dao/plugin_dao.go b/db/dao/plugin_dao.go index 2ed067d1..28aa7d0b 100644 --- a/db/dao/plugin_dao.go +++ b/db/dao/plugin_dao.go @@ -21,3 +21,29 @@ func NewPluginDAO(db *sqlx.DB, fns ...OptionFunc) PluginDAO { DAO: NewDAO[entities.Plugin](db, opts, fns...), } } + +type PluginQuery struct { + Query + + WorkspaceId *string + EndpointId *string + SourceId *string + Enabled *bool +} + +func (q *PluginQuery) ToQuery() *Query { + query := q.clone() + if q.WorkspaceId != nil { + query.Where("ws_id", Equal, *q.WorkspaceId) + } + if q.EndpointId != nil { + query.Where("endpoint_id", Equal, *q.EndpointId) + } + if q.SourceId != nil { + query.Where("source_id", Equal, *q.SourceId) + } + if q.Enabled != nil { + query.Where("enabled", Equal, *q.Enabled) + } + return &query +} diff --git a/db/dao/query.go b/db/dao/query.go new file mode 100644 index 00000000..6fb6b670 --- /dev/null +++ b/db/dao/query.go @@ -0,0 +1,87 @@ +package dao + +// Operator SQL operator +type Operator int + +const ( + // Equal = + Equal Operator = iota + // GreaterThan > + GreaterThan + // GreaterThanOrEqual >= + GreaterThanOrEqual + // LessThan < + LessThan + // LessThanOrEqual <= + LessThanOrEqual + // JsonContain @> + JsonContain +) + +type Condition struct { + Column string + Op Operator + Value interface{} +} + +type Sort = string + +const ( + ASC Sort = "ASC" + DESC Sort = "DESC" +) + +type Order struct { + Column string + Sort Sort +} + +func (o Order) SQL() string { + return o.Column + " " + o.Sort +} + +type Query struct { + Offset int + Limit int + Wheres []Condition + Orders []Order + CursorModel bool +} + +func (q *Query) Where(column string, op Operator, value any) *Query { + q.Wheres = append(q.Wheres, Condition{column, op, value}) + return q +} + +func (q *Query) Order(column string, sort Sort) *Query { + q.Orders = append(q.Orders, Order{column, sort}) + return q +} + +func (q *Query) Page(pageNo, pageSize int) { + if pageNo < 1 { + pageNo = 1 + } + q.Offset = (pageNo - 1) * pageSize + q.Limit = pageSize +} + +func (q Query) clone() Query { + cloned := q + if len(q.Wheres) > 0 { + cloned.Wheres = append([]Condition(nil), q.Wheres...) + } + if len(q.Orders) > 0 { + cloned.Orders = append([]Order(nil), q.Orders...) + } + return cloned +} + +type Cursor = string + +type CursorResult[T any] struct { + Data []T + Total int64 + HasMore bool + Cursor *Cursor +} diff --git a/db/dao/source_dao.go b/db/dao/source_dao.go index 3df38211..dee44014 100644 --- a/db/dao/source_dao.go +++ b/db/dao/source_dao.go @@ -21,3 +21,17 @@ func NewSourceDAO(db *sqlx.DB, fns ...OptionFunc) SourceDAO { DAO: NewDAO[entities.Source](db, opts, fns...), } } + +type SourceQuery struct { + Query + + WorkspaceId *string +} + +func (q *SourceQuery) ToQuery() *Query { + query := q.clone() + if q.WorkspaceId != nil { + query.Where("ws_id", Equal, *q.WorkspaceId) + } + return &query +} diff --git a/db/dao/workspace_dao.go b/db/dao/workspace_dao.go index f8be1117..9ca322d2 100644 --- a/db/dao/workspace_dao.go +++ b/db/dao/workspace_dao.go @@ -31,3 +31,7 @@ func (dao *workspaceDAO) GetDefault(ctx context.Context) (*entities.Workspace, e func (dao *workspaceDAO) GetWorkspace(ctx context.Context, name string) (*entities.Workspace, error) { return dao.selectByField(ctx, "name", name) } + +type WorkspaceQuery struct { + Query +} diff --git a/db/entities/schema.go b/db/entities/schema.go index 4c170cd8..8f9e1798 100644 --- a/db/entities/schema.go +++ b/db/entities/schema.go @@ -4,35 +4,17 @@ import ( "fmt" "github.com/getkin/kin-openapi/openapi3" + "github.com/webhookx-io/webhookx/pkg/openapi" ) type Schema interface { SchemaName() string } -var spec *openapi3.T - func LookupSchema(name string) *openapi3.Schema { - s, err := spec.Components.Schemas.JSONLookup(name) + s, err := openapi.Spec.Components.Schemas.JSONLookup(name) if err != nil { panic(fmt.Errorf("failed to lookup JSON schema %q: %w", name, err)) } return s.(*openapi3.Schema) } - -func LoadOpenAPI(bytes []byte) { - loader := openapi3.NewLoader() - doc, err := loader.LoadFromData(bytes) - if err != nil { - panic(fmt.Errorf("failed to load OpenAPI document: %w", err)) - } - - if err = doc.Validate(loader.Context, - openapi3.EnableSchemaFormatValidation(), - openapi3.DisableSchemaDefaultsValidation(), - ); err != nil { - panic(fmt.Errorf("OpenAPI document validation failed: %w", err)) - } - - spec = doc -} diff --git a/db/query/order.go b/db/query/order.go deleted file mode 100644 index f7dfc1b2..00000000 --- a/db/query/order.go +++ /dev/null @@ -1,17 +0,0 @@ -package query - -type Sort = string - -const ( - ASC Sort = "ASC" - DESC Sort = "DESC" -) - -type Order struct { - Column string - Sort Sort -} - -func (o Order) String() string { - return o.Column + " " + o.Sort -} diff --git a/db/query/query.go b/db/query/query.go deleted file mode 100644 index aaa3829e..00000000 --- a/db/query/query.go +++ /dev/null @@ -1,43 +0,0 @@ -package query - -type Queryer interface { - Offset() int64 - Limit() int64 - WhereMap() map[string]interface{} - Orders() []*Order -} - -type Query struct { - offset int64 - limit int64 - orders []*Order -} - -func (q *Query) Page(pageNo, pageSize uint64) { - if pageNo < 1 { - pageNo = 1 - } - offset := (pageNo - 1) * pageSize - q.offset = int64(offset) - q.limit = int64(int(pageSize)) -} - -func (q *Query) Offset() int64 { - return q.offset -} - -func (q *Query) Limit() int64 { - return q.limit -} - -func (q *Query) WhereMap() map[string]interface{} { - return nil -} - -func (q *Query) Orders() []*Order { - return q.orders -} - -func (q *Query) Order(column string, sort Sort) { - q.orders = append(q.orders, &Order{column, sort}) -} diff --git a/db/query/querys.go b/db/query/querys.go deleted file mode 100644 index 4b9fc447..00000000 --- a/db/query/querys.go +++ /dev/null @@ -1,101 +0,0 @@ -package query - -type EndpointQuery struct { - Query - - Enabled *bool - WorkspaceId *string -} - -func (q *EndpointQuery) WhereMap() map[string]interface{} { - maps := make(map[string]interface{}) - if q.Enabled != nil { - maps["enabled"] = *q.Enabled - } - if q.WorkspaceId != nil { - maps["ws_id"] = *q.WorkspaceId - } - return maps -} - -type EventQuery struct { - Query -} - -func (q EventQuery) WhereMap() map[string]interface{} { - return map[string]interface{}{} -} - -type WorkspaceQuery struct { - Query -} - -func (q *WorkspaceQuery) WhereMap() map[string]interface{} { - return map[string]interface{}{} -} - -type AttemptQuery struct { - Query - - IDs []string - EventId *string - EndpointId *string - Status *string -} - -func (q *AttemptQuery) WhereMap() map[string]interface{} { - maps := make(map[string]interface{}) - if q.IDs != nil { - maps["id"] = q.IDs - } - if q.EventId != nil { - maps["event_id"] = *q.EventId - } - if q.EndpointId != nil { - maps["endpoint_id"] = *q.EndpointId - } - if q.Status != nil { - maps["status"] = *q.Status - } - return maps -} - -type SourceQuery struct { - Query - - WorkspaceId *string -} - -func (q *SourceQuery) WhereMap() map[string]interface{} { - maps := make(map[string]interface{}) - if q.WorkspaceId != nil { - maps["ws_id"] = *q.WorkspaceId - } - return maps -} - -type PluginQuery struct { - Query - - WorkspaceId *string - EndpointId *string - SourceId *string - Enabled *bool -} - -func (q *PluginQuery) WhereMap() map[string]interface{} { - maps := make(map[string]interface{}) - if q.WorkspaceId != nil { - maps["ws_id"] = *q.WorkspaceId - } - if q.EndpointId != nil { - maps["endpoint_id"] = *q.EndpointId - } - if q.SourceId != nil { - maps["source_id"] = *q.SourceId - } - if q.Enabled != nil { - maps["enabled"] = *q.Enabled - } - return maps -} diff --git a/dispatcher/registry.go b/dispatcher/registry.go index b8d45aee..191c76de 100644 --- a/dispatcher/registry.go +++ b/dispatcher/registry.go @@ -6,8 +6,8 @@ import ( "github.com/hashicorp/golang-lru/v2/expirable" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/utils" "golang.org/x/sync/singleflight" ) @@ -26,7 +26,7 @@ func NewRegistry(db *db.DB) *Registry { } func (r *Registry) Warmup() error { - workspaces, err := r.db.Workspaces.List(context.TODO(), &query.WorkspaceQuery{}) + workspaces, err := r.db.Workspaces.List(context.TODO(), &dao.Query{}) if err != nil { return err } @@ -43,10 +43,10 @@ func (r *Registry) Warmup() error { } func (r *Registry) load(ctx context.Context, wid string) (*Registration, error) { - var q query.EndpointQuery + var q dao.EndpointQuery q.WorkspaceId = &wid q.Enabled = utils.Pointer(true) - endpoints, err := r.db.Endpoints.List(ctx, &q) + endpoints, err := r.db.Endpoints.List(ctx, q.ToQuery()) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 2d6f59d8..27e1cfe6 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/elazarl/goproxy v1.8.1 github.com/getkin/kin-openapi v0.133.0 github.com/go-kit/kit v0.13.0 + github.com/go-playground/form v3.1.4+incompatible github.com/go-playground/validator/v10 v10.30.1 github.com/go-redis/redis_rate/v10 v10.0.1 github.com/go-redsync/redsync/v4 v4.15.0 @@ -154,4 +155,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/go-playground/assert.v1 v1.2.1 // indirect ) diff --git a/go.sum b/go.sum index 53e0ce12..860957d1 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/form v3.1.4+incompatible h1:lvKiHVxE2WvzDIoyMnWcjyiBxKt2+uFJyZcPYWsLnjI= +github.com/go-playground/form v3.1.4+incompatible/go.mod h1:lhcKXfTuhRtIZCIKUeJ0b5F207aeQCPbZU09ScKjwWg= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -400,6 +402,8 @@ google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/openapi.yml b/openapi.yml index ae526325..8f63a78d 100644 --- a/openapi.yml +++ b/openapi.yml @@ -38,7 +38,7 @@ paths: /license: get: - summary: doge + summary: Get current license information responses: "200": description: OK @@ -77,6 +77,13 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/name" + - $ref: "#/components/parameters/created_at" + - $ref: "#/components/parameters/metadata" summary: Page workspaces tags: - Workspace @@ -173,6 +180,14 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/name" + - $ref: "#/components/parameters/enabled" + - $ref: "#/components/parameters/created_at" + - $ref: "#/components/parameters/metadata" summary: Page endpoints tags: - Endpoint @@ -273,15 +288,49 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" - - in: query + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/created_at" + - description: "event_id filter" + in: query name: event_id schema: type: string - - in: query + - description: "endpoint_id filter" + in: query name: endpoint_id schema: type: string - + - description: "status filter" + in: query + name: status + schema: + type: string + - description: "attempted_at filter (unix time in milliseconds)." + in: query + name: attempted_at + explode: true + style: deepObject + schema: + anyOf: + - type: object + properties: + gt: + type: integer + format: int64 + gte: + type: integer + format: int64 + lt: + type: integer + format: int64 + lte: + type: integer + format: int64 + - type: integer + format: int64 summary: Page webhook attempts tags: - Attempt @@ -330,6 +379,44 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/created_at" + - description: "event_type filter" + in: query + name: event_type + schema: + type: string + - description: "unique_id filter" + in: query + name: unique_id + schema: + type: string + - description: "ingested_at filter (unix time in milliseconds)." + in: query + name: ingested_at + explode: true + style: deepObject + schema: + anyOf: + - type: object + properties: + gt: + type: integer + format: int64 + gte: + type: integer + format: int64 + lt: + type: integer + format: int64 + lte: + type: integer + format: int64 + - type: integer + format: int64 summary: Page events tags: - Event @@ -419,6 +506,14 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/name" + - $ref: "#/components/parameters/enabled" + - $ref: "#/components/parameters/created_at" + - $ref: "#/components/parameters/metadata" summary: Page sources tags: - Source @@ -518,6 +613,24 @@ paths: parameters: - $ref: "#/components/parameters/page_no" - $ref: "#/components/parameters/page_size" + - $ref: "#/components/parameters/limit" + - $ref: "#/components/parameters/sort" + - $ref: "#/components/parameters/after" + - $ref: "#/components/parameters/before" + - $ref: "#/components/parameters/name" + - $ref: "#/components/parameters/enabled" + - $ref: "#/components/parameters/created_at" + - $ref: "#/components/parameters/metadata" + - description: "endpoint_id filter" + in: query + name: endpoint_id + schema: + type: string + - description: "source_id filter" + in: query + name: source_id + schema: + type: string summary: Page plugins tags: - Plugin @@ -683,17 +796,99 @@ components: example: "default" description: The workspace id page_no: + deprecated: true in: query name: page_no schema: type: integer default: 1 + minimum: 1 page_size: + deprecated: true in: query name: page_size schema: type: integer default: 20 + maximum: 1000 + minimum: 1 + limit: + description: "A limit on the number of objects to be returned. Limit can range between 1 and 1000, and the default is 20." + in: query + name: limit + schema: + type: integer + default: 20 + maximum: 1000 + minimum: 1 + sort: + description: "" + in: query + name: sort + schema: + type: string + enum: [ "id.desc", "id.asc" ] + default: "id.desc" + after: + description: "A cursor for use in pagination." + in: query + name: after + schema: + type: string + before: + description: "A cursor for use in pagination." + in: query + name: before + schema: + type: string + metadata: + description: "" + in: query + name: metadata + explode: true + style: deepObject + schema: + type: object + additionalProperties: + type: string + example: + key: value + name: + description: "name filter" + in: query + name: name + schema: + type: string + enabled: + description: "enabled filter" + in: query + name: enabled + schema: + type: boolean + created_at: + description: "created_at filter (unix time in milliseconds)." + in: query + name: created_at + explode: true + style: deepObject + schema: + anyOf: + - type: object + properties: + gt: + type: integer + format: int64 + gte: + type: integer + format: int64 + lt: + type: integer + format: int64 + lte: + type: integer + format: int64 + - type: integer + format: int64 responses: NotFound: description: The resource was not found @@ -707,6 +902,10 @@ components: total: type: integer example: 1 + next: + description: "" + type: string + nullable: true Metadata: type: object diff --git a/pkg/declarative/declarative.go b/pkg/declarative/declarative.go index cd82a195..2cf16b20 100644 --- a/pkg/declarative/declarative.go +++ b/pkg/declarative/declarative.go @@ -4,7 +4,7 @@ import ( "context" "github.com/webhookx-io/webhookx/db" - "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/utils" ) @@ -72,9 +72,9 @@ func (m *Declarative) Sync(wid string, cfg *Configuration) error { } // cleanup existing endpoint - var endpointQ query.EndpointQuery + var endpointQ dao.EndpointQuery endpointQ.WorkspaceId = &wid - endpoints, err := m.db.Endpoints.List(ctx, &endpointQ) + endpoints, err := m.db.Endpoints.List(ctx, endpointQ.ToQuery()) if err != nil { return err } @@ -88,9 +88,9 @@ func (m *Declarative) Sync(wid string, cfg *Configuration) error { } // cleanup existing source - var sourceQ query.SourceQuery + var sourceQ dao.SourceQuery sourceQ.WorkspaceId = &wid - sources, err := m.db.Sources.List(ctx, &sourceQ) + sources, err := m.db.Sources.List(ctx, sourceQ.ToQuery()) if err != nil { return err } @@ -110,19 +110,19 @@ func (m *Declarative) Dump(ctx context.Context, wid string) (*Configuration, err var cfg Configuration err := m.db.TX(ctx, func(ctx context.Context) error { - var endpointQ query.EndpointQuery + var endpointQ dao.EndpointQuery endpointQ.WorkspaceId = &wid - endpoints, err := m.db.Endpoints.List(ctx, &endpointQ) + endpoints, err := m.db.Endpoints.List(ctx, endpointQ.ToQuery()) if err != nil { return err } for _, endpoint := range endpoints { var e Endpoint e.Endpoint = *endpoint - var q query.PluginQuery + var q dao.PluginQuery q.EndpointId = &endpoint.ID q.WorkspaceId = &endpoint.WorkspaceId - plugins, err := m.db.Plugins.List(ctx, &q) + plugins, err := m.db.Plugins.List(ctx, q.ToQuery()) if err != nil { return err } @@ -130,19 +130,19 @@ func (m *Declarative) Dump(ctx context.Context, wid string) (*Configuration, err cfg.Endpoints = append(cfg.Endpoints, &e) } - var sourceQ query.SourceQuery + var sourceQ dao.SourceQuery sourceQ.WorkspaceId = &wid - sources, err := m.db.Sources.List(ctx, &sourceQ) + sources, err := m.db.Sources.List(ctx, sourceQ.ToQuery()) if err != nil { return err } for _, source := range sources { var e Source e.Source = *source - var q query.PluginQuery + var q dao.PluginQuery q.SourceId = &source.ID q.WorkspaceId = &source.WorkspaceId - plugins, err := m.db.Plugins.List(ctx, &q) + plugins, err := m.db.Plugins.List(ctx, q.ToQuery()) if err != nil { return err } diff --git a/pkg/openapi/openapi.go b/pkg/openapi/openapi.go index cd4abb63..ca85f1fc 100644 --- a/pkg/openapi/openapi.go +++ b/pkg/openapi/openapi.go @@ -1,19 +1,26 @@ package openapi import ( + "context" "encoding/json" "errors" "fmt" "maps" + "net/http" "regexp" "slices" "strconv" "github.com/getkin/kin-openapi/openapi3" + "github.com/getkin/kin-openapi/openapi3filter" "github.com/tidwall/gjson" "github.com/webhookx-io/webhookx/pkg/errs" ) +var ( + Spec *openapi3.T +) + type FormatValidatorFunc[T any] func(T) error func (fn FormatValidatorFunc[T]) Validate(value T) error { return fn(value) } @@ -27,6 +34,32 @@ func init() { })) } +func ParseSpec(data []byte) (*openapi3.T, error) { + loader := openapi3.NewLoader() + doc, err := loader.LoadFromData(data) + if err != nil { + return nil, fmt.Errorf("failed to load OpenAPI document: %w", err) + } + + err = doc.Validate( + loader.Context, + openapi3.EnableSchemaFormatValidation(), + openapi3.DisableSchemaDefaultsValidation(), + ) + if err != nil { + return nil, fmt.Errorf("OpenAPI document validation failed: %w", err) + } + return doc, nil +} + +func LoadOpenAPI(data []byte) { + doc, err := ParseSpec(data) + if err != nil { + panic(err) + } + Spec = doc +} + func SetDefaults(schema *openapi3.Schema, defaults map[string]interface{}) error { data := make(map[string]interface{}) _ = schema.VisitJSON(data, @@ -69,6 +102,32 @@ func Validate(schema *openapi3.Schema, value map[string]interface{}) error { return nil } +func ValidateParameters(r *http.Request, parameters openapi3.Parameters) error { + options := openapi3filter.Options{ + MultiError: true, // ? + SkipSettingDefaults: true, + } + options.WithCustomSchemaErrorFunc(formatError) + input := &openapi3filter.RequestValidationInput{ + Request: r, + Options: &options, + } + + var me openapi3.MultiError + for _, param := range parameters { + if err := openapi3filter.ValidateParameter(context.TODO(), input, param.Value); err != nil { + me = append(me, err) + } + } + + if len(me) > 0 { + validateErr := errs.NewValidateError(errs.ErrRequestValidation) + handleMultiError(me, nil, validateErr.Fields) + return validateErr + } + return nil +} + func decodeMultiError(err error) openapi3.MultiError { if unwrapped := errors.Unwrap(err); unwrapped != nil { if me, ok := unwrapped.(openapi3.MultiError); ok { @@ -103,6 +162,26 @@ func handleMultiError(me openapi3.MultiError, paths []string, fields map[string] if decoded := decodeMultiError(e); decoded != nil { handleMultiError(decoded, e.JSONPointer(), fields) } + case *openapi3filter.RequestError: + + const params = "@params" + var unknowns []string + if v, ok := fields[params]; !ok { + unknowns = make([]string, 0) + } else { + unknowns = v.([]string) + } + var msg string + + switch e.Err { + case openapi3filter.ErrInvalidRequired: + msg = fmt.Sprintf("%s: %s", e.Parameter.Name, "is required") + case openapi3filter.ErrInvalidEmptyValue: + msg = fmt.Sprintf("%s: %s", e.Parameter.Name, "is empty") + default: + msg = fmt.Sprintf("%s: %s", e.Parameter.Name, e.Err.Error()) + } + fields[params] = append(unknowns, msg) default: const unknown = "@unknown" var unknowns []string diff --git a/proxy/gateway.go b/proxy/gateway.go index bccc6a4f..256744fc 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -20,7 +20,6 @@ import ( "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/pkg/contextx" "github.com/webhookx-io/webhookx/pkg/http/middlewares" @@ -163,7 +162,7 @@ func (g *Gateway) buildRouter(version string) { g.log.Debugw("building router", "version", version) - sources, err := g.db.Sources.List(ctx, &query.SourceQuery{}) + sources, err := g.db.Sources.List(ctx, &dao.Query{}) if err != nil { g.log.Warnf("failed to build router: %v", err) return diff --git a/test/admin/endpoints_test.go b/test/admin/endpoints_test.go index 94637228..d2567085 100644 --- a/test/admin/endpoints_test.go +++ b/test/admin/endpoints_test.go @@ -3,6 +3,7 @@ package admin import ( "context" "fmt" + "strconv" "time" "github.com/go-resty/resty/v2" @@ -217,23 +218,29 @@ var _ = Describe("/endpoints", Ordered, func() { }) Context("GET", func() { - Context("with data", func() { - BeforeAll(func() { - assert.Nil(GinkgoT(), db.Truncate("endpoints")) - for i := 1; i <= 21; i++ { - entity := entities.Endpoint{ - ID: utils.KSUID(), - Enabled: true, - Request: entities.RequestConfig{ - URL: "https://example.com", - Method: "POST", - }, - } - entity.WorkspaceId = ws.ID - assert.Nil(GinkgoT(), db.Endpoints.Insert(context.TODO(), &entity)) + BeforeAll(func() { + assert.Nil(GinkgoT(), db.Truncate("endpoints")) + for i := 1; i <= 21; i++ { + entity := entities.Endpoint{ + ID: fmt.Sprintf("ep_%03d", i), + Name: utils.Pointer(fmt.Sprintf("endpoint_%03d", i)), + Enabled: i%2 == 0, + Request: entities.RequestConfig{ + URL: "https://example.com", + Method: "POST", + }, + Metadata: map[string]string{ + "foo": "bar", + "value": strconv.Itoa(i), + }, } - }) - It("retrieves first page", func() { + entity.WorkspaceId = ws.ID + assert.Nil(GinkgoT(), db.Endpoints.Insert(context.TODO(), &entity)) + } + }) + + Context("offset pagination", func() { + It("default", func() { resp, err := adminClient.R(). SetResult(api.Pagination[*entities.Endpoint]{}). Get("/workspaces/default/endpoints") @@ -242,25 +249,198 @@ var _ = Describe("/endpoints", Ordered, func() { assert.EqualValues(GinkgoT(), 21, result.Total) assert.EqualValues(GinkgoT(), 20, len(result.Data)) }) - It("retrieves second page", func() { + + It("page_no", func() { resp, err := adminClient.R(). SetResult(api.Pagination[*entities.Endpoint]{}). - Get("/workspaces/default/endpoints?page_no=2") + SetQueryParam("page_no", "2"). + Get("/workspaces/default/endpoints") assert.Nil(GinkgoT(), err) result := resp.Result().(*api.Pagination[*entities.Endpoint]) assert.EqualValues(GinkgoT(), 21, result.Total) assert.EqualValues(GinkgoT(), 1, len(result.Data)) }) + + It("page_size", func() { + resp, err := adminClient.R(). + SetResult(api.Pagination[*entities.Endpoint]{}). + SetQueryParam("page_size", "21"). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.EqualValues(GinkgoT(), 21, result.Total) + assert.EqualValues(GinkgoT(), 21, len(result.Data)) + }) }) - Context("with no data", func() { - BeforeAll(func() { - assert.Nil(GinkgoT(), db.Truncate("endpoints")) + Context("cursor pagination", func() { + It("limit", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "5"). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + assert.Equal(GinkgoT(), 5, len(result.Data)) + assert.NotNil(GinkgoT(), result.Next) + + resp, err = adminClient.R(). + SetQueryParam("limit", "100"). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + assert.Equal(GinkgoT(), 21, len(result.Data)) + assert.Nil(GinkgoT(), result.Next) + }) + + It("after", func() { + // Get first 2 + resp, err := adminClient.R(). + SetQueryParam("limit", "2"). + SetQueryParam("sort", "id.asc"). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + assert.Equal(GinkgoT(), 2, len(result.Data)) + firstId := result.Data[0].ID + secondId := result.Data[1].ID + + // Get after first + resp, err = adminClient.R(). + SetQueryParam("limit", "1"). + SetQueryParam("sort", "id.asc"). + SetQueryParam("after", firstId). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), secondId, result.Data[0].ID) + }) + + It("before", func() { + // Get first 2 + resp, err := adminClient.R(). + SetQueryParam("limit", "2"). + SetQueryParam("sort", "id.asc"). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + firstId := result.Data[0].ID + secondId := result.Data[1].ID + + // Get before second + resp, err = adminClient.R(). + SetQueryParam("limit", "1"). + SetQueryParam("sort", "id.asc"). + SetQueryParam("before", secondId). + SetResult(api.PaginationCursor[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.PaginationCursor[*entities.Endpoint]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), firstId, result.Data[0].ID) + }) + + Context("errors", func() { + It("limit must be integer", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "test"). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: value test: an invalid integer: invalid syntax"]}}}`, + string(resp.Body())) + + }) + It("limit must in range [1, 1000]", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "0"). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: number must be at least 1"]}}}`, + string(resp.Body())) + + resp, err = adminClient.R(). + SetQueryParam("limit", "1001"). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: number must be at most 1000"]}}}`, + string(resp.Body())) + }) + }) + }) + + Context("query parameters", func() { + It("name", func() { + resp, err := adminClient.R(). + SetQueryParam("name", "endpoint_005"). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), "endpoint_005", *result.Data[0].Name) + }) + + It("enabled", func() { + resp, err := adminClient.R(). + SetQueryParam("enabled", "true"). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.True(GinkgoT(), len(result.Data) > 0) + for _, ep := range result.Data { + assert.True(GinkgoT(), ep.Enabled) + } + }) + + It("created_at", func() { + resp, err := adminClient.R(). + SetQueryParam("name", "endpoint_005"). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + ep5 := resp.Result().(*api.Pagination[*entities.Endpoint]).Data[0] + + resp, err = adminClient.R(). + SetQueryParam("created_at[lt]", fmt.Sprintf("%d", ep5.CreatedAt.UnixMilli()+1)). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.True(GinkgoT(), len(result.Data) >= 1) + + resp, err = adminClient.R(). + SetQueryParam("created_at[gte]", fmt.Sprintf("%d", ep5.CreatedAt.UnixMilli())). + SetQueryParam("created_at[lte]", fmt.Sprintf("%d", ep5.CreatedAt.UnixMilli())). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.True(GinkgoT(), len(result.Data) >= 1) }) - It("retrieves first page", func() { - resp, err := adminClient.R().Get("/workspaces/default/endpoints") + + It("metadata", func() { + resp, err := adminClient.R(). + SetQueryParam("metadata[value]", "5"). + SetResult(api.Pagination[*entities.Endpoint]{}). + Get("/workspaces/default/endpoints") assert.Nil(GinkgoT(), err) - assert.Equal(GinkgoT(), `{"total":0,"data":[]}`, string(resp.Body())) + result := resp.Result().(*api.Pagination[*entities.Endpoint]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), "endpoint_005", *result.Data[0].Name) + assert.EqualValues(GinkgoT(), entities.Metadata{ + "foo": "bar", + "value": "5", + }, result.Data[0].Metadata) + }) }) }) diff --git a/test/admin/events_test.go b/test/admin/events_test.go index 36ea4371..6e0dd46f 100644 --- a/test/admin/events_test.go +++ b/test/admin/events_test.go @@ -10,8 +10,8 @@ import ( "github.com/webhookx-io/webhookx/admin/api" "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" @@ -185,11 +185,11 @@ var _ = Describe("/events", Ordered, func() { assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 200, resp.StatusCode()) - q := query.AttemptQuery{ + q := dao.AttemptQuery{ EventId: &eventId, EndpointId: &endpointId, } - attempts, err := db.Attempts.List(context.TODO(), &q) + attempts, err := db.Attempts.List(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 1, len(attempts)) assert.Equal(GinkgoT(), entities.AttemptTriggerModeManual, attempts[0].TriggerMode) diff --git a/test/admin/sources_test.go b/test/admin/sources_test.go index 29bd55c3..e59f678a 100644 --- a/test/admin/sources_test.go +++ b/test/admin/sources_test.go @@ -2,6 +2,8 @@ package admin import ( "context" + "fmt" + "strconv" "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" @@ -75,19 +77,25 @@ var _ = Describe("/sources", Ordered, func() { }) Context("GET", func() { - Context("with data", func() { - BeforeAll(func() { - assert.Nil(GinkgoT(), db.Truncate("sources")) - for i := 1; i <= 21; i++ { - entity := entities.Source{ - ID: utils.KSUID(), - Enabled: true, - } - entity.WorkspaceId = ws.ID - assert.Nil(GinkgoT(), db.Sources.Insert(context.TODO(), &entity)) + BeforeAll(func() { + assert.Nil(GinkgoT(), db.Truncate("sources")) + for i := 1; i <= 21; i++ { + entity := entities.Source{ + ID: fmt.Sprintf("%03d", i), + Name: utils.Pointer(fmt.Sprintf("%03d", i)), + Enabled: i%2 == 0, + Metadata: map[string]string{ + "foo": "bar", + "value": strconv.Itoa(i), + }, } - }) - It("retrieves first page", func() { + entity.WorkspaceId = ws.ID + assert.Nil(GinkgoT(), db.Sources.Insert(context.TODO(), &entity)) + } + }) + + Context("offset pagination", func() { + It("default", func() { resp, err := adminClient.R(). SetResult(api.Pagination[*entities.Source]{}). Get("/workspaces/default/sources") @@ -96,25 +104,189 @@ var _ = Describe("/sources", Ordered, func() { assert.EqualValues(GinkgoT(), 21, result.Total) assert.EqualValues(GinkgoT(), 20, len(result.Data)) }) - It("retrieves second page", func() { + + It("page_no", func() { resp, err := adminClient.R(). SetResult(api.Pagination[*entities.Source]{}). - Get("/workspaces/default/sources?page_no=2") + SetQueryParam("page_no", "2"). + Get("/workspaces/default/sources") assert.Nil(GinkgoT(), err) result := resp.Result().(*api.Pagination[*entities.Source]) assert.EqualValues(GinkgoT(), 21, result.Total) assert.EqualValues(GinkgoT(), 1, len(result.Data)) }) + + It("page_size", func() { + resp, err := adminClient.R(). + SetResult(api.Pagination[*entities.Source]{}). + SetQueryParam("page_size", "21"). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Source]) + assert.EqualValues(GinkgoT(), 21, result.Total) + assert.EqualValues(GinkgoT(), 21, len(result.Data)) + }) }) - Context("with no data", func() { - BeforeAll(func() { - assert.Nil(GinkgoT(), db.Truncate("sources")) + Context("cursor pagination", func() { + It("limit", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "5"). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Source]) + assert.Equal(GinkgoT(), 5, len(result.Data)) + assert.NotNil(GinkgoT(), result.Next) + + resp, err = adminClient.R(). + SetQueryParam("limit", "100"). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.PaginationCursor[*entities.Source]) + assert.Equal(GinkgoT(), 21, len(result.Data)) + assert.Nil(GinkgoT(), result.Next) + }) + + It("after", func() { + // Get first 2 + resp, err := adminClient.R(). + SetQueryParam("limit", "2"). + SetQueryParam("sort", "id.asc"). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Source]) + assert.Equal(GinkgoT(), 2, len(result.Data)) + firstId := result.Data[0].ID + secondId := result.Data[1].ID + + // Get after first + resp, err = adminClient.R(). + SetQueryParam("limit", "1"). + SetQueryParam("sort", "id.asc"). + SetQueryParam("after", firstId). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result = resp.Result().(*api.PaginationCursor[*entities.Source]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), secondId, result.Data[0].ID) }) - It("retrieves first page", func() { - resp, err := adminClient.R().Get("/workspaces/default/sources") + + It("before", func() { + // Get first 2 + resp, err := adminClient.R(). + SetQueryParam("limit", "2"). + SetQueryParam("sort", "id.asc"). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.PaginationCursor[*entities.Source]) + firstId := result.Data[0].ID + secondId := result.Data[1].ID + + // Get before second + resp, err = adminClient.R(). + SetQueryParam("limit", "1"). + SetQueryParam("sort", "id.asc"). + SetQueryParam("before", secondId). + SetResult(api.PaginationCursor[*entities.Source]{}). + Get("/workspaces/default/sources") assert.Nil(GinkgoT(), err) - assert.Equal(GinkgoT(), `{"total":0,"data":[]}`, string(resp.Body())) + result = resp.Result().(*api.PaginationCursor[*entities.Source]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), firstId, result.Data[0].ID) + }) + + Context("errors", func() { + It("limit must be integer", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "test"). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: value test: an invalid integer: invalid syntax"]}}}`, + string(resp.Body())) + + }) + It("limit must in range [1, 1000]", func() { + resp, err := adminClient.R(). + SetQueryParam("limit", "0"). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: number must be at least 1"]}}}`, + string(resp.Body())) + + resp, err = adminClient.R(). + SetQueryParam("limit", "1001"). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"@params":["limit: number must be at most 1000"]}}}`, + string(resp.Body())) + }) + }) + }) + + Context("query parameters", func() { + It("name", func() { + resp, err := adminClient.R(). + SetQueryParam("name", "005"). + SetResult(api.Pagination[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Source]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), "005", *result.Data[0].Name) + }) + + It("enabled", func() { + resp, err := adminClient.R(). + SetQueryParam("enabled", "true"). + SetResult(api.Pagination[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Source]) + assert.True(GinkgoT(), len(result.Data) > 0) + for _, ep := range result.Data { + assert.True(GinkgoT(), ep.Enabled) + } + }) + + It("created_at", func() { + resp, err := adminClient.R(). + SetQueryParam("name", "005"). + SetResult(api.Pagination[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + ep5 := resp.Result().(*api.Pagination[*entities.Source]).Data[0] + + resp, err = adminClient.R(). + SetQueryParam("created_at[lt]", fmt.Sprintf("%d", ep5.CreatedAt.UnixMilli()+1)). + SetResult(api.Pagination[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Source]) + assert.True(GinkgoT(), len(result.Data) >= 1) + }) + + It("metadata", func() { + resp, err := adminClient.R(). + SetQueryParam("metadata[value]", "5"). + SetResult(api.Pagination[*entities.Source]{}). + Get("/workspaces/default/sources") + assert.Nil(GinkgoT(), err) + result := resp.Result().(*api.Pagination[*entities.Source]) + assert.Equal(GinkgoT(), 1, len(result.Data)) + assert.Equal(GinkgoT(), "005", *result.Data[0].Name) + assert.EqualValues(GinkgoT(), entities.Metadata{ + "foo": "bar", + "value": "5", + }, result.Data[0].Metadata) + }) }) }) diff --git a/test/clustering/clustering_test.go b/test/clustering/clustering_test.go index ed9a949a..6b1b8e94 100644 --- a/test/clustering/clustering_test.go +++ b/test/clustering/clustering_test.go @@ -11,8 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" "github.com/webhookx-io/webhookx/utils" @@ -92,11 +92,11 @@ var _ = Describe("clustering", Ordered, func() { } assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{ + q := dao.AttemptQuery{ Status: utils.Pointer(entities.AttemptStatusSuccess), } - n, err := db.Attempts.Count(context.TODO(), q.WhereMap()) + n, err := db.Attempts.Count(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) return n == 100 }, time.Second*3, time.Second) @@ -114,7 +114,7 @@ var _ = Describe("clustering", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) + list, err := db.Attempts.List(context.TODO(), &dao.Query{}) if err != nil || len(list) == 0 { return false } diff --git a/test/cmd/admin_test.go b/test/cmd/admin_test.go index da710d68..0b67e4d4 100644 --- a/test/cmd/admin_test.go +++ b/test/cmd/admin_test.go @@ -14,8 +14,8 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/cmd" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" "github.com/webhookx-io/webhookx/utils" @@ -95,20 +95,20 @@ var _ = Describe("admin", Ordered, func() { assert.Equal(GinkgoT(), "application/json", source.Config.HTTP.Response.ContentType) assert.Equal(GinkgoT(), `{"message": "OK"}`, source.Config.HTTP.Response.Body) - q := query.PluginQuery{} + q := dao.PluginQuery{} q.EndpointId = &endpoint.ID q.Enabled = utils.Pointer(true) - plugins, err := db.Plugins.List(context.TODO(), &q) + plugins, err := db.Plugins.List(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 1, len(plugins)) assert.Equal(GinkgoT(), "webhookx-signature", plugins[0].Name) assert.Equal(GinkgoT(), true, plugins[0].Enabled) assert.EqualValues(GinkgoT(), map[string]interface{}{"signing_secret": "foo"}, plugins[0].Config) - q = query.PluginQuery{} + q = dao.PluginQuery{} q.SourceId = &source.ID q.Enabled = utils.Pointer(true) - plugins, err = db.Plugins.List(context.TODO(), &q) + plugins, err = db.Plugins.List(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 2, len(plugins)) names := make(map[string]*entities.Plugin) diff --git a/test/delivery/acl_test.go b/test/delivery/acl_test.go index bb21c701..338d808a 100644 --- a/test/delivery/acl_test.go +++ b/test/delivery/acl_test.go @@ -11,8 +11,8 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" "github.com/webhookx-io/webhookx/worker/deliverer" @@ -88,9 +88,9 @@ var _ = Describe("network acl", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -122,9 +122,9 @@ var _ = Describe("network acl", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -148,9 +148,9 @@ var _ = Describe("network acl", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -174,9 +174,9 @@ var _ = Describe("network acl", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } diff --git a/test/delivery/delivery_test.go b/test/delivery/delivery_test.go index 448ea2cb..c288b520 100644 --- a/test/delivery/delivery_test.go +++ b/test/delivery/delivery_test.go @@ -8,6 +8,7 @@ import ( "github.com/webhookx-io/webhookx" "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/test/helper/factory" "github.com/go-resty/resty/v2" @@ -17,7 +18,6 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/utils" ) @@ -70,9 +70,9 @@ var _ = Describe("delivery", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -149,9 +149,9 @@ var _ = Describe("delivery", Ordered, func() { time.Sleep(time.Second * 4) - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - attempts, err := db.Attempts.List(context.TODO(), &q) + attempts, err := db.Attempts.List(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 3, len(attempts)) for i, e := range attempts { @@ -220,7 +220,7 @@ var _ = Describe("delivery", Ordered, func() { time.Sleep(time.Second * 10) - attempts, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) + attempts, err := db.Attempts.List(context.TODO(), &dao.Query{}) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 1, len(attempts)) attempt := attempts[0] @@ -272,9 +272,9 @@ var _ = Describe("delivery", Ordered, func() { assert.Equal(GinkgoT(), 200, resp.StatusCode()) eventId := resp.Header().Get(constants.HeaderEventId) - query := query.AttemptQuery{} + query := dao.AttemptQuery{} query.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &query) + list, err := db.Attempts.List(context.TODO(), query.ToQuery()) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 1, len(list)) assert.Equal(GinkgoT(), entities.AttemptStatusInit, list[0].Status) // should not be enqueued @@ -346,10 +346,10 @@ var _ = Describe("delivery", Ordered, func() { // wait for attempt to be retried after rate limiting is reset time.Sleep(time.Second * time.Duration(period) * 2) - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EndpointId = &entitiesConfig.Endpoints[0].ID q.Status = utils.Pointer(entities.AttemptStatusSuccess) - count, err := db.Attempts.Count(context.TODO(), q.WhereMap()) + count, err := db.Attempts.Count(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 4, count) @@ -388,7 +388,7 @@ var _ = Describe("delivery", Ordered, func() { assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 200, resp.StatusCode()) } - n, err := db.Events.Count(context.TODO(), nil) + n, err := db.Events.Count(context.TODO(), &dao.Query{}) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 1, n) }) diff --git a/test/delivery/http_proxy_test.go b/test/delivery/http_proxy_test.go index 615bd02a..baef01ea 100644 --- a/test/delivery/http_proxy_test.go +++ b/test/delivery/http_proxy_test.go @@ -18,8 +18,8 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" @@ -155,9 +155,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -200,9 +200,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -243,9 +243,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -309,9 +309,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -353,9 +353,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -438,9 +438,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } @@ -482,9 +482,9 @@ var _ = Describe("Proxy", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } diff --git a/test/plugins/function_test.go b/test/plugins/function_test.go index 65377bac..0a831b8b 100644 --- a/test/plugins/function_test.go +++ b/test/plugins/function_test.go @@ -5,7 +5,7 @@ import ( "time" "github.com/webhookx-io/webhookx/db" - "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/plugins/function/sdk" "github.com/webhookx-io/webhookx/test/helper/factory" @@ -81,7 +81,7 @@ var _ = Describe("function", Ordered, func() { // payload should be changed var event *entities.Event assert.Eventually(GinkgoT(), func() bool { - list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + list, err := db.Events.List(context.TODO(), &dao.Query{}) if err != nil || len(list) != 1 { return false } diff --git a/test/plugins/wasm_test.go b/test/plugins/wasm_test.go index 69dd1cbc..c13e2578 100644 --- a/test/plugins/wasm_test.go +++ b/test/plugins/wasm_test.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/plugins/wasm" "github.com/webhookx-io/webhookx/test" "github.com/webhookx-io/webhookx/test/helper/factory" @@ -14,7 +15,6 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/utils" ) @@ -63,7 +63,7 @@ var _ = Describe("wasm", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) + list, err := db.Attempts.List(context.TODO(), &dao.Query{}) if err != nil || len(list) == 0 { return false } diff --git a/test/plugins/webhookx_signature_test.go b/test/plugins/webhookx_signature_test.go index ac9f5243..bf225ad7 100644 --- a/test/plugins/webhookx_signature_test.go +++ b/test/plugins/webhookx_signature_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/webhookx-io/webhookx" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/plugins/webhookx_signature" "github.com/webhookx-io/webhookx/test/helper/factory" @@ -15,7 +16,6 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/utils" ) @@ -71,7 +71,7 @@ var _ = Describe("webhookx-signature", Ordered, func() { var event *entities.Event assert.Eventually(GinkgoT(), func() bool { - list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + list, err := db.Events.List(context.TODO(), &dao.Query{}) if err != nil || len(list) != 1 { return false } @@ -82,7 +82,7 @@ var _ = Describe("webhookx-signature", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) + list, err := db.Attempts.List(context.TODO(), &dao.Query{}) if err != nil || len(list) == 0 { return false } diff --git a/test/proxy/ingest_test.go b/test/proxy/ingest_test.go index fe205a90..c0e7f0bb 100644 --- a/test/proxy/ingest_test.go +++ b/test/proxy/ingest_test.go @@ -12,8 +12,8 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" "github.com/webhookx-io/webhookx/utils" @@ -73,9 +73,9 @@ var _ = Describe("ingest", Ordered, func() { var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } diff --git a/test/schema/schema_test.go b/test/schema/schema_test.go index ddcd3160..43c5dc14 100644 --- a/test/schema/schema_test.go +++ b/test/schema/schema_test.go @@ -19,7 +19,7 @@ var _ = Describe("schemas", Ordered, func() { Context("Endpoint", func() { var schema *openapi3.Schema BeforeAll(func() { - entities.LoadOpenAPI(webhookx.OpenAPI) + openapi.LoadOpenAPI(webhookx.OpenAPI) schema = entities.LookupSchema("Endpoint") }) @@ -103,7 +103,7 @@ var _ = Describe("schemas", Ordered, func() { Context("Source", func() { var schema *openapi3.Schema BeforeAll(func() { - entities.LoadOpenAPI(webhookx.OpenAPI) + openapi.LoadOpenAPI(webhookx.OpenAPI) schema = entities.LookupSchema("Source") }) @@ -174,7 +174,7 @@ var _ = Describe("schemas", Ordered, func() { Context("Configuration", func() { var schema *openapi3.Schema BeforeAll(func() { - entities.LoadOpenAPI(webhookx.OpenAPI) + openapi.LoadOpenAPI(webhookx.OpenAPI) schema = entities.LookupSchema("Configuration") }) diff --git a/test/tracing/admin_test.go b/test/tracing/admin_test.go index 904b63de..2fb384d3 100644 --- a/test/tracing/admin_test.go +++ b/test/tracing/admin_test.go @@ -80,7 +80,6 @@ var _ = Describe("tracing admin", Ordered, func() { "admin.endpoints.page": {}, "dao.endpoints.page": {}, "dao.endpoints.count": {}, - "dao.endpoints.list": {}, }) assert.NoError(GinkgoT(), err) diff --git a/test/tracing/worker_test.go b/test/tracing/worker_test.go index e91a59b4..89264955 100644 --- a/test/tracing/worker_test.go +++ b/test/tracing/worker_test.go @@ -10,8 +10,8 @@ import ( "github.com/webhookx-io/webhookx/app" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/test/fixtures/plugins/outbound" @@ -74,9 +74,9 @@ var _ = Describe("tracing worker", Ordered, func() { eventId := resp.Header().Get(constants.HeaderEventId) assert.Eventually(GinkgoT(), func() bool { - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.EventId = &eventId - list, err := db.Attempts.List(context.TODO(), &q) + list, err := db.Attempts.List(context.TODO(), q.ToQuery()) if err != nil || len(list) == 0 { return false } diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go index c5866146..6a7591bd 100644 --- a/test/worker/requeue_test.go +++ b/test/worker/requeue_test.go @@ -10,8 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/webhookx-io/webhookx/config/modules" "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/ratelimiter" "github.com/webhookx-io/webhookx/services" @@ -94,15 +94,15 @@ var _ = Describe("processRequeue", Ordered, func() { It("all attempts should become QUEUED", func() { time.Sleep(time.Second * 1) // wait for timer to be executed - var q query.AttemptQuery + var q dao.AttemptQuery q.EndpointId = utils.Pointer(endpoint.ID) q.Status = utils.Pointer(entities.AttemptStatusInit) - count, err := db.Attempts.Count(context.TODO(), q.WhereMap()) + count, err := db.Attempts.Count(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 0, count) q.Status = utils.Pointer(entities.AttemptStatusQueued) - count, err = db.Attempts.Count(context.TODO(), q.WhereMap()) + count, err = db.Attempts.Count(context.TODO(), q.ToQuery()) assert.NoError(GinkgoT(), err) assert.EqualValues(GinkgoT(), 10, count) }) diff --git a/worker/worker.go b/worker/worker.go index bed6ffeb..cc32f2e3 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -17,7 +17,6 @@ import ( "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/mcache" "github.com/webhookx-io/webhookx/pkg/batchqueue" "github.com/webhookx-io/webhookx/pkg/plugin" @@ -188,10 +187,10 @@ func (w *Worker) registerEventHandler(bus eventbus.EventBus) { } defer func() { _, _ = mux.Unlock() }() - q := query.AttemptQuery{} + q := dao.AttemptQuery{} q.IDs = fanoutData.AttemptIds q.Status = utils.Pointer(entities.AttemptStatusInit) - attempts, err := w.db.Attempts.List(ctx, &q) + attempts, err := w.db.Attempts.List(ctx, q.ToQuery()) if err != nil { w.log.Errorf("failed to list attempts: id=%s err=%s", fanoutData.EventId, err) return