Ensemble Tap is a standalone Go service that ingests SaaS webhook events, normalizes them into CloudEvents, publishes them to NATS JetStream, and optionally persists events into ClickHouse.
- Webhook ingress for Stripe, GitHub, HubSpot, Linear, Shopify, and generic HMAC providers.
- Multi-tenant webhook routing via `POST /webhooks/{provider}` and `POST /webhooks/{provider}/{tenant}`.
- Polling engine with provider pollers for HubSpot, Salesforce, QuickBooks, and Notion.
- Poll-mode supports tenant fan-out from provider tenant overrides with tenant-scoped state tracking.
- Poll config supports per-tenant `poll_interval`, `poll_rate_limit_per_sec`, `poll_burst`, `poll_max_pages`, `poll_max_requests`, `poll_failure_budget`, `poll_circuit_break_duration`, and `poll_jitter_ratio` overrides.
- Poll fetch telemetry includes request/page counts and truncation metrics for bounded fetch budgets.
- Durable poll state backends (`memory` or `sqlite`).
- CloudEvents normalization and schema validation (`tapversion=v1`).
- NATS JetStream publisher with dedup IDs and optional tenant-scoped subjects.
- Ingress request IDs propagate through CloudEvents (`taprequestid`/`request_id`), NATS headers (`X-Request-ID`), and DLQ records.
- Optional ClickHouse sink consuming from NATS with batched inserts.
- Dead-letter queue recording for verification/normalization/publish failures.
- Admin DLQ replay endpoints:
  - `GET /admin/replay-dlq` lists replay jobs with optional `status` and `limit` filters.
  - `POST /admin/replay-dlq?limit=100` creates async replay jobs (with `dry_run` and idempotency support).
  - `GET /admin/replay-dlq/{job_id}` fetches replay job status/results.
  - `DELETE /admin/replay-dlq/{job_id}` cancels queued replay jobs.
- Admin poller runtime status endpoint: `GET /admin/poller-status`, guarded by `X-Admin-Token`, with optional `provider` and `tenant` filters.
- Health and observability endpoints: `GET /livez`, `GET /readyz`, `GET /metrics`.
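As an illustrative sketch of the per-tenant poll overrides, the fragment below uses only the `poll_*` keys documented above; the nesting under `providers.<name>.tenants` and all provider/tenant names and values are assumptions, not the canonical `config.yaml` shape:

```yaml
# Hypothetical config.yaml fragment -- check config.example.yaml for the real layout.
providers:
  hubspot:
    poll_interval: 60s                 # provider-level default
    tenants:
      tenant-a:
        poll_interval: 30s             # poll this tenant more often
        poll_rate_limit_per_sec: 5     # cap upstream API request rate
        poll_burst: 10
        poll_max_pages: 20             # bound each fetch cycle
        poll_max_requests: 50
        poll_failure_budget: 5         # failures before the circuit breaks
        poll_circuit_break_duration: 10m
        poll_jitter_ratio: 0.2         # spread poll start times across tenants
```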
- Create config: `cp config.example.yaml config.yaml`. Configure provider credentials (or remove providers you are not enabling) before startup; the runtime validates webhook/poll provider configs at boot.
- Start dependencies + tap: `docker compose up --build`
- Send webhooks: `POST /webhooks/stripe`, `POST /webhooks/github`, `POST /webhooks/hubspot`, `POST /webhooks/linear`, `POST /webhooks/shopify`
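For a quick local check against the generic HMAC provider, you can sign a payload yourself. This is a sketch: the `X-Signature` header name and the exact scheme (hex HMAC-SHA256 over the raw body) are assumptions for illustration; check your provider config for the real header and encoding.

```shell
#!/bin/sh
# Compute a hex HMAC-SHA256 signature over the raw request body with openssl.
# SECRET and the X-Signature header name are illustrative assumptions.
SECRET='your-webhook-secret'
BODY='{"id":"evt_1","type":"example.event"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $NF}')
echo "signature: $SIG"
# Then attach it to the request, e.g.:
# curl -i -X POST 'http://localhost:8080/webhooks/generic' \
#   -H "X-Signature: $SIG" -H 'Content-Type: application/json' -d "$BODY"
```

Using `printf` (not `echo`) avoids a trailing newline sneaking into the signed bytes, which is the most common cause of signature mismatches.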
Use the guided bootstrap script to get from install to first accepted event in one flow:

```shell
./scripts/bootstrap.sh
```

What it does:

- prompts for provider + webhook secret,
- optionally wires ClickHouse username/password secrets for sink mode,
- generates `values.onboarding.yaml`,
- creates/updates a Kubernetes Secret with webhook credentials,
- installs/upgrades the Helm release with `--wait`,
- runs a signed provider-aware webhook smoke test.
Run the smoke test only (for an existing install):

```shell
./scripts/smoke-onboarding.sh --provider generic --release ensemble-tap --namespace ensemble --secret '<your-secret>'
```

Test, run, and validate locally:

```shell
go test ./...
go run ./cmd/tap -config ./config.yaml
go run ./cmd/tap -config ./config.yaml -check-config
./scripts/lint-config.sh
./scripts/assert-chart-render.sh
make ci-local
```

State backends:

- `state.backend=memory` keeps checkpoints/snapshots in memory.
- `state.backend=sqlite` persists poll state to `state.sqlite_path`.
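A minimal state section might look like the sketch below; only `state.backend` and `state.sqlite_path` are documented above, and the values are illustrative:

```yaml
# Hypothetical config.yaml fragment.
state:
  backend: sqlite               # or: memory (checkpoints lost on restart)
  sqlite_path: ./state.sqlite   # only consulted when backend=sqlite
```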
- `nats.url` accepts a comma-separated endpoint list; each endpoint must use `nats|tls|ws|wss` and a valid host (and optional valid port).
- `nats.connect_timeout`, `nats.reconnect_wait`, `nats.max_reconnects`, and `nats.publish_timeout` tune connection and publish behavior.
- `nats.publish_max_retries` and `nats.publish_retry_backoff` tune publish retry resilience for transient JetStream errors.
- `nats.username`/`nats.password`, `nats.token`, and `nats.creds_file` are mutually exclusive auth modes.
- `nats.secure`, `nats.insecure_skip_verify`, `nats.ca_file`, `nats.cert_file`, and `nats.key_file` tune NATS TLS and optional mTLS.
- `nats.stream_replicas`, `nats.stream_storage` (`file|memory`), and `nats.stream_discard` (`old|new`) tune JetStream durability and pressure behavior.
- `nats.stream_max_consumers` and `nats.stream_max_msgs_per_subject` cap consumer and per-subject cardinality at stream level (`0` keeps JetStream defaults/unlimited behavior).
- `nats.stream_compression` (`none|s2`) and `nats.stream_allow_msg_ttl` control JetStream storage compression and message TTL support.
- `nats.stream_max_msgs`, `nats.stream_max_bytes`, and `nats.stream_max_msg_size` apply stream-level retention and message-size limits.
- `clickhouse.username`/`clickhouse.password`, `clickhouse.secure`, and `clickhouse.insecure_skip_verify` tune ClickHouse auth/TLS.
- `clickhouse.tls_server_name`, `clickhouse.ca_file`, `clickhouse.cert_file`, and `clickhouse.key_file` support ClickHouse TLS verification and optional mTLS.
- `clickhouse.max_open_conns`, `clickhouse.max_idle_conns`, and `clickhouse.conn_max_lifetime` tune connection pool behavior.
- `clickhouse.consumer_name`, `clickhouse.consumer_fetch_batch_size`, `clickhouse.consumer_fetch_max_wait`, `clickhouse.consumer_ack_wait`, `clickhouse.consumer_max_ack_pending`, and `clickhouse.insert_timeout` tune sink throughput and ack latency.
- `clickhouse.consumer_max_deliver`, `clickhouse.consumer_backoff`, `clickhouse.consumer_max_waiting`, and `clickhouse.consumer_max_request_max_bytes` tune pull-consumer redelivery, retry timing, and pull pressure limits.
- `clickhouse.addr` entries must be valid `host:port` values with ports in `1..65535`.
- To reduce redelivery churn, keep `clickhouse.consumer_fetch_max_wait < clickhouse.consumer_ack_wait` and `clickhouse.insert_timeout + clickhouse.flush_interval < clickhouse.consumer_ack_wait`.
- `clickhouse.retention_ttl` controls MergeTree TTL for event-time retention at table level.
- The ClickHouse sink de-duplicates within each batch and skips IDs already present in ClickHouse before insert; skipped rows are exposed via `tap_clickhouse_dedup_skipped_total`.
- NATS publish retries and JetStream advisories are exposed via `tap_nats_publish_retries_total{reason}`, `tap_nats_publish_retry_delay_seconds`, and `tap_jetstream_advisories_total{kind}`.
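A sketch of consumer timing values that satisfy both redelivery inequalities above; the key names come from this section, but the durations are illustrative, not recommended defaults:

```yaml
# Hypothetical config.yaml fragment.
clickhouse:
  consumer_fetch_max_wait: 5s   # 5s  < consumer_ack_wait (30s)
  insert_timeout: 10s
  flush_interval: 5s            # insert_timeout + flush_interval = 15s
  consumer_ack_wait: 30s        # 30s > 5s and > 15s, so acks beat redelivery
```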
Any string config value can be sourced from Vault using a `vault://` reference:

- Format: `vault://<path>#<key>` (defaults to key `value` when `#<key>` is omitted).
- Example: `providers.generic.secret: vault://secret/data/homelab/ensemble-tap/runtime#generic-webhook-secret`
- Example: `server.admin_token: vault://secret/data/homelab/ensemble-tap/runtime#admin-token`
Vault auth config lives under `vault.*`:

- `vault.address` (or `VAULT_ADDR`) and optional `vault.namespace`
- `vault.auth_method`: `kubernetes` (default) or `token`
- `vault.auth_method=kubernetes`: set `vault.kubernetes_role`, optional `vault.kubernetes_mount_path` (default `kubernetes`), and optional `vault.kubernetes_jwt_file` (default `/var/run/secrets/kubernetes.io/serviceaccount/token`)
- `vault.auth_method=token`: set `vault.token`, `vault.token_file`, or `VAULT_TOKEN`

In Kubernetes, if you use `vault.auth_method=kubernetes`, ensure the Pod has a service-account JWT available (for the Helm chart: set `serviceAccount.automount=true` or mount a projected token and set `vault.kubernetes_jwt_file` accordingly).
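Putting the pieces together, a Kubernetes-auth setup might look like the sketch below; the address and role name are placeholders, and the `vault://` reference reuses the example path from above:

```yaml
# Hypothetical config.yaml fragment.
vault:
  address: https://vault.example.internal:8200   # or set VAULT_ADDR
  auth_method: kubernetes
  kubernetes_role: ensemble-tap                  # placeholder role name
  # kubernetes_mount_path: kubernetes            # default
  # kubernetes_jwt_file: /var/run/secrets/kubernetes.io/serviceaccount/token

providers:
  generic:
    secret: vault://secret/data/homelab/ensemble-tap/runtime#generic-webhook-secret
```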
When any admin token is configured (`server.admin_token`, `server.admin_token_secondary`, `server.admin_token_read`, `server.admin_token_replay`, `server.admin_token_cancel`), these endpoints are available:
`POST /admin/replay-dlq?limit=100`

- Requires header `X-Admin-Token` with replay permission (`server.admin_token`/`server.admin_token_secondary` or `server.admin_token_replay`).
- Supports token rotation with `server.admin_token_secondary` (`admin_token_secondary` requires `admin_token`, and both values must differ).
- Optional least-privilege role tokens:
  - `server.admin_token_read`: read/list/status access.
  - `server.admin_token_replay`: replay submit access.
  - `server.admin_token_cancel`: cancel access.
- Optional header `Idempotency-Key` to reuse an existing equivalent replay job instead of creating duplicates (`409` if reused with different `limit`/`dry_run` parameters).
- Optional/required (configurable) header `X-Admin-Reason`:
  - Required when `server.admin_replay_require_reason=true`.
  - Minimum length enforced by `server.admin_replay_reason_min_length` (default `12`).
- Optional header `X-Request-ID` (echoed back in the `X-Request-ID` response header and the `request_id` body field).
- Error responses are JSON (`{"request_id":"...","error":"..."}`) for consistent automation and audit correlation.
- Admin endpoints are token-bucket rate-limited (`server.admin_rate_limit_per_sec`, `server.admin_rate_limit_burst`) and return `429` with `Retry-After`.
- Optional guardrails: CIDR allowlist (`server.admin_allowed_cidrs`) and client-cert requirement (`server.admin_mtls_required`, `server.admin_mtls_client_cert_header`).
- `limit` must be a positive integer.
- `dry_run=true` computes the replayable count without consuming DLQ entries.
- Replay is capped by `server.admin_replay_max_limit` (default `2000`, valid range `1..100000`); the accepted response includes replay job metadata (`job_id`, `status`, `effective_limit`, `max_limit`, `capped`, `dry_run`).
- Replay job metadata retention/capacity is configurable (`server.admin_replay_job_ttl`, `server.admin_replay_job_max_jobs`), and the backend is configurable (`server.admin_replay_store_backend=memory|sqlite`, `server.admin_replay_sqlite_path`).
- Replay execution is configurable (`server.admin_replay_job_timeout`, `server.admin_replay_max_concurrent_jobs`) for bounded runtime and concurrency.
- Queue fan-out safety rails are configurable (`server.admin_replay_max_queued_per_ip`, `server.admin_replay_max_queued_per_token`) and return `409` when exceeded.
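The guardrails above map onto `server.*` config; the sketch below uses only keys named in this section, but the token strings, rate limits, CIDR list shape, and paths are illustrative assumptions:

```yaml
# Hypothetical config.yaml fragment.
server:
  admin_token: primary-admin-token             # placeholder value
  admin_token_secondary: rotated-admin-token   # must differ from admin_token
  admin_token_read: read-only-token            # least-privilege role token
  admin_replay_require_reason: true            # force X-Admin-Reason on replays
  admin_replay_reason_min_length: 12           # default
  admin_replay_max_limit: 2000                 # default cap on replay limit
  admin_rate_limit_per_sec: 5
  admin_rate_limit_burst: 10
  admin_allowed_cidrs: ["10.0.0.0/8"]          # list shape is an assumption
  admin_replay_store_backend: sqlite
  admin_replay_sqlite_path: ./replay-jobs.sqlite
```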
`GET /admin/replay-dlq`

- Requires header `X-Admin-Token` with read permission (`admin_token`/`admin_token_secondary`/`admin_token_read`/`admin_token_replay`/`admin_token_cancel`).
- Optional query params: `status` (`queued|running|succeeded|failed|cancelled`), `limit` (max `500`, default `50`), and `cursor` (from a prior `next_cursor`).
- Returns the replay job list plus per-status summary counts and pagination cursors for queue introspection.
`GET /admin/replay-dlq/{job_id}`

- Requires header `X-Admin-Token` with read permission.
- Returns the current replay job state (`queued`, `running`, `succeeded`, `failed`, `cancelled`) and result fields (`replayed`, `error`, `operator_reason`, `cancel_reason`).
`DELETE /admin/replay-dlq/{job_id}`

- Requires header `X-Admin-Token` with cancel permission (`server.admin_token`/`server.admin_token_secondary` or `server.admin_token_cancel`).
- Optional/required (configurable) header `X-Admin-Reason` (same requirement rules as the replay endpoint).
- Cancels replay jobs that are still `queued`; returns `409` if the job is already `running` or completed.
`GET /admin/poller-status`

- Requires header `X-Admin-Token` with read permission.
- Supports token rotation with `server.admin_token_secondary`.
- Optional header `X-Request-ID` (echoed back in the `X-Request-ID` response header and the `request_id` body field).
- Error responses are JSON (`{"request_id":"...","error":"..."}`).
- Optional filters: `provider` (case-insensitive), `tenant`.
- Response includes `count` and per-poller runtime fields (interval, rate limiter values, failure budget, circuit-break duration, jitter ratio, last run/success/error details).
- Structured audit logs are emitted for authorized and unauthorized admin calls (`request_id`, requester IP, user-agent, path/method, and duration).
- Prometheus metrics include `tap_admin_requests_total{endpoint,outcome}` and `tap_admin_request_duration_seconds{endpoint,outcome}`.
- Replay lifecycle metrics include `tap_admin_replay_jobs_total{stage}` and `tap_admin_replay_jobs_in_flight`.
- Poller health metrics include `tap_poller_stuck{provider,tenant}` and `tap_poller_consecutive_failures{provider,tenant}`.
- OpenAPI contract: `docs/admin-openapi.yaml`
```shell
# Replay DLQ with explicit request id
curl -i -X POST 'http://localhost:8080/admin/replay-dlq?limit=50' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Admin-Reason: replay after incident #1234' \
  -H 'Idempotency-Key: replay-tenant-a-20260304' \
  -H 'X-Request-ID: replay-manual-001'

# Replay dry-run
curl -i -X POST 'http://localhost:8080/admin/replay-dlq?limit=200&dry_run=true' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Admin-Reason: estimate replay volume before run' \
  -H 'X-Request-ID: replay-dry-run-001'

# Replay job list (latest succeeded jobs)
curl -i 'http://localhost:8080/admin/replay-dlq?status=succeeded&limit=20' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Request-ID: replay-list-001'

# Replay job list next page
curl -i 'http://localhost:8080/admin/replay-dlq?status=succeeded&limit=20&cursor=<next_cursor_from_previous_response>' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Request-ID: replay-list-002'

# Replay job status
curl -i 'http://localhost:8080/admin/replay-dlq/replay_1234567890_1' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Request-ID: replay-status-001'

# Cancel queued replay job
curl -i -X DELETE 'http://localhost:8080/admin/replay-dlq/replay_1234567890_1' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Admin-Reason: duplicate replay request queued' \
  -H 'X-Request-ID: replay-cancel-001'

# Poller status (filtered)
curl -i 'http://localhost:8080/admin/poller-status?provider=hubspot&tenant=tenant-a' \
  -H 'X-Admin-Token: your-admin-token' \
  -H 'X-Request-ID: status-manual-001'

# Example error payload (401/429/400/500):
# {"request_id":"status-manual-001","error":"rate limit exceeded"}
```

Install with Helm:
```shell
helm upgrade --install ensemble-tap ./charts/ensemble-tap \
  --namespace ensemble \
  --create-namespace
```

Notes:

- The chart default keeps the ClickHouse sink disabled (`config.clickhouse.addr: ""`) for simpler initial onboarding.
- NetworkPolicy automatically allows NATS/ClickHouse TCP egress ports derived from chart config (`networkPolicy.allowConfigPorts=true`).
- Image pinning by digest is supported via `image.digest` (uses `repository@sha256:...`).

See chart-specific usage in `charts/ensemble-tap/README.md`.
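To turn the ClickHouse sink on at install time, a values override might look like the sketch below. Only `config.clickhouse.addr`, `networkPolicy.allowConfigPorts`, and `image.digest` are documented above; the list shape of `addr`, the host/port, and the filename are assumptions, so check the chart README for the real values layout:

```yaml
# values.override.yaml (hypothetical filename)
config:
  clickhouse:
    addr: ["clickhouse.ensemble.svc:9000"]   # non-empty addr enables the sink
networkPolicy:
  allowConfigPorts: true                     # open egress for the port above
image:
  digest: ""                                 # optionally pin: sha256:<digest>
```

Apply it with `helm upgrade --install ensemble-tap ./charts/ensemble-tap -f values.override.yaml`.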
- The CI workflow validates unit tests, static analysis (`staticcheck`), OpenAPI contract/runtime parity, Docker build, Helm chart render/lint, and real NATS+ClickHouse integration.
- CI security gates run `gosec`, `govulncheck`, a Trivy CRITICAL scan, and source SBOM generation.
- A CI performance smoke gate runs a lightweight `k6` probe against `/livez` and `/readyz`.
- CI runs nightly (09:17 UTC) with tool caches disabled to catch toolchain/cache drift early.
- CI persists flaky-job trend artifacts (`ci-flake-metrics-*`) with `integration` + `perf-smoke` status and duration history.
- Failed `main` CI runs automatically open an issue with failed job links and log snippets for triage.
- Run `Release Candidate Smoke` (`.github/workflows/release-candidate-smoke.yml`) before creating a `v*` tag; it deploys via Helm on `kind`, verifies `/readyz`, and validates signed webhook publish end-to-end.
- Tag pushes matching `v*` trigger the release workflow to publish multi-arch images to GHCR, generate source/image SBOMs, sign image digests with cosign keyless OIDC, and package the Helm chart.
- Branch protection can be applied with `.github/scripts/apply_branch_protection.sh` or via the manual `Branch Protection` workflow.