diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index b0a0bac7..ac6b7160 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -85,14 +85,26 @@ class VerifiedSigner: class VerifierCapability: """The `request_signing` block a verifier advertises on get_adcp_capabilities. - Defaults to `covers_content_digest="required"`: body integrity must be - authenticated end-to-end. With `"either"`, a MITM inside TLS termination - (reverse proxy, service mesh) can swap bodies freely on unsigned-digest - requests — only pick `"either"` if you've weighed that tradeoff. + Defaults to ``covers_content_digest="either"`` per the AdCP 3.0 schema + (`get-adcp-capabilities-response.json` declares this as the default + explicitly). The schema rationale recommends `"required"` for + spend-committing operations in production, and AdCP 4.0 recommends + `"required"` more broadly. + + Operators who want body-integrity authentication end-to-end on + every request — closing the MITM-inside-TLS-termination case where + a reverse proxy or service mesh can swap bodies on unsigned-digest + requests — opt INTO ``covers_content_digest="required"`` explicitly, + or use ``required_for=frozenset({"create_media_buy", ...})`` to + promote spend-committing operations selectively. + + The webhook-signing profile (``adcp.signing.webhook_verifier``) hard- + codes ``"required"`` regardless of this default — webhook bodies + always carry signed digests. """ supported: bool = True - covers_content_digest: CoversDigestPolicy = "required" + covers_content_digest: CoversDigestPolicy = "either" required_for: frozenset[str] = field(default_factory=frozenset) supported_for: frozenset[str] = field(default_factory=frozenset) diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index 4ee80c1a..b457332c 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -705,6 +705,8 @@ async def deliver( extra_headers: Mapping[str, str] | None = None, timeout_seconds: float | None = None, token_field: str | None = None, + allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, ) -> httpx.Response: """Dispatch one legacy-auth webhook in a single call. @@ -728,11 +730,23 @@ async def deliver( :func:`create_mcp_webhook_payload` / :func:`create_a2a_webhook_payload`), an a2a ``Task`` / ``TaskStatusUpdateEvent``, or a plain dict. Models are dumped with ``mode="json", exclude_none=True``. - client: Optional shared ``httpx.AsyncClient``. Recommended in - production for connection pooling and egress-policy enforcement - (a custom ``httpx.BaseTransport`` is the right place to block - SSRF to private IPs — the helper validates scheme but cannot - see post-DNS resolution without racing TOCTOU). + client: Optional shared ``httpx.AsyncClient``. When supplied, the + caller owns SSRF guarantees — the helper trusts the operator's + transport completely (typically a vetted egress proxy with + mTLS, or an ASGI transport for testing). When omitted, the + helper builds a per-request :class:`adcp.signing.IpPinnedTransport` + so the URL is resolved, SSRF-validated, and pinned to the + resolved IP — same defense applied to :class:`WebhookSender`. + allow_private: Forwarded to the per-request pinned transport + (owned-client path only). ``False`` (default) rejects URLs + whose resolved IP is in a private / loopback / link-local + range. Set ``True`` for dev/CI fixtures that post to internal + endpoints; production should leave it ``False``. + allowed_ports: Forwarded to the per-request pinned transport + (owned-client path only). ``None`` (default) imposes no port + filter — AdCP doesn't constrain webhook ports. Hardened + deployments pass :data:`adcp.signing.DEFAULT_ALLOWED_PORTS` + (`{443, 8443}`) or a custom set. extra_headers: Merged last. May not override any of ``Content-Type``, ``Content-Digest``, ``Content-Length``, ``Host``, ``Authorization``, ``Signature``, ``Signature-Input``, @@ -768,10 +782,13 @@ async def deliver( DeprecationWarning (fires once): ``authentication`` is a 3.x fallback. Security notes: - * ``config.url`` is buyer-controlled. The helper enforces HTTPS and - rejects control characters but does NOT block private / link-local - destinations — wire an egress policy via ``client.transport`` to - stop SSRF into your VPC or cloud metadata service. + * ``config.url`` is buyer-controlled. The helper enforces HTTPS, + rejects control characters, AND (on the owned-client path) + builds a per-request IP-pinned transport that runs the full + SSRF range check (loopback / RFC 1918 / link-local / CGNAT / + IPv6 ULA / multicast / cloud metadata) and pins the connection + to the validated IP. Operator-supplied clients skip the SSRF + guard — they own egress policy on their transport. * ``config.token`` sits in the request body, so any receiver that logs bodies retains it indefinitely. Treat the token as a medium-sensitivity correlator, not a long-lived secret. @@ -862,14 +879,32 @@ async def deliver( _validate_header_value(f"extra_headers[{key!r}]", value) headers[key] = value - owns_client = client is None effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS - http_client = client or httpx.AsyncClient(timeout=effective_timeout) - try: - return await http_client.post(url, content=body_bytes, headers=headers) - finally: - if owns_client: - await http_client.aclose() + if client is None: + # Owned-client path: build a per-request IP-pinned transport so + # the URL is SSRF-validated and pinned to the resolved IP, with + # follow_redirects=False (rebinding-via-redirect defense) and + # trust_env=False (HTTPS_PROXY env vars cannot route the request + # away from the pinned target). Mirrors the WebhookSender pattern + # — see adcp.webhook_sender._send_bytes for the same shape. + from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport + + transport = build_async_ip_pinned_transport( + url, + allow_private=allow_private, + allowed_ports=allowed_ports, + ) + async with httpx.AsyncClient( + transport=transport, + timeout=effective_timeout, + follow_redirects=False, + trust_env=False, + ) as http_client: + return await http_client.post(url, content=body_bytes, headers=headers) + # Operator-supplied client: trust them completely; they own SSRF + # guarantees on their transport (vetted egress proxy, ASGI test + # transport, etc.). + return await client.post(url, content=body_bytes, headers=headers) def _extract_config_fields( diff --git a/tests/conformance/signing/test_verifier_behaviors.py b/tests/conformance/signing/test_verifier_behaviors.py index e163de19..6432079c 100644 --- a/tests/conformance/signing/test_verifier_behaviors.py +++ b/tests/conformance/signing/test_verifier_behaviors.py @@ -346,6 +346,11 @@ def test_verify_options_rejects_positional() -> None: # ---- 6a: VerifierCapability default ---- -def test_verifier_capability_defaults_to_required_digest() -> None: +def test_verifier_capability_defaults_to_either_digest() -> None: + """The AdCP 3.0 schema declares ``covers_content_digest`` default as + ``"either"`` (``get-adcp-capabilities-response.json``); ``"required"`` + is opt-in for spend-committing operations. AdCP 4.0 is expected to + recommend ``"required"`` more broadly. Operators promote operations + selectively via ``required_for=frozenset({"create_media_buy", ...})``.""" cap = VerifierCapability() - assert cap.covers_content_digest == "required" + assert cap.covers_content_digest == "either" diff --git a/tests/test_webhooks_deliver.py b/tests/test_webhooks_deliver.py index 82f276bb..621a751a 100644 --- a/tests/test_webhooks_deliver.py +++ b/tests/test_webhooks_deliver.py @@ -16,11 +16,13 @@ import hmac import json from typing import Any +from unittest.mock import patch import httpx import pytest from a2a.types import TaskState # TaskState is the proto enum; still exported +from adcp.signing import SSRFValidationError from adcp.types.generated_poc.core.push_notification_config import ( Authentication as PNAuthentication, ) @@ -546,3 +548,99 @@ def test_normalize_passthrough_for_unknown_enum_prefixes() -> None: assert out["status"]["state"] == "completed" assert out["status"]["message"]["role"] == "user" assert out["role"] == "user" + + +# -- SSRF guard regression tests (parity with WebhookSender) --------------- + + +@pytest.mark.asyncio +async def test_owned_client_rejects_loopback_destination() -> None: + """The legacy ``deliver()`` helper now wires the same per-request + IP-pinned transport that ``WebhookSender._send_bytes`` uses (see #299). + A buyer-supplied URL pointing at 127.0.0.1 must reject before the + POST hits a real socket — the previous unguarded ``httpx.AsyncClient`` + would have delivered to the loopback address. + + Operator-supplied clients (``client=...``) skip the SSRF guard by + design; they own egress policy on their transport.""" + config = PushNotificationConfig( + url="https://127.0.0.1/webhooks/adcp", + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + payload = create_mcp_webhook_payload( + task_id="task_ssrf", + task_type="create_media_buy", + status="completed", + ) + + with pytest.raises(SSRFValidationError): + await deliver(config, payload) + + +@pytest.mark.asyncio +async def test_allow_private_dev_escape_hatch() -> None: + """Adopters with dev/CI fixtures posting to internal endpoints + pass ``allow_private=True`` to disable the IP-range check. Pin-and- + bind still applies (URL gets resolved + connection pinned), and the + rest of the contract stands — bytes signed == bytes posted.""" + config = PushNotificationConfig( + url="https://10.0.0.1/webhooks/adcp", + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + payload = create_mcp_webhook_payload( + task_id="task_priv", + task_type="create_media_buy", + status="completed", + ) + + # Stub the pinned-transport build with a transport that actually + # accepts a body without opening a socket. We're testing the kwarg + # plumbing, not the network round-trip. + # + # Patch target is the source module (adcp.signing.ip_pinned_transport) + # because deliver() does a lazy in-function ``from ... import``. If + # the import is ever moved to module-level on adcp.webhooks, the + # patch target must follow — `patch("adcp.webhooks.build_async_ip_pinned_transport", ...)`. + captured: dict[str, Any] = {} + + def fake_build(uri: str, **kwargs: Any) -> Any: + captured.update(kwargs) + captured["uri"] = uri + return httpx.MockTransport(lambda _req: httpx.Response(200)) + + with patch( + "adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport", + side_effect=fake_build, + ): + response = await deliver(config, payload, allow_private=True) + + assert response.status_code == 200 + assert captured["allow_private"] is True + assert captured["uri"] == "https://10.0.0.1/webhooks/adcp" + + +@pytest.mark.asyncio +async def test_operator_supplied_client_skips_ssrf_guard() -> None: + """When the operator provides their own ``httpx.AsyncClient``, + ``deliver()`` does NOT build a pinned transport — the operator owns + SSRF on their transport (vetted egress proxy, ASGI test transport). + A request that would fail the SSRF range check under the owned-client + path (resolves to a private/loopback IP) succeeds via the operator's + MockTransport without any DNS lookup.""" + # https:// is required by the scheme check; MockTransport captures + # the request without a real socket so SSRF range never fires. + config = PushNotificationConfig( + url="https://operator-trusted.test/webhooks/adcp", + authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN), + ) + payload = create_mcp_webhook_payload( + task_id="task_op", + task_type="create_media_buy", + status="completed", + ) + + transport = httpx.MockTransport(lambda _req: httpx.Response(200)) + async with httpx.AsyncClient(transport=transport) as client: + response = await deliver(config, payload, client=client) + + assert response.status_code == 200