From 81031dad8a83ffb6ac70dbd88e959f0bab346ede Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 19 Apr 2026 23:35:17 -0400 Subject: [PATCH 1/2] test(a2a): pin comply_test_controller result.artifacts wire contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #211. The SDK's A2A server already emits result.artifacts correctly across the success, list_scenarios, ControllerError, and unknown-scenario paths — this test drives the full A2A Starlette app through an ASGI transport and asserts the JSON-RPC wire shape so any future regression fires here before it surfaces in external storyboard validators. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/conformance/a2a/__init__.py | 0 .../test_comply_test_controller_artifacts.py | 163 ++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 tests/conformance/a2a/__init__.py create mode 100644 tests/conformance/a2a/test_comply_test_controller_artifacts.py diff --git a/tests/conformance/a2a/__init__.py b/tests/conformance/a2a/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conformance/a2a/test_comply_test_controller_artifacts.py b/tests/conformance/a2a/test_comply_test_controller_artifacts.py new file mode 100644 index 00000000..ff90650e --- /dev/null +++ b/tests/conformance/a2a/test_comply_test_controller_artifacts.py @@ -0,0 +1,163 @@ +"""Wire-level conformance test for A2A Task.artifacts population. + +Drives the full A2A Starlette app through an ASGI transport, sends a raw +JSON-RPC `message/send` for each `comply_test_controller` scenario shape, and +asserts the returned JSON has a populated ``result.artifacts`` list with the +tool payload in a ``DataPart``. + +Guards issue #211: an external storyboard validator reported "A2A response +missing result.artifacts field" on seller runs. The SDK path verified here is +the one that backs ``scripts/skill-run.sh seller ... media_buy_seller`` — if +this test ever goes red, the same warning will fire on those runs. +""" + +from __future__ import annotations + +import sys +from typing import Any + +import httpx +import pytest + +from adcp.server import ADCPHandler +from adcp.server.a2a_server import create_a2a_server +from adcp.server.test_controller import TestControllerError, TestControllerStore + +pytestmark = pytest.mark.skipif( + sys.version_info < (3, 11), + reason="a2a-sdk starlette integration requires Python 3.11+", +) + + +class _MinimalSeller(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> dict[str, Any]: + return {"adcp": {"major_versions": [3]}} + + +class _Store(TestControllerStore): + async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]: + if account_id == "missing": + raise TestControllerError("NOT_FOUND", f"Account {account_id} not found") + return {"previous_state": "active", "current_state": status} + + +async def _send(client: httpx.AsyncClient, scenario_payload: dict[str, Any]) -> dict[str, Any]: + """POST a JSON-RPC ``message/send`` for comply_test_controller.""" + body = { + "jsonrpc": "2.0", + "id": "1", + "method": "message/send", + "params": { + "message": { + "messageId": "m1", + "role": "user", + "parts": [ + { + "kind": "data", + "data": { + "skill": "comply_test_controller", + "parameters": scenario_payload, + }, + } + ], + } + }, + } + resp = await client.post("/", json=body) + assert resp.status_code == 200, resp.text + return resp.json() + + +def _assert_artifact_carries(body: dict[str, Any], expected_keys: set[str]) -> dict[str, Any]: + """Verify ``result.artifacts[].parts[].data`` carries the tool payload.""" + assert "result" in body, f"JSON-RPC response missing result: {body}" + result = body["result"] + + artifacts = result.get("artifacts") + assert artifacts, ( + "A2A Task missing result.artifacts — storyboard validators that " + "strictly require Task.artifacts will reject this response. " + f"Full result: {result}" + ) + assert isinstance(artifacts, list) and len(artifacts) >= 1 + + parts = artifacts[-1].get("parts") or [] + data_parts = [p for p in parts if p.get("kind") == "data"] + assert data_parts, f"Artifact missing DataPart (kind='data'): {parts}" + + data = data_parts[-1].get("data") + assert isinstance(data, dict), f"DataPart.data must be an object: {data!r}" + assert expected_keys.issubset( + data.keys() + ), f"DataPart.data missing expected keys {expected_keys - data.keys()}: {data}" + return data + + +async def _bootstrap_client() -> tuple[httpx.AsyncClient, Any]: + app = create_a2a_server(_MinimalSeller(), name="conformance-seller", test_controller=_Store()) + transport = httpx.ASGITransport(app=app) + client = httpx.AsyncClient(transport=transport, base_url="http://test") + return client, app + + +async def test_state_transition_populates_artifacts() -> None: + """force_* scenario → StateTransitionSuccess shape under result.artifacts.""" + client, _ = await _bootstrap_client() + async with client: + body = await _send( + client, + { + "scenario": "force_account_status", + "params": {"account_id": "a1", "status": "suspended"}, + }, + ) + data = _assert_artifact_carries(body, {"success", "previous_state", "current_state"}) + assert data["success"] is True + assert data["current_state"] == "suspended" + assert body["result"]["status"]["state"] == "completed" + + +async def test_list_scenarios_populates_artifacts() -> None: + """list_scenarios → ListScenariosSuccess shape under result.artifacts.""" + client, _ = await _bootstrap_client() + async with client: + body = await _send(client, {"scenario": "list_scenarios"}) + data = _assert_artifact_carries(body, {"success", "scenarios"}) + assert data["success"] is True + assert "force_account_status" in data["scenarios"] + + +async def test_controller_error_populates_artifacts() -> None: + """TestControllerError → ControllerError shape still lands in artifacts. + + The AdCP comply_test_controller contract treats application errors + (NOT_FOUND, INVALID_TRANSITION, UNKNOWN_SCENARIO, ...) as successful + Tasks whose DataPart carries ``success: false``. The Task state stays + ``completed`` — the error lives in the payload, not the transport. + """ + client, _ = await _bootstrap_client() + async with client: + body = await _send( + client, + { + "scenario": "force_account_status", + "params": {"account_id": "missing", "status": "suspended"}, + }, + ) + data = _assert_artifact_carries(body, {"success", "error"}) + assert data["success"] is False + assert data["error"] == "NOT_FOUND" + assert body["result"]["status"]["state"] == "completed" + + +async def test_unknown_scenario_populates_artifacts() -> None: + """Unsupported scenario → ControllerError(UNKNOWN_SCENARIO) in artifacts.""" + client, _ = await _bootstrap_client() + async with client: + body = await _send( + client, + {"scenario": "simulate_delivery", "params": {"media_buy_id": "x"}}, + ) + data = _assert_artifact_carries(body, {"success", "error"}) + assert data["success"] is False + assert data["error"] == "UNKNOWN_SCENARIO" From 3bf663da780ee4db3d08eaf04ca68ee37dccd281 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 19 Apr 2026 23:35:53 -0400 Subject: [PATCH 2/2] feat(webhooks): adcp.webhooks.deliver() one-shot legacy-auth dispatcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #212. Collapses the seller's six-step boilerplate (build envelope, serialize, sign, merge headers, POST, echo token) into one call so the signer and the wire see the *same bytes* — the serialization-format drift PR #205 fixed in the hand-rolled path is structurally impossible here. Covers the legacy AdCP 3.x authentication schemes (Bearer, HMAC-SHA256) and emits a one-shot DeprecationWarning pointing migrators at WebhookSender for RFC 9421. Missing or unknown authentication raises with a message that names the fix (use WebhookSender), not a silent unsigned POST. Token-echo is opt-in via ``token_field=`` — the AdCP spec says the token is "echoed back in the payload" but doesn't name the field, so the caller picks one the receiver agrees to read. Defense-in-depth at the helper boundary: * HTTPS-only URL; rejects embedded userinfo (getting logged by every HTTP intermediary is a footgun). * CRLF / NUL rejection on credentials + extra_headers (belt-and-braces over httpx's own header validation). * Reserved-header blocklist covers Authorization, Content-*, Host, Signature, Signature-Input, X-AdCP-*; each class gets a fix-hint tailored to the likely mistake. * 10MB body-size cap (shared with WebhookSender.send_raw for parity). * 64-entry extra_headers cap. * authentication must be a Mapping; schemes must be a list. Tests (22 for deliver + 1 for WebhookSender parity) cover: Bearer/HMAC happy paths, byte-identical signing-vs-wire invariant, retry byte-identity, token-echo opt-in shape (MCP top-level vs Task metadata), default-off echo, deprecation warning, and every boundary-validation failure mode. SKILL.md "Emitting Webhooks" section shows both the 4.0 default (WebhookSender) and the 3.x legacy (deliver) paths side-by-side with production notes (shared httpx.AsyncClient, egress transport, token_field coordination). Four expert reviews (code, protocol, DX, security) across three rounds. Deferred as follow-up: IP-pinned egress transport factory; upstream AdCP issue for 9421-vs-legacy precedence when both are on one config. Co-Authored-By: Claude Opus 4.7 (1M context) --- skills/build-seller-agent/SKILL.md | 42 ++ src/adcp/webhook_sender.py | 11 + src/adcp/webhooks.py | 406 ++++++++++++++- .../signing/test_webhook_sender_e2e.py | 13 + tests/test_webhooks_deliver.py | 479 ++++++++++++++++++ 5 files changed, 948 insertions(+), 3 deletions(-) create mode 100644 tests/test_webhooks_deliver.py diff --git a/skills/build-seller-agent/SKILL.md b/skills/build-seller-agent/SKILL.md index ab0c63a0..23376069 100644 --- a/skills/build-seller-agent/SKILL.md +++ b/skills/build-seller-agent/SKILL.md @@ -481,6 +481,48 @@ Declare `compliance_testing` in supported_protocols: return capabilities_response(["media_buy", "compliance_testing"]) ``` +## Emitting Webhooks + +When a long-running operation finishes (or progresses), POST a webhook to the buyer's `push_notification_config` / `reporting_webhook`. The SDK gives you two helpers; pick based on the buyer's authentication profile. + +**AdCP 4.0 default — RFC 9421 signing** (use `WebhookSender`): +```python +from adcp.webhooks import WebhookSender, create_mcp_webhook_payload + +sender = WebhookSender.from_jwk(webhook_signing_jwk_with_private_d) +async with sender: + result = await sender.send_mcp( + url=str(config.url), + task_id=task_id, + task_type="create_media_buy", + status="completed", + result=response_dict, + ) + if not result.ok: + retry = await sender.resend(result) # byte-identical replay +``` + +**AdCP 3.x legacy — Bearer or HMAC-SHA256** (use `deliver`): +```python +from adcp.webhooks import deliver, create_mcp_webhook_payload + +response = await deliver( + config, # PushNotificationConfig or ReportingWebhook from the request + create_mcp_webhook_payload( + task_id=task_id, task_type="create_media_buy", + status="completed", result=response_dict, + ), +) +response.raise_for_status() +``` + +Notes: +- `deliver` hashes/signs the exact bytes it POSTs for HMAC-SHA256; for Bearer it attaches the credential as `Authorization`. Either way, the signer and the wire cannot disagree. +- `deliver` emits a `DeprecationWarning` on first use; migrate to `WebhookSender` for 4.0. +- If your buyer relies on `config.token` echo, pass `token_field="push_token"` (pick a name you and the receiver agree on — there is no spec-defined field name). +- In production pass a shared `httpx.AsyncClient` to `deliver` (or `client=` to `WebhookSender`) with a transport that blocks private/link-local IPs — the helper validates URL scheme but not egress destination. +- Retries: call `deliver` again with the same payload (deterministic serialization). For byte-identical HTTP envelopes including headers, use `WebhookSender.resend()`. + ## SDK Quick Reference **Response builders** (from `adcp.server.responses`): diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index 1c46d9dd..725f4b62 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -49,6 +49,11 @@ # re-enter send_*() with the same idempotency_key — the body is re-signed # but dedup still fires at the receiver. _DEFAULT_TIMEOUT_SECONDS = 10.0 +# 10MB serialized-body cap — matches adcp.webhooks.deliver and typical +# buyer-side reverse-proxy limits. Guards against OOM when a caller passes +# an adversarial payload: json.dumps holds dict + str concurrently, and +# .encode() transiently triples memory, so a 1GB body is multiple GB RSS. +_MAX_BODY_BYTES = 10 * 1024 * 1024 @dataclass(frozen=True) @@ -340,6 +345,12 @@ async def send_raw( # gets signed AND posted. Do not allow an httpx `json=` path anywhere # in the stack because it would reserialize and break the digest. body = json.dumps(body_dict).encode("utf-8") + if len(body) > _MAX_BODY_BYTES: + raise ValueError( + f"serialized webhook body is {len(body):,} bytes, over the " + f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks " + "or use batch-reporting endpoints." + ) return await self._send_bytes( url=url, body=body, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index 0ba8c061..9a7042d2 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -1,10 +1,22 @@ """Webhook creation, signing, and reception for AdCP agents. -Single front door for both senders (``create_mcp_webhook_payload``, -``sign_webhook``) and receivers (``WebhookReceiver``). Underlying modules in +Single front door for both senders and receivers. Underlying modules in ``adcp.signing.webhook_*`` and ``adcp.webhook_receiver`` are implementation details kept for internal organization — prefer the re-exports here for stability. + +**Which sender helper to use** + +* :func:`deliver` — one-shot dispatch for legacy ``authentication`` (Bearer + or HMAC-SHA256). Collapses the sender's 6-step boilerplate into one call + and signs the exact bytes it POSTs. Deprecated with AdCP 4.0; emits a + :class:`DeprecationWarning`. +* :class:`WebhookSender` — the AdCP 4.0 default. RFC 9421 signing, shared + connection pool, byte-identical replay via :meth:`WebhookSender.resend`. + Use this for any new integration. +* :func:`create_mcp_webhook_payload` / :func:`create_a2a_webhook_payload` + plus :func:`get_adcp_signed_headers_for_webhook` — low-level path for + callers who need full control over serialization, headers, or retry logic. """ from __future__ import annotations @@ -12,10 +24,15 @@ import hashlib import hmac import json +import time import uuid +import warnings +from collections.abc import Mapping from datetime import datetime, timezone from typing import Any, cast +from urllib.parse import urlsplit +import httpx from a2a.types import ( Artifact, DataPart, @@ -559,6 +576,388 @@ def create_a2a_webhook_payload( ) +_AUTH_DEPRECATION_WARNED = False +_RESERVED_HEADERS = frozenset( + { + "authorization", + "content-digest", + "content-length", + "content-type", + "host", + "signature", + "signature-input", + "x-adcp-signature", + "x-adcp-timestamp", + } +) +_HEADER_FORBIDDEN_CHARS = ("\r", "\n", "\x00") +_MAX_HEADER_VALUE_BYTES = 8192 +_DEFAULT_TIMEOUT_SECONDS = 10.0 +# 10MB cap matches typical buyer-side reverse-proxy limits and is ~100× +# the realistic AdCP payload (biggest seen: get_products with long product +# lists, rarely over 100KB). Serialized bytes, not dict size — post- +# serialization check avoids a pre-cap on dict size being meaningless. +_MAX_BODY_BYTES = 10 * 1024 * 1024 +# Cap extra_headers count so a caller that iterates a large container +# into the kwarg can't produce an unbounded header block. +_MAX_EXTRA_HEADERS = 64 + + +def _warn_auth_deprecation_once() -> None: + global _AUTH_DEPRECATION_WARNED + if _AUTH_DEPRECATION_WARNED: + return + _AUTH_DEPRECATION_WARNED = True + warnings.warn( + "PushNotificationConfig.authentication (Bearer, HMAC-SHA256) is " + "deprecated in AdCP 4.0. Migrate senders to adcp.webhooks.WebhookSender " + "(RFC 9421 signing) and receivers to the 9421 webhook profile. This " + "warning fires once per process.", + DeprecationWarning, + stacklevel=3, + ) + + +async def deliver( + config: AdCPBaseModel | Mapping[str, Any], + payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any], + *, + client: httpx.AsyncClient | None = None, + extra_headers: Mapping[str, str] | None = None, + timeout_seconds: float | None = None, + token_field: str | None = None, +) -> httpx.Response: + """Dispatch one legacy-auth webhook in a single call. + + Collapses the sender's six-step boilerplate (build envelope, serialize, + sign, merge headers, POST, echo token) into one call so the signer and + the wire see the *same bytes*. The serialization-format drift that + plagued the hand-rolled path — ``json=`` in httpx re-serializes the dict + and breaks ``Content-Digest`` — is structurally impossible here: the + helper JSON-serializes once, signs those bytes, and POSTs those bytes + via ``content=``. + + This helper is for the **legacy** AdCP 3.x authentication schemes + (``Bearer`` / ``HMAC-SHA256``) and emits a :class:`DeprecationWarning` + on first use. For 4.0+ integrations use :class:`WebhookSender` (RFC 9421). + + Args: + config: A :class:`PushNotificationConfig`, :class:`ReportingWebhook`, + or equivalent dict. Must carry ``url`` (``https://`` only) and + ``authentication.{schemes, credentials}``. + payload: The webhook body. Accepts a Pydantic model (e.g. built via + :func:`create_mcp_webhook_payload` / :func:`create_a2a_webhook_payload`), + an a2a ``Task`` / ``TaskStatusUpdateEvent``, or a plain dict. + Models are dumped with ``mode="json", exclude_none=True``. + client: Optional shared ``httpx.AsyncClient``. Recommended in + production for connection pooling and egress-policy enforcement + (a custom ``httpx.BaseTransport`` is the right place to block + SSRF to private IPs — the helper validates scheme but cannot + see post-DNS resolution without racing TOCTOU). + extra_headers: Merged last. May not override any of + ``Content-Type``, ``Content-Digest``, ``Content-Length``, + ``Host``, ``Authorization``, ``Signature``, ``Signature-Input``, + ``X-AdCP-Signature``, or ``X-AdCP-Timestamp``. Auth and + signature-binding headers are sender-owned so the signer and + the wire cannot disagree. + timeout_seconds: Per-request timeout applied only when the helper + creates its own client. Raises ``ValueError`` if set alongside + ``client=`` — configure the timeout on the shared client instead. + token_field: Opt-in field name for echoing ``config.token`` into + the payload body (top-level for MCP dicts, under ``metadata`` + for ``Task`` / ``TaskStatusUpdateEvent``). Default ``None`` + disables echo; there is no spec-defined field name, so the + caller must pick one the receiver agrees to read. + + Returns: + The raw ``httpx.Response``. Caller is responsible for + ``response.status_code`` inspection and retry scheduling. For retry, + pass the *same, unmutated* payload again — serialization is + deterministic so retries produce byte-identical bodies (spec-correct + receiver dedup via ``idempotency_key``). Mutating the payload dict + between attempts breaks byte-identity; callers who need byte-identical + HTTP envelopes across retries (including headers) should use + :class:`WebhookSender` and :meth:`WebhookSender.resend`. There is + intentionally no ``resend()`` here — the retry contract is "call + ``deliver`` again with the same inputs". + + Raises: + ValueError: missing ``url``, non-HTTPS URL, control characters in + header values, missing / unknown ``authentication`` (use + :class:`WebhookSender` for RFC 9421), overriding a reserved + header, or setting ``timeout_seconds`` alongside ``client``. + DeprecationWarning (fires once): ``authentication`` is a 3.x fallback. + + Security notes: + * ``config.url`` is buyer-controlled. The helper enforces HTTPS and + rejects control characters but does NOT block private / link-local + destinations — wire an egress policy via ``client.transport`` to + stop SSRF into your VPC or cloud metadata service. + * ``config.token`` sits in the request body, so any receiver that + logs bodies retains it indefinitely. Treat the token as a + medium-sensitivity correlator, not a long-lived secret. + * At ``httpx`` DEBUG log level, ``Authorization`` and + ``X-AdCP-Signature`` appear in logs — gate DEBUG in production. + """ + if client is not None and timeout_seconds is not None: + raise ValueError( + "timeout_seconds cannot be set when client= is provided; " + "configure the timeout on your shared httpx.AsyncClient instead." + ) + + url, token, auth_scheme, credentials = _extract_config_fields(config) + + if auth_scheme is None: + raise ValueError( + "config.authentication is required for deliver(). " + "For RFC 9421 signing (the AdCP 4.0 default), use " + "adcp.webhooks.WebhookSender — no helper for unsigned webhooks " + "is provided because the spec requires signing." + ) + if auth_scheme not in ("Bearer", "HMAC-SHA256"): + raise ValueError( + f"unknown authentication scheme {auth_scheme!r}; " + "supported legacy schemes are 'Bearer' and 'HMAC-SHA256'. " + "For RFC 9421 use adcp.webhooks.WebhookSender." + ) + + _warn_auth_deprecation_once() + + body_dict = _payload_to_dict(payload) + if token is not None and token_field is not None: + _validate_header_value("config.token", token) + _inject_push_token(body_dict, token, payload, token_field) + + body_bytes = json.dumps(body_dict).encode("utf-8") + if len(body_bytes) > _MAX_BODY_BYTES: + raise ValueError( + f"serialized webhook body is {len(body_bytes):,} bytes, over the " + f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks or use " + "the batch-reporting endpoints — most receivers reject bodies over " + "10MB at the reverse proxy anyway." + ) + + headers: dict[str, str] = {"Content-Type": "application/json"} + + if auth_scheme == "Bearer": + if not credentials: + raise ValueError( + "config.authentication.schemes=['Bearer'] requires " + "authentication.credentials (min 32 characters — token " + "exchanged out-of-band with the receiver)." + ) + _validate_header_value("authentication.credentials", credentials) + headers["Authorization"] = f"Bearer {credentials}" + else: # HMAC-SHA256 + if not credentials: + raise ValueError( + "config.authentication.schemes=['HMAC-SHA256'] requires " + "authentication.credentials (min 32 characters — shared " + "secret exchanged out-of-band with the receiver)." + ) + _validate_header_value("authentication.credentials", credentials) + get_adcp_signed_headers_for_webhook( + headers, + secret=credentials, + timestamp=str(int(time.time())), + payload=body_dict, + ) + + if extra_headers: + if len(extra_headers) > _MAX_EXTRA_HEADERS: + raise ValueError( + f"extra_headers has {len(extra_headers)} entries; " + f"helper caps at {_MAX_EXTRA_HEADERS}. Pass only the custom " + "headers you actually need (trace IDs, correlation IDs)." + ) + for key in extra_headers: + normalized = str(key).lower() + if normalized in _RESERVED_HEADERS or normalized.startswith(":"): + raise ValueError(_reserved_header_message(normalized, key)) + for key, value in extra_headers.items(): + _validate_header_value(f"extra_headers[{key!r}]", value) + headers[key] = value + + owns_client = client is None + effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS + http_client = client or httpx.AsyncClient(timeout=effective_timeout) + try: + return await http_client.post(url, content=body_bytes, headers=headers) + finally: + if owns_client: + await http_client.aclose() + + +def _extract_config_fields( + config: AdCPBaseModel | Mapping[str, Any], +) -> tuple[str, str | None, str | None, str | None]: + """Pull ``url``, ``token``, auth scheme, and credentials out of a webhook config. + + Accepts either a ``PushNotificationConfig`` / ``ReportingWebhook`` model + or an equivalent dict — sellers often receive these as plain dicts from + an incoming AdCP request and shouldn't have to round-trip through the + Pydantic model just to dispatch a webhook. + + Validates the URL at the boundary: HTTPS only, no control characters. + """ + if hasattr(config, "model_dump"): + cfg = cast(AdCPBaseModel, config).model_dump(mode="json", exclude_none=True) + else: + cfg = dict(config) + + url_value = cfg.get("url") + if not url_value: + raise ValueError( + "webhook config is missing required 'url' field. Pass a " + "PushNotificationConfig, ReportingWebhook, or dict with an " + "https:// 'url'." + ) + url = str(url_value) + if any(c in url for c in _HEADER_FORBIDDEN_CHARS): + raise ValueError( + "webhook config 'url' contains control characters " + "(newline, carriage return, or NUL are not allowed in URLs)" + ) + lower = url.lower() + if not lower.startswith("https://"): + scheme_end = lower.find("://") + shown_scheme = lower[:scheme_end] if scheme_end >= 0 else "" + raise ValueError( + f"webhook config 'url' must use https:// (got scheme {shown_scheme!r}). " + "HTTP and other schemes are rejected because they expose the " + "webhook body, token, and Authorization header in transit." + ) + parsed = urlsplit(url) + if parsed.username is not None or parsed.password is not None: + raise ValueError( + "webhook config 'url' must not embed userinfo (user:pass@host). " + "Pass credentials via config.authentication.credentials instead — " + "URLs get logged by proxies, load balancers, and httpx DEBUG." + ) + + token = cfg.get("token") + + auth_raw = cfg.get("authentication") + if auth_raw is not None and not isinstance(auth_raw, Mapping): + raise ValueError( + f"config.authentication must be an object with 'schemes' + " + f"'credentials', got {type(auth_raw).__name__}" + ) + auth: Mapping[str, Any] = auth_raw or {} + schemes_raw = auth.get("schemes") + if schemes_raw is not None and not isinstance(schemes_raw, (list, tuple)): + raise ValueError( + "config.authentication.schemes must be a list, got " f"{type(schemes_raw).__name__}" + ) + schemes = list(schemes_raw or []) + if len(schemes) > 1: + raise ValueError( + f"config.authentication.schemes has {len(schemes)} entries; " + "the AdCP legacy auth schema allows exactly one scheme per config." + ) + scheme = schemes[0] if schemes else None + credentials = auth.get("credentials") + + return url, token, scheme, credentials + + +def _reserved_header_message(normalized: str, original_key: Any) -> str: + """Build a fix-the-error message tailored to the reserved header class. + + The mistake category differs sharply by header: a caller passing + ``Authorization`` usually doesn't know about ``config.authentication``; + a caller passing ``Content-Type`` is probably debugging and reached for + the override by reflex. Give each the right nudge.""" + if normalized == "authorization": + return ( + f"extra_headers may not override {original_key!r} — set " + "config.authentication.schemes=['Bearer'] + credentials instead. " + "The helper derives Authorization from config so the signer and " + "the wire cannot disagree." + ) + if normalized in ("signature", "signature-input", "content-digest"): + return ( + f"extra_headers may not override {original_key!r} — RFC 9421 " + "signing headers are produced by adcp.webhooks.WebhookSender, " + "not injected. Switch helpers if you need 9421." + ) + if normalized in ("x-adcp-signature", "x-adcp-timestamp"): + return ( + f"extra_headers may not override {original_key!r} — these are " + "the HMAC-SHA256 signature headers the helper produces from " + "config.authentication.credentials." + ) + if normalized == "content-type": + return ( + f"extra_headers may not override {original_key!r}; " + "the helper always sends 'application/json'." + ) + return ( + f"extra_headers may not override {original_key!r}; " + "this header is sender-owned and managed by the helper." + ) + + +def _payload_to_dict( + payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any], +) -> dict[str, Any]: + """Normalize a webhook payload to a JSON-ready dict. + + a2a-sdk ``Task`` / ``TaskStatusUpdateEvent`` serialize with ``by_alias=True`` + so ``artifact_id`` → ``artifactId`` matches what external A2A receivers + expect. MCP-shape dicts / AdCP models are dumped with camelCase-off defaults. + """ + if isinstance(payload, (Task, TaskStatusUpdateEvent)): + return payload.model_dump(mode="json", by_alias=True, exclude_none=True) + if hasattr(payload, "model_dump"): + model = cast(AdCPBaseModel, payload) + return model.model_dump(mode="json", exclude_none=True) + return dict(payload) + + +def _inject_push_token( + body: dict[str, Any], + token: str, + original_payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any], + token_field: str, +) -> None: + """Echo ``PushNotificationConfig.token`` into the body for buyer-side auth. + + AdCP 3.x says the token is "echoed back in webhook payload" but doesn't + name the field. The caller picks ``token_field`` to match whatever the + receiver is configured to read. A2A ``Task`` / ``TaskStatusUpdateEvent`` + carry a ``metadata`` object — the token lands there so the top-level + shape stays a valid A2A entity. MCP-shape webhooks and plain dicts get + the token at top-level (``additionalProperties`` is permitted by the + MCP webhook payload schema). + """ + is_a2a = isinstance(original_payload, (Task, TaskStatusUpdateEvent)) + if is_a2a: + metadata = body.get("metadata") + if not isinstance(metadata, dict): + metadata = {} + body["metadata"] = metadata + metadata.setdefault(token_field, token) + else: + body.setdefault(token_field, token) + + +def _validate_header_value(name: str, value: Any) -> None: + """Reject control characters and oversize values at the helper boundary. + + httpx rejects bare CRLF at send time, but relying on that is + defense-in-absentia — a later swap of the HTTP client, or a caller that + logs the value before sending, would re-open header injection. Enforce + here so the boundary contract is explicit. + """ + if not isinstance(value, str): + raise ValueError(f"{name} must be a string, got {type(value).__name__}") + if any(c in value for c in _HEADER_FORBIDDEN_CHARS): + raise ValueError(f"{name} contains control characters") + if len(value.encode("utf-8")) > _MAX_HEADER_VALUE_BYTES: + raise ValueError(f"{name} exceeds {_MAX_HEADER_VALUE_BYTES}-byte limit") + + # Sender import is at the bottom to resolve a circular dependency: # WebhookSender uses create_mcp_webhook_payload / generate_webhook_idempotency_key # which are defined above. Importing it at the top would try to resolve those @@ -577,7 +976,8 @@ def create_a2a_webhook_payload( "get_adcp_signed_headers_for_webhook", # Sender — 9421 signing (low-level) "sign_webhook", - # Sender — one-call outbound helper + # Sender — one-call outbound helpers + "deliver", "WebhookDeliveryResult", "WebhookSender", # Receiver — 9421 verification (low-level) diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index 9f797c60..fd90baa2 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -197,6 +197,19 @@ async def test_send_raw_requires_idempotency_key_at_signature() -> None: ) +@pytest.mark.asyncio +async def test_send_raw_enforces_body_size_cap() -> None: + """Oversized bodies raise before signing — matches adcp.webhooks.deliver.""" + app, _ = _build_receiver_app() + async with _build_sender(app) as sender: + with pytest.raises(ValueError, match="10,485,760"): + await sender.send_raw( + url="http://test/webhooks/adcp", + idempotency_key="whk_cap_test_0000000000000000", + payload={"blob": "x" * (11 * 1024 * 1024)}, + ) + + @pytest.mark.asyncio async def test_from_jwk_rejects_wrong_adcp_use() -> None: """Guardrail at construction: a request-signing JWK silently produces diff --git a/tests/test_webhooks_deliver.py b/tests/test_webhooks_deliver.py new file mode 100644 index 00000000..a21b9634 --- /dev/null +++ b/tests/test_webhooks_deliver.py @@ -0,0 +1,479 @@ +"""Tests for adcp.webhooks.deliver() — one-shot legacy-auth webhook dispatch. + +The helper collapses the seller's six-step boilerplate (build, serialize, +sign, merge headers, POST, token-echo) into one call. The load-bearing +property every test defends is that the bytes we *sign* and the bytes we +*POST* are identical — the serialization-format drift that plagued the +hand-rolled path is structurally prevented here. + +All tests suppress the ``DeprecationWarning`` that fires on first legacy- +auth use — it's asserted separately in ``test_deprecation_warning_fires``. +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +from typing import Any + +import httpx +import pytest +from a2a.types import Artifact, DataPart, Part, Task, TaskState, TaskStatus + +from adcp.types.generated_poc.core.push_notification_config import ( + Authentication as PNAuthentication, +) +from adcp.types.generated_poc.core.push_notification_config import ( + PushNotificationConfig, +) +from adcp.types.generated_poc.core.reporting_webhook import Authentication as RWAuth +from adcp.types.generated_poc.core.reporting_webhook import ( + ReportingFrequency, + ReportingWebhook, +) +from adcp.webhooks import ( + create_mcp_webhook_payload, + deliver, +) + +# Global DeprecationWarning filter — legacy auth always warns; silence here +# and assert the warning once in its own dedicated test. The filter strips +# the module qualifier so it catches warnings emitted via stacklevel that +# appear to originate from test-file frames. +pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") + +# Shared secret padded past the 32-char floor the auth schema enforces. +_SECRET = "s" * 40 +_BEARER_TOKEN = "b" * 40 +# PushNotificationConfig.token has min_length=16. +_PUSH_TOKEN = "push-token-01234567" +_TOKEN_FIELD = "push_notification_token" + + +async def _capture_client( + handler: Any = None, +) -> tuple[httpx.AsyncClient, list[httpx.Request]]: + """Build an AsyncClient backed by a MockTransport; capture every request.""" + captured: list[httpx.Request] = [] + + def _default_handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(202, json={"ok": True}) + + transport = httpx.MockTransport(handler or _default_handler) + return httpx.AsyncClient(transport=transport), captured + + +def _mcp_payload() -> dict[str, Any]: + return create_mcp_webhook_payload( + task_id="task_123", + task_type="create_media_buy", + status="completed", + result={"media_buy_id": "mb_1"}, + timestamp=None, # deterministic output tested separately + idempotency_key="whk_01HW9D2T3VXQ5M7K9N1P3R5S7U", + ) + + +async def test_bearer_auth_adds_authorization_header() -> None: + """schemes=[Bearer] → Authorization: Bearer .""" + client, captured = await _capture_client() + config = ReportingWebhook( + url="https://buyer.example/webhooks/report", + authentication=RWAuth(schemes=["Bearer"], credentials=_BEARER_TOKEN), + reporting_frequency=ReportingFrequency.daily, + ) + + async with client: + response = await deliver(config, _mcp_payload(), client=client) + + assert response.status_code == 202 + assert len(captured) == 1 + sent = captured[0] + assert sent.headers["authorization"] == f"Bearer {_BEARER_TOKEN}" + assert "x-adcp-signature" not in sent.headers + + +async def test_hmac_auth_signs_posted_bytes() -> None: + """schemes=[HMAC-SHA256] → signature over the exact bytes that hit the wire.""" + client, captured = await _capture_client() + config = PushNotificationConfig( + url="https://buyer.example/webhooks/mb", + authentication=PNAuthentication(schemes=["HMAC-SHA256"], credentials=_SECRET), + ) + + async with client: + await deliver(config, _mcp_payload(), client=client) + + sent = captured[0] + assert "x-adcp-signature" in sent.headers + assert "x-adcp-timestamp" in sent.headers + + # Recompute the signature over the exact bytes httpx posted — if the + # helper re-serialized anywhere after signing, this fails. + sig_header = sent.headers["x-adcp-signature"] + timestamp = sent.headers["x-adcp-timestamp"] + assert sig_header.startswith("sha256=") + expected = hmac.new( + _SECRET.encode(), + f"{timestamp}.".encode() + sent.content, + hashlib.sha256, + ).hexdigest() + assert sig_header == f"sha256={expected}" + + +async def test_token_echo_opt_in_mcp() -> None: + """token_field= echoes config.token into the MCP body at that key.""" + client, captured = await _capture_client() + config = PushNotificationConfig( + url="https://buyer.example/webhooks/mb", + token=_PUSH_TOKEN, + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + + async with client: + await deliver(config, _mcp_payload(), client=client, token_field=_TOKEN_FIELD) + + body = json.loads(captured[0].content) + assert body[_TOKEN_FIELD] == _PUSH_TOKEN + + +async def test_token_echo_opt_in_a2a_metadata() -> None: + """For Task payloads, opt-in echo attaches the token under metadata — + Task top-level fields are strictly typed so arbitrary keys can only go + into the metadata bag.""" + client, captured = await _capture_client() + task = Task( + id="t1", + context_id="c1", + status=TaskStatus(state=TaskState.completed), + artifacts=[ + Artifact( + artifact_id="a1", + parts=[Part(root=DataPart(data={"media_buy_id": "mb_1"}))], + ) + ], + ) + config = PushNotificationConfig( + url="https://buyer.example/webhooks/mb", + token=_PUSH_TOKEN, + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + + async with client: + await deliver(config, task, client=client, token_field=_TOKEN_FIELD) + + body = json.loads(captured[0].content) + assert body["metadata"][_TOKEN_FIELD] == _PUSH_TOKEN + # The A2A Task's native shape survived serialization. + assert body["artifacts"][0]["artifactId"] == "a1" + assert body["status"]["state"] == "completed" + + +async def test_token_echo_default_is_opt_in_only() -> None: + """Without token_field=, a token on the config does NOT land in the body. + + The AdCP legacy schema says the token is echoed but doesn't name the + wire field — so the caller must pick one explicitly. Default is silence.""" + client, captured = await _capture_client() + config = PushNotificationConfig( + url="https://buyer.example/webhooks/mb", + token=_PUSH_TOKEN, + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + + async with client: + await deliver(config, _mcp_payload(), client=client) + + body = json.loads(captured[0].content) + assert "push_notification_token" not in body + assert _PUSH_TOKEN not in body.values() + + +async def test_retry_produces_byte_identical_body() -> None: + """Two deliver() calls with the same payload produce byte-identical bodies. + + Load-bearing property: retries replay the same bytes, so receivers dedupe + by ``idempotency_key`` on a payload that looks exactly like the first + attempt. If serialization weren't deterministic here, retried webhooks + would appear as distinct events.""" + client, captured = await _capture_client() + config = ReportingWebhook( + url="https://buyer.example/webhooks/report", + authentication=RWAuth(schemes=["Bearer"], credentials=_BEARER_TOKEN), + reporting_frequency=ReportingFrequency.daily, + ) + payload = _mcp_payload() + + async with client: + await deliver(config, payload, client=client) + await deliver(config, payload, client=client) + + assert captured[0].content == captured[1].content + + +async def test_accepts_dict_config() -> None: + """Sellers reading PushNotificationConfig from a raw request dict shouldn't + need to round-trip through the Pydantic model.""" + client, captured = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + + async with client: + response = await deliver(config, _mcp_payload(), client=client) + + assert response.status_code == 202 + assert captured[0].headers["authorization"] == f"Bearer {_BEARER_TOKEN}" + + +async def test_no_authentication_raises_pointing_to_websocksender() -> None: + """Absent ``authentication`` is a spec violation (push-notification-config + says the seller MUST sign with RFC 9421). The helper refuses rather than + silently posting unsigned.""" + client, _ = await _capture_client() + config = {"url": "https://buyer.example/webhooks/mb"} + + async with client: + with pytest.raises(ValueError, match="WebhookSender"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_unknown_auth_scheme_raises() -> None: + """Schemes outside the legacy set fail loudly — silent no-op would hand + the caller an unsigned POST under the illusion of having signed it.""" + client, _ = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Digest"], "credentials": "c" * 40}, + } + async with client: + with pytest.raises(ValueError, match="unknown authentication scheme"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_hmac_without_credentials_raises() -> None: + """HMAC-SHA256 without credentials is not a valid config; refuse.""" + client, _ = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["HMAC-SHA256"]}, + } + async with client: + with pytest.raises(ValueError, match="credentials"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_missing_url_raises() -> None: + """A config without ``url`` is unusable — raise at the boundary, not + inside httpx.""" + client, _ = await _capture_client() + config = {"authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}} + async with client: + with pytest.raises(ValueError, match="url"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_http_url_rejected() -> None: + """HTTP would expose the body, token, and Authorization in transit.""" + client, _ = await _capture_client() + config = { + "url": "http://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + async with client: + with pytest.raises(ValueError, match="https"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_crlf_in_credentials_rejected() -> None: + """CRLF in credentials could smuggle headers past receivers that don't + enforce RFC-compliant header parsing.""" + client, _ = await _capture_client() + bad = "x" * 30 + "\r\nX-Admin: true" + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": bad}, + } + async with client: + with pytest.raises(ValueError, match="control characters"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_extra_headers_merge_but_reserved_are_rejected() -> None: + """Custom headers merge; auth, content, signature headers stay sender-owned.""" + client, captured = await _capture_client() + config = ReportingWebhook( + url="https://buyer.example/webhooks/report", + authentication=RWAuth(schemes=["Bearer"], credentials=_BEARER_TOKEN), + reporting_frequency=ReportingFrequency.daily, + ) + + async with client: + await deliver( + config, + _mcp_payload(), + client=client, + extra_headers={"X-Trace-Id": "trace-abc"}, + ) + assert captured[0].headers["x-trace-id"] == "trace-abc" + + # Every reserved header name must be rejected with a class-appropriate + # message. The mistake categories differ — someone passing Authorization + # usually doesn't know about config.authentication; someone passing + # Signature-Input is debugging and needs pointed at WebhookSender. + reserved_samples = { + "Authorization": "config.authentication", + "Content-Type": "application/json", + "Content-Digest": "WebhookSender", + "Signature": "WebhookSender", + "Signature-Input": "WebhookSender", + "X-AdCP-Signature": "HMAC-SHA256", + "X-AdCP-Timestamp": "HMAC-SHA256", + "Host": "sender-owned", + "Content-Length": "sender-owned", + } + for header, expected_phrase in reserved_samples.items(): + override_client, _ = await _capture_client() + async with override_client: + with pytest.raises(ValueError, match=expected_phrase): + await deliver( + config, + _mcp_payload(), + client=override_client, + extra_headers={header: "attacker"}, + ) + + +async def test_timeout_seconds_with_client_raises() -> None: + """Configuring timeout on a helper-owned client and a shared client at + the same time is ambiguous — force the caller to pick.""" + client, _ = await _capture_client() + config = ReportingWebhook( + url="https://buyer.example/webhooks/report", + authentication=RWAuth(schemes=["Bearer"], credentials=_BEARER_TOKEN), + reporting_frequency=ReportingFrequency.daily, + ) + async with client: + with pytest.raises(ValueError, match="timeout_seconds"): + await deliver(config, _mcp_payload(), client=client, timeout_seconds=5.0) + + +async def test_url_with_embedded_userinfo_rejected() -> None: + """URLs like https://user:pass@host/ get logged by every HTTP intermediary. + The helper forces credentials into config.authentication where the + signing path keeps them out of URL-shaped logs.""" + client, _ = await _capture_client() + config = { + "url": "https://user:secret@buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + async with client: + with pytest.raises(ValueError, match="userinfo"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_body_size_cap_enforced() -> None: + """Oversize bodies raise with an actionable message before a 10MB POST + that the receiver would reject at the reverse proxy anyway.""" + client, _ = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + # One oversized field in the payload — 11MB of 'x'. + oversize = _mcp_payload() + oversize["result"] = {"blob": "x" * (11 * 1024 * 1024)} + async with client: + with pytest.raises(ValueError, match="10,485,760"): + await deliver(config, oversize, client=client) + + +async def test_extra_headers_count_cap() -> None: + """A caller iterating a large container into extra_headers shouldn't + produce an unbounded header block.""" + client, _ = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + headers = {f"X-Tag-{i}": str(i) for i in range(65)} + async with client: + with pytest.raises(ValueError, match="extra_headers has 65 entries"): + await deliver(config, _mcp_payload(), client=client, extra_headers=headers) + + +async def test_authentication_wrong_type_raises() -> None: + """config.authentication must be a mapping — a bare string (common mistake + when a seller writes ``authentication='Bearer xxx'``) fails cleanly rather + than raising AttributeError deep in the helper.""" + client, _ = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": "Bearer " + _BEARER_TOKEN, + } + async with client: + with pytest.raises(ValueError, match="authentication must be"): + await deliver(config, _mcp_payload(), client=client) + + +async def test_retry_requires_caller_to_not_mutate_payload() -> None: + """Byte-identity of retries rests on the *caller* not mutating the + payload between calls. This test documents the contract by showing + what breaks when the caller refreshes a timestamp mid-retry. + + It is NOT a bug in the helper — the docstring flags the contract — + but a test that exercises the failure mode makes the contract concrete.""" + client, captured = await _capture_client() + config = { + "url": "https://buyer.example/webhooks/mb", + "authentication": {"schemes": ["Bearer"], "credentials": _BEARER_TOKEN}, + } + payload = _mcp_payload() + async with client: + await deliver(config, payload, client=client) + # Caller mutates — retry bytes now differ. + payload["timestamp"] = "2026-01-01T00:00:00Z" + await deliver(config, payload, client=client) + + assert captured[0].content != captured[1].content + + +async def test_signed_bytes_match_posted_bytes() -> None: + """The helper must POST via ``content=`` — ``json=`` would re-serialize + and break the signature invariant. Compare body bytes to a byte-exact + expected serialization.""" + client, captured = await _capture_client() + config = PushNotificationConfig( + url="https://buyer.example/webhooks/mb", + authentication=PNAuthentication(schemes=["HMAC-SHA256"], credentials=_SECRET), + ) + payload = _mcp_payload() + + async with client: + await deliver(config, payload, client=client) + + expected_body = json.dumps(payload).encode("utf-8") + assert captured[0].content == expected_body + + +async def test_deprecation_warning_fires_for_legacy_auth() -> None: + """Mirrors the receiver-side :class:`LegacyWebhookHmacError` deprecation + warning — senders must know to migrate to 9421 before 4.0 removes it.""" + # Reset the module-level "already warned" flag so this test is hermetic. + import adcp.webhooks as webhooks_module + + webhooks_module._AUTH_DEPRECATION_WARNED = False + + client, _ = await _capture_client() + config = ReportingWebhook( + url="https://buyer.example/webhooks/report", + authentication=RWAuth(schemes=["Bearer"], credentials=_BEARER_TOKEN), + reporting_frequency=ReportingFrequency.daily, + ) + + async with client: + with pytest.warns(DeprecationWarning, match="AdCP 4.0"): + await deliver(config, _mcp_payload(), client=client)