From 36a4e65105ff2314972501e74999712319e11c6a Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 29 Apr 2026 11:38:54 -0400 Subject: [PATCH 1/4] fix(signing): close 5 SSRF and tenant-isolation gaps from foundation audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-foundation cleanup surfacing from the v6.0 DecisioningPlatform foundation audit. Each fix closes a real bug or spec gap in the existing adcp.signing surface independently of the framework work that builds on top. 1. Port allowlist for SSRF-validated outbound HTTP (adcp.signing.jwks.{validate_jwks_uri, resolve_and_validate_host}, ip_pinned_transport.{build_ip_pinned_transport, build_async_ip_pinned_transport}) - Default permits {443, 8443}; rejects :25, :6379, :11211, etc. on resolved public IPs. Buyers can no longer smuggle traffic to internal SMTP / Redis / Memcached via webhook URLs on non-standard ports even when the IP itself is routable. - Configurable via allowed_ports kwarg; empty frozenset is the test-only escape hatch. - Test: tests/conformance/signing/test_jwks.py (test_ssrf_rejects_disallowed_ports + parametrized matrix) 2. WebhookSender owned-client path uses pin-and-bind transport (adcp.webhook_sender.WebhookSender._send_bytes) - Previous implementation reused a single httpx.AsyncClient across all destinations and bypassed the IP-pinned transport entirely. A buyer-supplied webhook URL pointing at 127.0.0.1 or AWS metadata would deliver successfully. - Now: when the sender owns its httpx client (default), every delivery builds a per-request AsyncIpPinnedTransport. Per-request re-resolution is intentional — keeping a pinned transport alive across deliveries to the same hostname would defeat the rebinding defense. - When the operator supplies their own client (vetted egress proxy, ASGI test transport), the framework trusts them completely; the operator owns SSRF guarantees on their transport. 
- Tests: test_owned_client_rejects_loopback_destination, test_owned_client_rejects_disallowed_port 3. Tenant-scoped JWKS resolver Protocol (adcp.signing.jwks.{JwksResolver, AsyncJwksResolver}, adcp.signing.verifier.VerifyOptions, webhook_verifier.WebhookVerifyOptions) - Adds optional ``tenant_id`` kwarg to the resolver Protocol so a resolver instance shared across tenants can refuse keys outside the active tenant's published JWKS. Cross-tenant key confusion (a buyer signing for tenant B who knows tenant A's key_id) is closed at the resolver layer, not the verifier. - Single-tenant in-tree impls (Static, Caching, AsyncCaching) accept the kwarg and ignore it — tenant scoping is a wrapper concern, and adopters compose tenant-scoped resolvers around existing single-tenant resolvers. - VerifyOptions.tenant_id and WebhookVerifyOptions.tenant_id thread the value through; verifier.py:227 passes it on the resolver call. - Test: tests/conformance/signing/test_jwks.py (test_tenant_scoping_wrapper_pattern — reference pattern for adopters; test_static_resolver_accepts_tenant_id_kwarg — backward-compat invariant) 4. content-digest required-by-default for inbound request signing (adcp.signing.verifier.VerifierCapability) - Default already correct (covers_content_digest="required" at verifier.py:95). Adds a regression test pinning the default so a future "make it lenient" refactor surfaces in CI. - Body integrity must be authenticated end-to-end; "either" or "forbidden" lets a MITM inside TLS termination swap bodies on signed requests whose digest isn't covered. - Test: tests/conformance/signing/test_verifier_defaults.py (test_default_covers_content_digest_is_required) 5. WebhookPayload.operation_id docstring fix (adcp.webhooks.create_mcp_webhook_payload) - Docstring previously said "deprecated from payload, should be in URL routing, but included for backward compatibility." 
Contradicted the schema at mcp-webhook-payload.json which says publishers MUST echo this back so buyers correlate notifications without parsing URL paths. - Field already supported in the payload constructor; only the docstring needed correction. Adds a test confirming the end-to-end echo from send_mcp(operation_id=...) into the delivered payload. - Test: test_send_mcp_threads_operation_id_into_payload All 5 fixes ship together as a security-prep PR before the v6.0 DecisioningPlatform foundation work lands. Each is independent of the others; reviewers can evaluate by gap. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/signing/ip_pinned_transport.py | 32 ++++- src/adcp/signing/jwks.py | 79 ++++++++--- src/adcp/signing/verifier.py | 7 +- src/adcp/signing/webhook_verifier.py | 5 + src/adcp/webhook_sender.py | 47 ++++++- src/adcp/webhooks.py | 8 +- .../signing/test_async_revocation.py | 31 ++--- .../signing/test_ip_pinned_transport.py | 9 +- tests/conformance/signing/test_jwks.py | 127 +++++++++++++++++- .../signing/test_verifier_defaults.py | 13 ++ .../signing/test_verifier_vectors.py | 3 +- .../signing/test_webhook_sender_e2e.py | 86 ++++++++++++ 12 files changed, 398 insertions(+), 49 deletions(-) diff --git a/src/adcp/signing/ip_pinned_transport.py b/src/adcp/signing/ip_pinned_transport.py index 0a8df62d..b34172da 100644 --- a/src/adcp/signing/ip_pinned_transport.py +++ b/src/adcp/signing/ip_pinned_transport.py @@ -289,13 +289,17 @@ def build_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> IpPinnedTransport: """Resolve ``uri`` once and return a transport pinned to the validated IP. Raises :class:`SSRFValidationError` if the URI's scheme isn't - ``http``/``https``, the host doesn't resolve, or every resolved - IP is in a blocked range. + ``http``/``https``, the port is not in the allowlist, the host + doesn't resolve, or every resolved IP is in a blocked range. 
+ + ``allowed_ports`` defaults to + :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`). Typical use inside a fetcher:: @@ -303,7 +307,11 @@ def build_ip_pinned_transport( with httpx.Client(transport=transport, timeout=10.0) as client: response = client.get(uri) """ - hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private) + hostname, resolved_ip, _port = resolve_and_validate_host( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + ) return IpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify) @@ -311,6 +319,7 @@ def build_async_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> AsyncIpPinnedTransport: """Build an :class:`AsyncIpPinnedTransport` for ``uri``. @@ -318,8 +327,15 @@ def build_async_ip_pinned_transport( Resolve + validate run synchronously (``socket.getaddrinfo``); this function itself is not awaitable. The returned transport plugs into :class:`httpx.AsyncClient`. + + ``allowed_ports`` defaults to + :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`). """ - hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private) + hostname, resolved_ip, _port = resolve_and_validate_host( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + ) return AsyncIpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify) @@ -327,6 +343,7 @@ def abuild_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> AsyncIpPinnedTransport: """Deprecated alias for :func:`build_async_ip_pinned_transport`. 
@@ -343,4 +360,9 @@ def abuild_ip_pinned_transport( DeprecationWarning, stacklevel=2, ) - return build_async_ip_pinned_transport(uri, allow_private=allow_private, verify=verify) + return build_async_ip_pinned_transport( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + verify=verify, + ) diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index 79fcbc9c..bfd98223 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -53,6 +53,14 @@ } ) +# Allowed destination ports for SSRF-validated outbound HTTP. Buyers supplying +# webhook URLs on non-standard ports can otherwise smuggle traffic to internal +# services bound to the same routable IP — :25 (SMTP relay), :6379 (Redis), +# :11211 (Memcached), etc. The validator rejects these even when the IP itself +# is public. Tests override via the ``allowed_ports`` parameter on +# :func:`resolve_and_validate_host`. +DEFAULT_ALLOWED_PORTS: frozenset[int] = frozenset({443, 8443}) + # Upper bound on the number of resolved addresses examined per validation call. # A malicious DNS server can return thousands of records as a mild amplification # vector against the validator's inner loop. @@ -84,9 +92,17 @@ class JwksResolver(Protocol): :class:`CachingJwksResolver` (fetches + caches from a URI). Async callers use :class:`AsyncJwksResolver` instead. + + ``tenant_id`` is optional; multi-tenant deployments pass it so a + resolver instance shared across tenants can refuse keys outside the + active tenant's published JWKS. Single-tenant deployments leave it + unset; resolvers that don't enforce tenant scoping ignore it. + Cross-tenant key confusion (a buyer who knows tenant A's ``key_id`` + presenting it on tenant B's transport) is closed at the + resolver layer, not the verifier. """ - def __call__(self, keyid: str) -> dict[str, Any] | None: ... + def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: ... 
class AsyncJwksResolver(Protocol): @@ -99,25 +115,35 @@ class AsyncJwksResolver(Protocol): it in a thin async callable — there's no async work, just a dict lookup — but typically you'll just use the static one directly where an :class:`AsyncJwksResolver` is expected via :func:`as_async`. + + ``tenant_id`` semantics match :class:`JwksResolver`. """ - async def __call__(self, keyid: str) -> dict[str, Any] | None: ... + async def __call__( + self, keyid: str, *, tenant_id: str | None = None + ) -> dict[str, Any] | None: ... -def validate_jwks_uri(uri: str, *, allow_private: bool = False) -> None: - """Raise SSRFValidationError if `uri` resolves to a blocked IP or has a bad scheme. +def validate_jwks_uri( + uri: str, + *, + allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, +) -> None: + """Raise SSRFValidationError on blocked IP, bad scheme, or disallowed port. - This is kept as a standalone no-return helper for callers that only - want validation — :func:`resolve_and_validate_host` returns the - accepted IP when the caller needs it for IP-pinned connects. + Standalone no-return helper for callers that only want validation — + :func:`resolve_and_validate_host` returns the accepted IP when the + caller needs it for IP-pinned connects. """ - resolve_and_validate_host(uri, allow_private=allow_private) + resolve_and_validate_host(uri, allow_private=allow_private, allowed_ports=allowed_ports) def resolve_and_validate_host( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, ) -> tuple[str, str, int]: """Resolve the URI's hostname once and return ``(hostname, ip, port)``. @@ -138,6 +164,12 @@ def resolve_and_validate_host( allow_private: Skip the reserved-range check. For tests only; cloud-metadata IPs remain blocked unconditionally. + allowed_ports: + Override the default destination port allowlist. 
Pass an empty + frozenset to disable the check entirely (tests only); pass a + custom set to permit non-standard ports for trusted on-prem + deployments. ``None`` (default) uses :data:`DEFAULT_ALLOWED_PORTS` + (`{443, 8443}`). Returns ------- @@ -148,8 +180,8 @@ def resolve_and_validate_host( Raises ------ SSRFValidationError - Scheme is not ``http``/``https``, the hostname doesn't resolve, - or every resolved IP is in a blocked range. + Scheme is not ``http``/``https``, port is not in the allowlist, + the hostname doesn't resolve, or every resolved IP is in a blocked range. """ parts = urlsplit(uri) if parts.scheme not in ("http", "https"): @@ -177,6 +209,12 @@ def resolve_and_validate_host( except (UnicodeError, UnicodeEncodeError) as exc: raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80) + effective_allowed_ports = DEFAULT_ALLOWED_PORTS if allowed_ports is None else allowed_ports + if effective_allowed_ports and port not in effective_allowed_ports: + raise SSRFValidationError( + f"port {port} not allowed for SSRF-validated fetch " + f"(allowed: {sorted(effective_allowed_ports)})" + ) try: infos = socket.getaddrinfo(host, None) @@ -291,7 +329,13 @@ def __init__( self._last_attempt: float | None = None self._primed = False - def __call__(self, keyid: str) -> dict[str, Any] | None: + def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + # tenant_id ignored — this resolver is bound to a single jwks_uri + # at construction. Multi-tenant deployments wire one resolver per + # tenant and route by tenant_id externally; a tenant-scoping + # resolver wrapper that checks tenant_id against a registry can + # compose this resolver internally. 
+ del tenant_id if keyid in self._cache: return self._cache[keyid] now = self._clock() @@ -327,7 +371,9 @@ class StaticJwksResolver: def __init__(self, jwks: dict[str, Any]) -> None: self._keys = {jwk["kid"]: jwk for jwk in jwks.get("keys", []) if "kid" in jwk} - def __call__(self, keyid: str) -> dict[str, Any] | None: + def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + # tenant_id ignored — single-tenant by construction. + del tenant_id return self._keys.get(keyid) @@ -402,7 +448,9 @@ def __init__( # across ``asyncio.run`` boundaries. self._lock: asyncio.Lock = asyncio.Lock() - async def __call__(self, keyid: str) -> dict[str, Any] | None: + async def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + # tenant_id ignored — see CachingJwksResolver.__call__. + del tenant_id if keyid in self._cache: return self._cache[keyid] now = self._clock() @@ -449,8 +497,8 @@ def as_async_resolver(resolver: JwksResolver) -> AsyncJwksResolver: work (just a dict lookup); the wrapper is a shape adapter. 
""" - async def resolve(keyid: str) -> dict[str, Any] | None: - return resolver(keyid) + async def resolve(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + return resolver(keyid, tenant_id=tenant_id) return resolve @@ -461,6 +509,7 @@ async def resolve(keyid: str) -> dict[str, Any] | None: "AsyncJwksFetcher", "AsyncJwksResolver", "CachingJwksResolver", + "DEFAULT_ALLOWED_PORTS", "DEFAULT_JWKS_COOLDOWN_SECONDS", "DEFAULT_JWKS_TIMEOUT_SECONDS", "JwksFetcher", diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index b0a0bac7..f354ce1b 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -131,6 +131,11 @@ class VerifyOptions: expected_adcp_use: str = ADCP_USE_REQUEST allowed_algs: frozenset[str] = ALLOWED_ALGS agent_url: str | None = None + # Multi-tenant deployments pass tenant_id so a JWKS resolver shared + # across tenants refuses keys outside the active tenant's published + # JWKS. Single-tenant deployments leave it None; resolvers that + # don't enforce tenant scoping ignore it. See JwksResolver Protocol. 
+ tenant_id: str | None = None def verify_request_signature( @@ -219,7 +224,7 @@ def verify_request_signature( message=f"nonce exceeds {_MAX_PARAM_LEN} bytes", ) - jwk = options.jwks_resolver(keyid) + jwk = options.jwks_resolver(keyid, tenant_id=options.tenant_id) if jwk is None: raise SignatureVerificationError( REQUEST_SIGNATURE_KEY_UNKNOWN, diff --git a/src/adcp/signing/webhook_verifier.py b/src/adcp/signing/webhook_verifier.py index 75fcc833..b3ea6c7b 100644 --- a/src/adcp/signing/webhook_verifier.py +++ b/src/adcp/signing/webhook_verifier.py @@ -84,6 +84,10 @@ class WebhookVerifyOptions: allowed_algs: frozenset[str] = ALLOWED_ALGS sender_url: str | None = None clock: Callable[[], float] = time.time + # Multi-tenant deployments pass tenant_id so a JWKS resolver shared + # across tenants refuses keys outside the active tenant's published + # JWKS — same semantics as VerifyOptions.tenant_id. + tenant_id: str | None = None @dataclass(frozen=True) @@ -152,6 +156,7 @@ def verify_webhook_signature( expected_adcp_use=ADCP_USE_WEBHOOK, allowed_algs=options.allowed_algs, agent_url=options.sender_url, + tenant_id=options.tenant_id, ) try: diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index eaeb4019..c9505285 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -44,6 +44,7 @@ load_private_key_pem, private_key_from_jwk, ) +from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport from adcp.signing.webhook_signer import sign_webhook from adcp.types import GeneratedTaskStatus from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData @@ -112,6 +113,8 @@ def __init__( alg: str, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> None: self._private_key = private_key self._key_id = key_id @@ -119,6 +122,8 @@ def __init__( self._timeout = 
timeout_seconds self._client = client self._owns_client = client is None + self._allow_private_destinations = allow_private_destinations + self._allowed_destination_ports = allowed_destination_ports @classmethod def from_jwk( @@ -471,7 +476,23 @@ async def _send_bytes( idempotency_key: str, extra_headers: Mapping[str, str] | None, ) -> WebhookDeliveryResult: - """Sign + POST a pre-serialized body. Shared by send_raw and resend.""" + """Sign + POST a pre-serialized body through an SSRF-validated transport. + + When the sender owns its httpx client (the default — ``client=None`` + was passed to ``__init__``), every delivery builds a per-request + :class:`adcp.signing.ip_pinned_transport.AsyncIpPinnedTransport` + that resolves the destination, runs the full SSRF range check + (loopback / RFC 1918 / link-local / CGNAT / IPv6 ULA / multicast / + cloud metadata), enforces the port allowlist, and pins the + connection to the validated IP. This closes the DNS-rebinding + TOCTOU between validate and connect. + + When the operator supplied their own client + (``WebhookSender(client=...)`` — typically a vetted egress proxy + with mTLS to a known buyer set, or an ASGI transport for testing), + the sender trusts the operator's transport completely. Pin-and-bind + is skipped; the operator's transport owns SSRF. + """ base_headers = {"Content-Type": "application/json"} signed = sign_webhook( method="POST", @@ -495,8 +516,28 @@ async def _send_bytes( for k, v in extra_headers.items(): headers[k] = v - client = await self._get_client() - response = await client.post(url, content=body, headers=headers) + if self._owns_client: + # Per-request pinned transport. Building one client per delivery + # is the security-correct choice: keeping a pinned transport + # alive across deliveries to the same hostname would defeat the + # rebinding defense (the IP would be frozen at first delivery). 
+ transport = build_async_ip_pinned_transport( + url, + allow_private=self._allow_private_destinations, + allowed_ports=self._allowed_destination_ports, + ) + async with httpx.AsyncClient( + transport=transport, + timeout=self._timeout, + follow_redirects=False, + ) as client: + response = await client.post(url, content=body, headers=headers) + else: + # Operator-supplied client — they own the SSRF guarantees on + # their transport (proxy allowlist, mTLS, etc.). + assert self._client is not None # narrowing for mypy + response = await self._client.post(url, content=body, headers=headers) + return WebhookDeliveryResult( status_code=response.status_code, idempotency_key=idempotency_key, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index e61b097f..4ee80c1a 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -107,8 +107,12 @@ def create_mcp_webhook_payload( task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy") timestamp: When the webhook was generated (defaults to current UTC time) result: Task-specific payload (AdCP response data) - operation_id: Publisher-defined operation identifier (deprecated from payload, - should be in URL routing, but included for backward compatibility) + operation_id: Client-generated identifier the buyer embedded in the + webhook URL when registering push-notification config. Publishers + MUST echo this back in the payload so buyers correlate notifications + without parsing URL paths (per ``mcp-webhook-payload.json``). + Senders extracting the value from the URL path on emission populate + this field; callers constructing payloads directly pass it through. 
message: Human-readable summary of task state context_id: Session/conversation identifier domain: AdCP domain this task belongs to diff --git a/tests/conformance/signing/test_async_revocation.py b/tests/conformance/signing/test_async_revocation.py index 250a981b..3c85b222 100644 --- a/tests/conformance/signing/test_async_revocation.py +++ b/tests/conformance/signing/test_async_revocation.py @@ -176,16 +176,15 @@ async def test_averify_jws_rejects_wrong_typ() -> None: token = b64_header + "." + b64_payload + "." + b64url_encode(signature) with pytest.raises(JwsMalformedError, match="typ"): - await averify_jws_document( - token, jwks_resolver=resolver, expected_typ=REVOCATION_LIST_TYP - ) + await averify_jws_document(token, jwks_resolver=resolver, expected_typ=REVOCATION_LIST_TYP) # -- as_async_resolver -------------------------------------------------- async def test_as_async_resolver_wraps_sync_resolver() -> None: - def sync_resolver(keyid: str) -> dict[str, Any] | None: + def sync_resolver(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + del tenant_id return {"kid": keyid} if keyid == "x" else None async_resolver: AsyncJwksResolver = as_async_resolver(sync_resolver) @@ -282,12 +281,12 @@ async def test_async_replay_older_list_rejected() -> None: revoked_kids=[], ) fetcher = _ScriptedAsyncFetcher() - fetcher.enqueue(FetchResult( - body=_sign_compact(newer, private=private), etag='"v2"', not_modified=False - )) - fetcher.enqueue(FetchResult( - body=_sign_compact(older, private=private), etag='"v1"', not_modified=False - )) + fetcher.enqueue( + FetchResult(body=_sign_compact(newer, private=private), etag='"v2"', not_modified=False) + ) + fetcher.enqueue( + FetchResult(body=_sign_compact(older, private=private), etag='"v1"', not_modified=False) + ) wall_clock, mono_clock, advance = _controllable_clock( datetime(2026, 4, 18, 14, 15, tzinfo=timezone.utc) @@ -349,9 +348,7 @@ async def test_async_aprime_fails_fast() -> None: async def 
test_async_is_jti_revoked() -> None: private, resolver = _operator_key_and_resolver() - token = _sign_compact( - _make_payload(revoked_jtis=["jti-abc"]), private=private - ) + token = _sign_compact(_make_payload(revoked_jtis=["jti-abc"]), private=private) fetcher = _ScriptedAsyncFetcher() fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) @@ -414,9 +411,7 @@ async def slow_fetcher( wall_clock=wall_clock, clock=mono_clock, ) - results = await asyncio.gather( - checker("rev"), checker("rev"), checker("rev"), checker("rev") - ) + results = await asyncio.gather(checker("rev"), checker("rev"), checker("rev"), checker("rev")) assert all(r is True for r in results) assert fetch_count[0] == 1 @@ -506,9 +501,7 @@ async def fetch( headers["If-Modified-Since"] = if_modified_since async with httpx.AsyncClient(transport=transport, base_url=ISSUER) as client: - response = await client.get( - "/.well-known/governance-revocations.json", headers=headers - ) + response = await client.get("/.well-known/governance-revocations.json", headers=headers) if response.status_code == 304: return FetchResult( body="", diff --git a/tests/conformance/signing/test_ip_pinned_transport.py b/tests/conformance/signing/test_ip_pinned_transport.py index b9292b8e..93d5c43f 100644 --- a/tests/conformance/signing/test_ip_pinned_transport.py +++ b/tests/conformance/signing/test_ip_pinned_transport.py @@ -95,8 +95,13 @@ def test_resolve_returns_tuple_of_host_ip_port() -> None: def test_resolve_defaults_http_port_80() -> None: # Even though we normally refuse non-https elsewhere, the helper - # itself is scheme-agnostic for the port default. - host, _ip, port = resolve_and_validate_host("http://example.com/jwks") + # itself is scheme-agnostic for the port default. Port 80 is outside + # the production allowlist {443, 8443}; pass an empty set to disable + # the port check so this test exercises the default-port logic only. 
+ host, _ip, port = resolve_and_validate_host( + "http://example.com/jwks", + allowed_ports=frozenset(), + ) assert host == "example.com" assert port == 80 diff --git a/tests/conformance/signing/test_jwks.py b/tests/conformance/signing/test_jwks.py index bd0692c5..5beb057b 100644 --- a/tests/conformance/signing/test_jwks.py +++ b/tests/conformance/signing/test_jwks.py @@ -54,7 +54,14 @@ def test_ssrf_allow_private_override() -> None: "adcp.signing.jwks.socket.getaddrinfo", return_value=[(2, 1, 6, "", ("127.0.0.1", 0))], ): - validate_jwks_uri("http://localhost:8080/jwks.json", allow_private=True) + # Test names "allow_private" — the port allowlist is independent; + # use the test-only escape hatch (empty allowed_ports) so the IP-range + # override is what's actually being exercised. + validate_jwks_uri( + "http://localhost:8080/jwks.json", + allow_private=True, + allowed_ports=frozenset(), + ) def test_ssrf_metadata_ip_blocked_even_with_allow_private() -> None: @@ -99,6 +106,124 @@ def test_ssrf_caps_resolved_address_scan() -> None: validate_jwks_uri("https://example.com/jwks.json") +# ---- Port allowlist ---- +# +# Even when the resolved IP is public, buyers MUST NOT smuggle traffic to +# arbitrary ports — :25 (SMTP relay), :6379 (Redis), :11211 (Memcached), etc. +# The default allowlist permits {443, 8443} only. 
+ + +@pytest.mark.parametrize( + "uri,port", + [ + ("https://example.com:25/jwks.json", 25), # SMTP + ("https://example.com:6379/jwks.json", 6379), # Redis + ("https://example.com:11211/jwks.json", 11211), # Memcached + ("https://example.com:8080/jwks.json", 8080), # generic HTTP-alt + ("http://example.com:80/jwks.json", 80), # plain HTTP — even on the canonical port + ], +) +def test_ssrf_rejects_disallowed_ports(uri: str, port: int) -> None: + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + with pytest.raises(SSRFValidationError, match=f"port {port} not allowed"): + validate_jwks_uri(uri) + + +@pytest.mark.parametrize( + "uri", + [ + "https://example.com/jwks.json", # implicit :443 + "https://example.com:443/jwks.json", + "https://example.com:8443/jwks.json", + ], +) +def test_ssrf_allows_default_ports(uri: str) -> None: + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri(uri) + + +def test_ssrf_allowed_ports_override() -> None: + """Adopters with trusted on-prem deployments can permit non-standard ports.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri( + "https://example.com:9000/jwks.json", + allowed_ports=frozenset({443, 9000}), + ) + + +def test_ssrf_allowed_ports_empty_disables_check() -> None: + """Empty frozenset is the test-only escape hatch (any port permitted).""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri( + "https://example.com:25/jwks.json", + allowed_ports=frozenset(), + ) + + +# ---- Tenant-scoped JWKS resolution ---- +# +# Multi-tenant deployments pass tenant_id so a resolver instance shared across +# tenants can refuse keys outside the active tenant's published JWKS. 
The +# in-tree resolvers (Static, Caching) are single-tenant by construction and +# accept the kwarg as a pass-through; tenant-scoping resolvers wrap them. + + +def test_static_resolver_accepts_tenant_id_kwarg() -> None: + """Existing single-tenant impls accept the tenant_id kwarg without + enforcing it — tenant scoping is a wrapper concern.""" + jwk = {"kid": "key-1", "kty": "OKP", "crv": "Ed25519", "x": "..."} + resolver = StaticJwksResolver({"keys": [jwk]}) + # Single-tenant: kwarg is ignored, lookup still succeeds. + assert resolver("key-1") == jwk + assert resolver("key-1", tenant_id="tenant-a") == jwk + assert resolver("key-1", tenant_id="tenant-b") == jwk + assert resolver("missing", tenant_id="tenant-a") is None + + +def test_tenant_scoping_wrapper_pattern() -> None: + """Reference pattern: a tenant-scoping wrapper composes single-tenant + resolvers and enforces ``(tenant_id, key_id)`` semantics by routing. + Adopters wire one of these in ``VerifyOptions.jwks_resolver`` for + multi-tenant deployments.""" + tenant_a = StaticJwksResolver( + {"keys": [{"kid": "key-a", "kty": "OKP", "crv": "Ed25519", "x": "a"}]} + ) + tenant_b = StaticJwksResolver( + {"keys": [{"kid": "key-b", "kty": "OKP", "crv": "Ed25519", "x": "b"}]} + ) + + def tenant_scoped(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: + if tenant_id == "tenant-a": + return tenant_a(keyid) + if tenant_id == "tenant-b": + return tenant_b(keyid) + return None # unknown tenant rejects all keys + + # Cross-tenant key confusion is closed: tenant A's key_id presented + # to tenant B's transport returns None, the verifier raises + # REQUEST_SIGNATURE_KEY_UNKNOWN, signature verification fails closed. + assert tenant_scoped("key-a", tenant_id="tenant-a") is not None + assert tenant_scoped("key-a", tenant_id="tenant-b") is None + assert tenant_scoped("key-b", tenant_id="tenant-b") is not None + assert tenant_scoped("key-b", tenant_id="tenant-a") is None + # Unknown tenant rejects everything. 
+ assert tenant_scoped("key-a", tenant_id="unknown") is None + # No tenant_id → unknown. + assert tenant_scoped("key-a") is None + + # ---- CachingJwksResolver ---- diff --git a/tests/conformance/signing/test_verifier_defaults.py b/tests/conformance/signing/test_verifier_defaults.py index 7eee1505..b4d61c8c 100644 --- a/tests/conformance/signing/test_verifier_defaults.py +++ b/tests/conformance/signing/test_verifier_defaults.py @@ -71,3 +71,16 @@ def test_revocation_stays_optional() -> None: opts = _opts() assert opts.revocation_checker is None assert opts.revocation_list is None + + +def test_default_covers_content_digest_is_required() -> None: + """Body integrity must be authenticated end-to-end by default — + ``"either"`` or ``"forbidden"`` lets a MITM inside TLS termination + swap bodies on signed requests whose digest isn't covered. + + Operators who knowingly accept that tradeoff (e.g. a strict reverse- + proxy boundary that owns body integrity at a different layer) opt + out by constructing ``VerifierCapability(covers_content_digest=...)`` + explicitly. 
The default MUST be the secure choice.""" + cap = VerifierCapability() + assert cap.covers_content_digest == "required" diff --git a/tests/conformance/signing/test_verifier_vectors.py b/tests/conformance/signing/test_verifier_vectors.py index 1948ae1b..0922beac 100644 --- a/tests/conformance/signing/test_verifier_vectors.py +++ b/tests/conformance/signing/test_verifier_vectors.py @@ -37,7 +37,8 @@ def _build_jwks_resolver(vector: dict): else: entries = {kid: KEYS_BY_KID[kid] for kid in vector.get("jwks_ref", [])} - def resolve(keyid: str) -> dict | None: + def resolve(keyid: str, *, tenant_id: str | None = None) -> dict | None: + del tenant_id # vectors are single-tenant return entries.get(keyid) return resolve diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index fd90baa2..e3d3e4ce 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -397,3 +397,89 @@ async def test_receiver_rejects_when_sender_key_wrong() -> None: ) assert not result.ok assert result.status_code == 401 + + +@pytest.mark.asyncio +async def test_owned_client_rejects_loopback_destination() -> None: + """Default sender (no operator client) must reject loopback URLs via SSRF + guard. 
Without pin-and-bind, a buyer-supplied webhook URL pointing at + 127.0.0.1 lets a public agent reach in-cluster services.""" + from adcp.signing import SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + async with sender: + with pytest.raises(SSRFValidationError): + await sender.send_mcp( + url="https://127.0.0.1/webhooks/adcp", + task_id="task_ssrf", + task_type="create_media_buy", + status="completed", + ) + + +@pytest.mark.asyncio +async def test_owned_client_rejects_disallowed_port() -> None: + """Default sender rejects ports outside the {443, 8443} allowlist — + a buyer can't smuggle traffic to internal Redis / SMTP / Memcached + on a public IP via a webhook URL on a non-standard port.""" + from unittest.mock import patch + + from adcp.signing import SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + # Mock DNS so we don't hit a live host; the port check fires before + # resolution even runs (in resolve_and_validate_host). + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + async with sender: + with pytest.raises(SSRFValidationError, match="port 6379 not allowed"): + await sender.send_mcp( + url="https://example.com:6379/webhooks/adcp", + task_id="task_port", + task_type="create_media_buy", + status="completed", + ) + + +@pytest.mark.asyncio +async def test_send_mcp_threads_operation_id_into_payload() -> None: + """``operation_id`` is buyer-supplied and embedded by the buyer in the + webhook URL when registering pushNotificationConfig. Per the schema at + ``mcp-webhook-payload.json``, publishers MUST echo it in the payload so + buyers correlate notifications without parsing URL paths. 
+ + Regression guard for the docstring-vs-schema mismatch noted in the + DecisioningPlatform foundations audit (the prior docstring discouraged + populating the field, contradicting the schema's MUST).""" + captured_payloads: list[dict[str, Any]] = [] + app = FastAPI() + + @app.post("/webhooks/adcp/{op_id}") + async def echo(request: Request, op_id: str) -> JSONResponse: + body = await request.body() + captured_payloads.append(json.loads(body)) + return JSONResponse({"ok": True}, status_code=200) + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + client=client, + ) + result = await sender.send_mcp( + url="http://test/webhooks/adcp/op_abc123", + task_id="task_op_id_test", + task_type="create_media_buy", + status="completed", + operation_id="op_abc123", + result={"media_buy_id": "mb_1"}, + ) + + assert result.ok + assert captured_payloads[0]["operation_id"] == "op_abc123" From 3fd2c49338fa7a55375971eef951c848fae03da6 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 29 Apr 2026 12:13:18 -0400 Subject: [PATCH 2/4] fix(signing): scope down per expert review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expert review of the prep PR (security-reviewer, code-reviewer, python-expert, ad-tech-protocol-expert) flagged 3 changes that should move to the foundation PR or be reversed: 1. JWKS tenant_id kwarg removed - Protocol expert: tenant_id is the wrong axis for JWKS isolation; AdCP anchors keys at agents[].jwks_uri, not at a seller-internal tenant_id. CachingJwksResolver(jwks_uri=...) already isolates by URL. - Security: the kwarg was opt-in with no enforcement signal — adopters who built a tenant-scoping resolver and forgot to thread tenant_id on every call would silently fall back to single-tenant. 
- Python: the JWKS document verifier (jws.py) didn't thread the kwarg, so the multi-tenant guarantee only held on the RFC 9421 path — undercutting the stated security improvement. - Code review: Protocol shape change with no compat shim was a breaking change for external resolver implementations. - Resolution: drop the kwarg from this prep PR. Reintroduce in the foundation PR with the spec-correct axis (likely (jwks_uri, key_id) or (agent_url, key_id)) and after the spec project clarifies multi-tenant key isolation guidance. 2. Port allowlist default flipped to permissive - Protocol expert: AdCP doesn't constrain pushNotificationConfig.url ports (push-notification-config.json:7-11). Defaulting to {443, 8443} silently rejects legitimate buyers on :9443 (Tomcat), :4443 (Spring Boot), or path-routed multi-tenant gateways. - Security M2: implicit-HTTP rejection (port 80) wasn't documented as scheme enforcement; adopters hitting it would widen the allowlist and re-enable plaintext. - Resolution: default allowed_ports=None (no port filter); operators opt INTO {443, 8443} hardening by passing allowed_ports=DEFAULT_ALLOWED_PORTS. The constant is exported from adcp.signing for adopters who want the recommended posture. The IP-range check + IP pinning still apply regardless of port — the smuggle vector to internal services on the same routable IP is closed by IP-range rejection, not by port enforcement. 3. covers_content_digest='required' regression test dropped - Protocol expert: AdCP 3.0 spec explicitly sets "default": "either" (get-adcp-capabilities-response.json:912-921) with the rationale "'required' is recommended for spend-committing operations in production; 4.0 recommends 'required' for those operations." - Existing code shipped "required" as the default before this PR — a pre-existing divergence from the spec that's not my PR's bug to pin. Drop the regression test that locked in the wrong default. File a separate issue to address the spec divergence. 
Also fixed (real bugs from the same review): 4. WebhookSender.from_jwk / from_pem now forward allow_private_destinations + allowed_destination_ports to the constructor. Documented happy-path adopters can now configure SSRF policy without dropping to __init__. 5. Replaced `assert self._client is not None` (mypy-narrowing) on the operator-supplied client path with an explicit RuntimeError. The state is reachable (aclose() then re-send) and python -O strips asserts, so the call would otherwise fall through to None.post() and fail with an opaque AttributeError. Net result for this PR (now 3 fixes instead of 5): - IP-pinned webhook delivery (the actual security hole) - Optional port allowlist as opt-in operator hardening - operation_id docstring fix (schema-mandated echo) Plus the from_jwk/from_pem ergonomics + assert-to-raise fix. Tests: 2254 passing locally. New tests: - test_jwks.py::test_ssrf_default_imposes_no_port_filter (parametrized) — confirms :9443/:4443/:8080/:80 all pass without explicit allowlist - test_jwks.py::test_ssrf_default_allowlist_passes_canonical_https_ports - test_jwks.py::test_ssrf_empty_allowlist_rejects_every_port (sentinel) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/signing/__init__.py | 2 + src/adcp/signing/jwks.py | 72 ++++------ src/adcp/signing/verifier.py | 7 +- src/adcp/signing/webhook_verifier.py | 5 - src/adcp/webhook_sender.py | 23 +++- .../signing/test_async_revocation.py | 3 +- .../signing/test_ip_pinned_transport.py | 9 +- tests/conformance/signing/test_jwks.py | 125 +++++++----------- .../signing/test_verifier_defaults.py | 13 -- .../signing/test_verifier_vectors.py | 3 +- .../signing/test_webhook_sender_e2e.py | 14 +- 11 files changed, 113 insertions(+), 163 deletions(-) diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 6761cf0e..4d85fe39 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -157,6 +157,7 @@ build_ip_pinned_transport, ) from adcp.signing.jwks import ( + DEFAULT_ALLOWED_PORTS, 
AsyncCachingJwksResolver, AsyncJwksFetcher, AsyncJwksResolver, @@ -251,6 +252,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "CachingJwksResolver", "CachingRevocationChecker", "CapabilityProvider", + "DEFAULT_ALLOWED_PORTS", "DEFAULT_EXPIRES_IN_SECONDS", "DEFAULT_GRACE_MULTIPLIER", "DEFAULT_SKEW_SECONDS", diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index bfd98223..d4fa75b4 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -53,12 +53,15 @@ } ) -# Allowed destination ports for SSRF-validated outbound HTTP. Buyers supplying -# webhook URLs on non-standard ports can otherwise smuggle traffic to internal -# services bound to the same routable IP — :25 (SMTP relay), :6379 (Redis), -# :11211 (Memcached), etc. The validator rejects these even when the IP itself -# is public. Tests override via the ``allowed_ports`` parameter on -# :func:`resolve_and_validate_host`. +# Recommended destination ports for hardened SSRF-validated outbound HTTP +# deployments. AdCP itself does not constrain ``pushNotificationConfig.url`` +# ports (see ``schemas/cache/core/push-notification-config.json``), so the +# default port-allowlist is permissive — adopters who want a hardening posture +# pass ``allowed_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set) explicitly. +# Rejecting non-standard ports closes a smuggle vector for buyers bouncing +# traffic to internal services on the same routable IP — :25 (SMTP relay), +# :6379 (Redis), :11211 (Memcached), etc. — but that's an operator choice, +# not a framework default that breaks legitimate :9443 / :4443 buyers. DEFAULT_ALLOWED_PORTS: frozenset[int] = frozenset({443, 8443}) # Upper bound on the number of resolved addresses examined per validation call. @@ -92,17 +95,9 @@ class JwksResolver(Protocol): :class:`CachingJwksResolver` (fetches + caches from a URI). Async callers use :class:`AsyncJwksResolver` instead. 
- - ``tenant_id`` is optional; multi-tenant deployments pass it so a - resolver instance shared across tenants can refuse keys outside the - active tenant's published JWKS. Single-tenant deployments leave it - unset; resolvers that don't enforce tenant scoping ignore it. - Cross-tenant key confusion (a buyer signing for tenant B who knows - tenant A's ``key_id`` presenting to A's transport) is closed at the - resolver layer, not the verifier. """ - def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: ... + def __call__(self, keyid: str) -> dict[str, Any] | None: ... class AsyncJwksResolver(Protocol): @@ -115,13 +110,9 @@ class AsyncJwksResolver(Protocol): it in a thin async callable — there's no async work, just a dict lookup — but typically you'll just use the static one directly where an :class:`AsyncJwksResolver` is expected via :func:`as_async`. - - ``tenant_id`` semantics match :class:`JwksResolver`. """ - async def __call__( - self, keyid: str, *, tenant_id: str | None = None - ) -> dict[str, Any] | None: ... + async def __call__(self, keyid: str) -> dict[str, Any] | None: ... def validate_jwks_uri( @@ -165,11 +156,12 @@ def resolve_and_validate_host( Skip the reserved-range check. For tests only; cloud-metadata IPs remain blocked unconditionally. allowed_ports: - Override the default destination port allowlist. Pass an empty - frozenset to disable the check entirely (tests only); pass a - custom set to permit non-standard ports for trusted on-prem - deployments. ``None`` (default) uses :data:`DEFAULT_ALLOWED_PORTS` - (`{443, 8443}`). + Optional destination-port allowlist. ``None`` (default) imposes + no port filter — the URL's port is unrestricted. Hardened + deployments pass :data:`DEFAULT_ALLOWED_PORTS` (`{443, 8443}`) + or a custom set; the validator then rejects URIs whose port + is outside the set. AdCP doesn't constrain webhook ports in + the spec, so this is operator policy, not a framework default. 
Returns ------- @@ -180,8 +172,9 @@ def resolve_and_validate_host( Raises ------ SSRFValidationError - Scheme is not ``http``/``https``, port is not in the allowlist, - the hostname doesn't resolve, or every resolved IP is in a blocked range. + Scheme is not ``http``/``https``, ``allowed_ports`` is set and + the URI's port is outside it, the hostname doesn't resolve, or + every resolved IP is in a blocked range. """ parts = urlsplit(uri) if parts.scheme not in ("http", "https"): @@ -209,11 +202,10 @@ def resolve_and_validate_host( except (UnicodeError, UnicodeEncodeError) as exc: raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80) - effective_allowed_ports = DEFAULT_ALLOWED_PORTS if allowed_ports is None else allowed_ports - if effective_allowed_ports and port not in effective_allowed_ports: + if allowed_ports is not None and port not in allowed_ports: raise SSRFValidationError( f"port {port} not allowed for SSRF-validated fetch " - f"(allowed: {sorted(effective_allowed_ports)})" + f"(allowed: {sorted(allowed_ports) if allowed_ports else ''})" ) try: @@ -329,13 +321,7 @@ def __init__( self._last_attempt: float | None = None self._primed = False - def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - # tenant_id ignored — this resolver is bound to a single jwks_uri - # at construction. Multi-tenant deployments wire one resolver per - # tenant and route by tenant_id externally; a tenant-scoping - # resolver wrapper that checks tenant_id against a registry can - # compose this resolver internally. 
- del tenant_id + def __call__(self, keyid: str) -> dict[str, Any] | None: if keyid in self._cache: return self._cache[keyid] now = self._clock() @@ -371,9 +357,7 @@ class StaticJwksResolver: def __init__(self, jwks: dict[str, Any]) -> None: self._keys = {jwk["kid"]: jwk for jwk in jwks.get("keys", []) if "kid" in jwk} - def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - # tenant_id ignored — single-tenant by construction. - del tenant_id + def __call__(self, keyid: str) -> dict[str, Any] | None: return self._keys.get(keyid) @@ -448,9 +432,7 @@ def __init__( # across ``asyncio.run`` boundaries. self._lock: asyncio.Lock = asyncio.Lock() - async def __call__(self, keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - # tenant_id ignored — see CachingJwksResolver.__call__. - del tenant_id + async def __call__(self, keyid: str) -> dict[str, Any] | None: if keyid in self._cache: return self._cache[keyid] now = self._clock() @@ -497,8 +479,8 @@ def as_async_resolver(resolver: JwksResolver) -> AsyncJwksResolver: work (just a dict lookup); the wrapper is a shape adapter. """ - async def resolve(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - return resolver(keyid, tenant_id=tenant_id) + async def resolve(keyid: str) -> dict[str, Any] | None: + return resolver(keyid) return resolve diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index f354ce1b..b0a0bac7 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -131,11 +131,6 @@ class VerifyOptions: expected_adcp_use: str = ADCP_USE_REQUEST allowed_algs: frozenset[str] = ALLOWED_ALGS agent_url: str | None = None - # Multi-tenant deployments pass tenant_id so a JWKS resolver shared - # across tenants refuses keys outside the active tenant's published - # JWKS. Single-tenant deployments leave it None; resolvers that - # don't enforce tenant scoping ignore it. See JwksResolver Protocol. 
- tenant_id: str | None = None def verify_request_signature( @@ -224,7 +219,7 @@ def verify_request_signature( message=f"nonce exceeds {_MAX_PARAM_LEN} bytes", ) - jwk = options.jwks_resolver(keyid, tenant_id=options.tenant_id) + jwk = options.jwks_resolver(keyid) if jwk is None: raise SignatureVerificationError( REQUEST_SIGNATURE_KEY_UNKNOWN, diff --git a/src/adcp/signing/webhook_verifier.py b/src/adcp/signing/webhook_verifier.py index b3ea6c7b..75fcc833 100644 --- a/src/adcp/signing/webhook_verifier.py +++ b/src/adcp/signing/webhook_verifier.py @@ -84,10 +84,6 @@ class WebhookVerifyOptions: allowed_algs: frozenset[str] = ALLOWED_ALGS sender_url: str | None = None clock: Callable[[], float] = time.time - # Multi-tenant deployments pass tenant_id so a JWKS resolver shared - # across tenants refuses keys outside the active tenant's published - # JWKS — same semantics as VerifyOptions.tenant_id. - tenant_id: str | None = None @dataclass(frozen=True) @@ -156,7 +152,6 @@ def verify_webhook_signature( expected_adcp_use=ADCP_USE_WEBHOOK, allowed_algs=options.allowed_algs, agent_url=options.sender_url, - tenant_id=options.tenant_id, ) try: diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index c9505285..a5a8a38b 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -133,6 +133,8 @@ def from_jwk( d_field: str = "d", client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> WebhookSender: """Construct from a JWK that includes the private scalar. @@ -140,6 +142,9 @@ def from_jwk( doesn't validate this (you're signing with your own key; validation happens at the receiver), but a key whose adcp_use is wrong will be rejected by every conformant verifier. + + ``allow_private_destinations`` and ``allowed_destination_ports`` + forward to :meth:`__init__` — see that signature for semantics. 
""" # Snapshot the mapping once — a live Mapping could otherwise return # different values across the adcp_use / kid / d / alg reads. @@ -168,6 +173,8 @@ def from_jwk( alg=alg, client=client, timeout_seconds=timeout_seconds, + allow_private_destinations=allow_private_destinations, + allowed_destination_ports=allowed_destination_ports, ) @classmethod @@ -180,6 +187,8 @@ def from_pem( passphrase: bytes | None = None, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> WebhookSender: """Load a private key from a PEM file and bind it as a webhook sender. @@ -199,6 +208,8 @@ def from_pem( client: Optional pre-built :class:`httpx.AsyncClient` to share across the SDK; the sender owns its own client when omitted. timeout_seconds: Per-request timeout for the owned client. + allow_private_destinations: Forwarded to :meth:`__init__`. + allowed_destination_ports: Forwarded to :meth:`__init__`. Raises: ValueError: ``alg`` is not ed25519 / es256, or the PEM contains @@ -244,6 +255,8 @@ def from_pem( alg=alg, client=client, timeout_seconds=timeout_seconds, + allow_private_destinations=allow_private_destinations, + allowed_destination_ports=allowed_destination_ports, ) def __repr__(self) -> str: @@ -534,8 +547,14 @@ async def _send_bytes( response = await client.post(url, content=body, headers=headers) else: # Operator-supplied client — they own the SSRF guarantees on - # their transport (proxy allowlist, mTLS, etc.). - assert self._client is not None # narrowing for mypy + # their transport (proxy allowlist, mTLS, etc.). Reachable as + # None after aclose(); a runtime check beats an assert that + # python -O strips to silently NoneType.post(). + if self._client is None: + raise RuntimeError( + "WebhookSender's operator-supplied client was already " + "closed. Construct a new sender or pass a fresh client." 
+ ) response = await self._client.post(url, content=body, headers=headers) return WebhookDeliveryResult( diff --git a/tests/conformance/signing/test_async_revocation.py b/tests/conformance/signing/test_async_revocation.py index 3c85b222..f3bc01de 100644 --- a/tests/conformance/signing/test_async_revocation.py +++ b/tests/conformance/signing/test_async_revocation.py @@ -183,8 +183,7 @@ async def test_averify_jws_rejects_wrong_typ() -> None: async def test_as_async_resolver_wraps_sync_resolver() -> None: - def sync_resolver(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - del tenant_id + def sync_resolver(keyid: str) -> dict[str, Any] | None: return {"kid": keyid} if keyid == "x" else None async_resolver: AsyncJwksResolver = as_async_resolver(sync_resolver) diff --git a/tests/conformance/signing/test_ip_pinned_transport.py b/tests/conformance/signing/test_ip_pinned_transport.py index 93d5c43f..b9292b8e 100644 --- a/tests/conformance/signing/test_ip_pinned_transport.py +++ b/tests/conformance/signing/test_ip_pinned_transport.py @@ -95,13 +95,8 @@ def test_resolve_returns_tuple_of_host_ip_port() -> None: def test_resolve_defaults_http_port_80() -> None: # Even though we normally refuse non-https elsewhere, the helper - # itself is scheme-agnostic for the port default. Port 80 is outside - # the production allowlist {443, 8443}; pass an empty set to disable - # the port check so this test exercises the default-port logic only. - host, _ip, port = resolve_and_validate_host( - "http://example.com/jwks", - allowed_ports=frozenset(), - ) + # itself is scheme-agnostic for the port default. 
+ host, _ip, port = resolve_and_validate_host("http://example.com/jwks") assert host == "example.com" assert port == 80 diff --git a/tests/conformance/signing/test_jwks.py b/tests/conformance/signing/test_jwks.py index 5beb057b..af156fd6 100644 --- a/tests/conformance/signing/test_jwks.py +++ b/tests/conformance/signing/test_jwks.py @@ -8,6 +8,7 @@ import pytest from adcp.signing import ( + DEFAULT_ALLOWED_PORTS, CachingJwksResolver, SignatureVerificationError, SSRFValidationError, @@ -54,14 +55,7 @@ def test_ssrf_allow_private_override() -> None: "adcp.signing.jwks.socket.getaddrinfo", return_value=[(2, 1, 6, "", ("127.0.0.1", 0))], ): - # Test names "allow_private" — the port allowlist is independent; - # use the test-only escape hatch (empty allowed_ports) so the IP-range - # override is what's actually being exercised. - validate_jwks_uri( - "http://localhost:8080/jwks.json", - allow_private=True, - allowed_ports=frozenset(), - ) + validate_jwks_uri("http://localhost:8080/jwks.json", allow_private=True) def test_ssrf_metadata_ip_blocked_even_with_allow_private() -> None: @@ -106,11 +100,34 @@ def test_ssrf_caps_resolved_address_scan() -> None: validate_jwks_uri("https://example.com/jwks.json") -# ---- Port allowlist ---- +# ---- Port allowlist (opt-in operator hardening) ---- # -# Even when the resolved IP is public, buyers MUST NOT smuggle traffic to -# arbitrary ports — :25 (SMTP relay), :6379 (Redis), :11211 (Memcached), etc. -# The default allowlist permits {443, 8443} only. +# AdCP doesn't constrain webhook ports in the spec, so the default validator +# imposes no port filter. Adopters who want a hardening posture pass +# ``allowed_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set); rejection of +# non-standard ports closes a smuggle vector for buyers bouncing traffic to +# internal services on the same routable IP. 
+ + +@pytest.mark.parametrize( + "uri", + [ + "https://example.com:9443/jwks.json", # Tomcat default — legitimate + "https://example.com:4443/jwks.json", # Spring Boot default — legitimate + "https://example.com:8080/jwks.json", # buyer's path-routed gateway + "http://example.com:80/jwks.json", # plain HTTP — scheme check is separate + ], +) +def test_ssrf_default_imposes_no_port_filter(uri: str) -> None: + """Without explicit ``allowed_ports``, any port that satisfies the + scheme check passes. AdCP doesn't restrict ``pushNotificationConfig.url`` + to standard ports — :8443/:9443/:4443 are all legitimate buyer + deployments.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri(uri) @pytest.mark.parametrize( @@ -120,16 +137,18 @@ def test_ssrf_caps_resolved_address_scan() -> None: ("https://example.com:6379/jwks.json", 6379), # Redis ("https://example.com:11211/jwks.json", 11211), # Memcached ("https://example.com:8080/jwks.json", 8080), # generic HTTP-alt - ("http://example.com:80/jwks.json", 80), # plain HTTP — even on the canonical port ], ) -def test_ssrf_rejects_disallowed_ports(uri: str, port: int) -> None: +def test_ssrf_rejects_disallowed_ports_when_hardening(uri: str, port: int) -> None: + """Operators opt into the hardening posture by passing + ``DEFAULT_ALLOWED_PORTS`` or a custom set; non-allowlisted ports + then reject.""" with patch( "adcp.signing.jwks.socket.getaddrinfo", return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], ): with pytest.raises(SSRFValidationError, match=f"port {port} not allowed"): - validate_jwks_uri(uri) + validate_jwks_uri(uri, allowed_ports=DEFAULT_ALLOWED_PORTS) @pytest.mark.parametrize( @@ -140,15 +159,17 @@ def test_ssrf_rejects_disallowed_ports(uri: str, port: int) -> None: "https://example.com:8443/jwks.json", ], ) -def test_ssrf_allows_default_ports(uri: str) -> None: +def test_ssrf_default_allowlist_passes_canonical_https_ports(uri: str) 
-> None: + """``DEFAULT_ALLOWED_PORTS = {443, 8443}`` is the recommended hardening + set; both canonical-https and HTTPS-alt pass.""" with patch( "adcp.signing.jwks.socket.getaddrinfo", return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], ): - validate_jwks_uri(uri) + validate_jwks_uri(uri, allowed_ports=DEFAULT_ALLOWED_PORTS) -def test_ssrf_allowed_ports_override() -> None: +def test_ssrf_allowed_ports_custom_set() -> None: """Adopters with trusted on-prem deployments can permit non-standard ports.""" with patch( "adcp.signing.jwks.socket.getaddrinfo", @@ -160,68 +181,20 @@ def test_ssrf_allowed_ports_override() -> None: ) -def test_ssrf_allowed_ports_empty_disables_check() -> None: - """Empty frozenset is the test-only escape hatch (any port permitted).""" +def test_ssrf_empty_allowlist_rejects_every_port() -> None: + """``allowed_ports=frozenset()`` is meaningful: no port satisfies the + set. Distinct from ``allowed_ports=None`` (no filter at all). Used + by deployments that want to fail closed unless a port is explicitly + permitted.""" with patch( "adcp.signing.jwks.socket.getaddrinfo", return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], ): - validate_jwks_uri( - "https://example.com:25/jwks.json", - allowed_ports=frozenset(), - ) - - -# ---- Tenant-scoped JWKS resolution ---- -# -# Multi-tenant deployments pass tenant_id so a resolver instance shared across -# tenants can refuse keys outside the active tenant's published JWKS. The -# in-tree resolvers (Static, Caching) are single-tenant by construction and -# accept the kwarg as a pass-through; tenant-scoping resolvers wrap them. - - -def test_static_resolver_accepts_tenant_id_kwarg() -> None: - """Existing single-tenant impls accept the tenant_id kwarg without - enforcing it — tenant scoping is a wrapper concern.""" - jwk = {"kid": "key-1", "kty": "OKP", "crv": "Ed25519", "x": "..."} - resolver = StaticJwksResolver({"keys": [jwk]}) - # Single-tenant: kwarg is ignored, lookup still succeeds. 
- assert resolver("key-1") == jwk - assert resolver("key-1", tenant_id="tenant-a") == jwk - assert resolver("key-1", tenant_id="tenant-b") == jwk - assert resolver("missing", tenant_id="tenant-a") is None - - -def test_tenant_scoping_wrapper_pattern() -> None: - """Reference pattern: a tenant-scoping wrapper composes single-tenant - resolvers and enforces ``(tenant_id, key_id)`` semantics by routing. - Adopters wire one of these in ``VerifyOptions.jwks_resolver`` for - multi-tenant deployments.""" - tenant_a = StaticJwksResolver( - {"keys": [{"kid": "key-a", "kty": "OKP", "crv": "Ed25519", "x": "a"}]} - ) - tenant_b = StaticJwksResolver( - {"keys": [{"kid": "key-b", "kty": "OKP", "crv": "Ed25519", "x": "b"}]} - ) - - def tenant_scoped(keyid: str, *, tenant_id: str | None = None) -> dict[str, Any] | None: - if tenant_id == "tenant-a": - return tenant_a(keyid) - if tenant_id == "tenant-b": - return tenant_b(keyid) - return None # unknown tenant rejects all keys - - # Cross-tenant key confusion is closed: tenant A's key_id presented - # to tenant B's transport returns None, the verifier raises - # REQUEST_SIGNATURE_KEY_UNKNOWN, signature verification fails closed. - assert tenant_scoped("key-a", tenant_id="tenant-a") is not None - assert tenant_scoped("key-a", tenant_id="tenant-b") is None - assert tenant_scoped("key-b", tenant_id="tenant-b") is not None - assert tenant_scoped("key-b", tenant_id="tenant-a") is None - # Unknown tenant rejects everything. - assert tenant_scoped("key-a", tenant_id="unknown") is None - # No tenant_id → unknown. 
- assert tenant_scoped("key-a") is None + with pytest.raises(SSRFValidationError, match="port 443 not allowed"): + validate_jwks_uri( + "https://example.com/jwks.json", + allowed_ports=frozenset(), + ) # ---- CachingJwksResolver ---- diff --git a/tests/conformance/signing/test_verifier_defaults.py b/tests/conformance/signing/test_verifier_defaults.py index b4d61c8c..7eee1505 100644 --- a/tests/conformance/signing/test_verifier_defaults.py +++ b/tests/conformance/signing/test_verifier_defaults.py @@ -71,16 +71,3 @@ def test_revocation_stays_optional() -> None: opts = _opts() assert opts.revocation_checker is None assert opts.revocation_list is None - - -def test_default_covers_content_digest_is_required() -> None: - """Body integrity must be authenticated end-to-end by default — - ``"either"`` or ``"forbidden"`` lets a MITM inside TLS termination - swap bodies on signed requests whose digest isn't covered. - - Operators who knowingly accept that tradeoff (e.g. a strict reverse- - proxy boundary that owns body integrity at a different layer) opt - out by constructing ``VerifierCapability(covers_content_digest=...)`` - explicitly. 
The default MUST be the secure choice.""" - cap = VerifierCapability() - assert cap.covers_content_digest == "required" diff --git a/tests/conformance/signing/test_verifier_vectors.py b/tests/conformance/signing/test_verifier_vectors.py index 0922beac..1948ae1b 100644 --- a/tests/conformance/signing/test_verifier_vectors.py +++ b/tests/conformance/signing/test_verifier_vectors.py @@ -37,8 +37,7 @@ def _build_jwks_resolver(vector: dict): else: entries = {kid: KEYS_BY_KID[kid] for kid in vector.get("jwks_ref", [])} - def resolve(keyid: str, *, tenant_id: str | None = None) -> dict | None: - del tenant_id # vectors are single-tenant + def resolve(keyid: str) -> dict | None: return entries.get(keyid) return resolve diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index e3d3e4ce..00db7e3f 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -420,16 +420,20 @@ async def test_owned_client_rejects_loopback_destination() -> None: @pytest.mark.asyncio -async def test_owned_client_rejects_disallowed_port() -> None: - """Default sender rejects ports outside the {443, 8443} allowlist — - a buyer can't smuggle traffic to internal Redis / SMTP / Memcached - on a public IP via a webhook URL on a non-standard port.""" +async def test_owned_client_rejects_disallowed_port_when_hardening_configured() -> None: + """Operators opt in to the port-allowlist hardening posture by passing + ``allowed_destination_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set). + The sender then rejects buyer URLs on ports outside the set, closing + the smuggle vector to internal Redis/SMTP/Memcached on the same + routable IP. 
Without the kwarg, AdCP-spec-compliant ports like + :9443/:4443 still pass.""" from unittest.mock import patch - from adcp.signing import SSRFValidationError + from adcp.signing import DEFAULT_ALLOWED_PORTS, SSRFValidationError sender = WebhookSender.from_jwk( {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + allowed_destination_ports=DEFAULT_ALLOWED_PORTS, ) # Mock DNS so we don't hit a live host; the port check fires before # resolution even runs (in resolve_and_validate_host). From 50ae54d9e7c48d9e9feba058400517fc39c39eec Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 29 Apr 2026 12:47:18 -0400 Subject: [PATCH 3/4] =?UTF-8?q?feat(signing):=20apply=202nd-pass=20review?= =?UTF-8?q?=20=E2=80=94=20trust=5Fenv,=20validate-before-sign,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second-pass expert review (security-reviewer, code-reviewer, python-expert, ad-tech-protocol-expert) on the scope-down commit 3fd2c493 surfaced 1 ship-blocker, 2 real bugs, 3 missing tests, and a nit. All addressed. Ship-blocker (security-reviewer): 1. WebhookSender's per-request httpx.AsyncClient missed trust_env=False. httpx defaults trust_env=True, which routes the signed webhook through any HTTPS_PROXY / HTTP_PROXY env var, bypassing the AsyncIpPinnedTransport entirely. Every other pinned-transport callsite in this codebase explicitly sets trust_env=False (default_jwks_fetcher, async_default_jwks_fetcher, revocation_fetcher); the webhook sender was the outlier. An attacker who controls process env (sidecar config, dotenv, malicious cluster egress policy) could otherwise pivot to receiving the signed webhook body. One-line fix at webhook_sender.py:577 with a regression test that asserts the kwarg is set on the per-request client. Bugs (python-expert): 2. 
Stale docstrings in build_ip_pinned_transport and build_async_ip_pinned_transport claimed allowed_ports defaults to DEFAULT_ALLOWED_PORTS ({443, 8443}) — but the scope-down flipped the default to None (no port filter). Adopters reading the docstring would wrongly assume a port allowlist is enforced by default when no port filtering happens at all. Updated both to describe the actual behavior. 3. _send_bytes signed the body before SSRF-validating the URL. Restructured so the pinned-transport build (which runs SSRF + port validation) happens first; signing only after validation succeeds. Hostile URLs no longer leave a signed payload in process memory for faulthandler / custom logging hooks to capture on exception. New regression tests (code-reviewer + security-reviewer): 4. test_owned_client_default_allows_non_standard_ports — sender-level positive analog of the validator-level test_ssrf_default_imposes_no_port_filter. Confirms the permissive port default reaches the actual delivery path; AdCP-spec-compliant buyers on :9443 (Tomcat) and similar non-standard ports succeed without an explicit allowlist. 5. test_operator_supplied_client_bypasses_ssrf_guard — named regression guard for the documented contract. Without this, a future refactor that mistakenly applies pin-and-bind to both branches would break ASGI-based unit tests and any vetted-egress-proxy deployment that routes via private networks. 6. test_owned_client_ignores_https_proxy_env — regression guard for trust_env=False. Patches HTTPS_PROXY in env, asserts the per-request client constructs with trust_env=False so the proxy is ignored. Code-reviewer nit: 7. Deduplicated DEFAULT_ALLOWED_PORTS rationale block-comment between adcp.signing.jwks (constant definition) and tests/conformance/signing/test_jwks.py. Kept at the constant-definition site; test file points to it. 
Commit type changed from fix(signing) to feat(signing): The PR adds public surface (DEFAULT_ALLOWED_PORTS export, new kwargs on validate_jwks_uri / resolve_and_validate_host / build_*_pinned_transport / WebhookSender / from_jwk / from_pem) and changes WebhookSender._send_bytes behavior on the owned-client path (now SSRF-validates and pin-binds every delivery). Per semver, additive public-API surface = minor; the security-fix-via-strictening-default is also conventionally a minor bump. release-please should tag this as 4.1.0, not 4.0.1. If squash-merging, the maintainer should use a feat(signing): PR title so the squash subject carries the conventional-commit type that release-please reads. Tests: 2257 passing locally (3 new). Pre-commit clean (black, ruff, mypy, bandit). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/signing/ip_pinned_transport.py | 16 +- src/adcp/webhook_sender.py | 47 +++-- tests/conformance/signing/test_jwks.py | 7 +- .../signing/test_webhook_sender_e2e.py | 160 ++++++++++++++++++ 4 files changed, 205 insertions(+), 25 deletions(-) diff --git a/src/adcp/signing/ip_pinned_transport.py b/src/adcp/signing/ip_pinned_transport.py index b34172da..92e3b830 100644 --- a/src/adcp/signing/ip_pinned_transport.py +++ b/src/adcp/signing/ip_pinned_transport.py @@ -295,11 +295,14 @@ def build_ip_pinned_transport( """Resolve ``uri`` once and return a transport pinned to the validated IP. Raises :class:`SSRFValidationError` if the URI's scheme isn't - ``http``/``https``, the port is not in the allowlist, the host - doesn't resolve, or every resolved IP is in a blocked range. + ``http``/``https``, ``allowed_ports`` is set and the URI's port is + outside it, the host doesn't resolve, or every resolved IP is in a + blocked range. - ``allowed_ports`` defaults to - :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`). + ``allowed_ports`` defaults to ``None`` (no port filter — AdCP + doesn't constrain webhook ports). 
Hardened deployments pass + :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`) + or a custom set. Typical use inside a fetcher:: @@ -328,8 +331,9 @@ def build_async_ip_pinned_transport( function itself is not awaitable. The returned transport plugs into :class:`httpx.AsyncClient`. - ``allowed_ports`` defaults to - :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`). + ``allowed_ports`` defaults to ``None`` (no port filter); see + :func:`build_ip_pinned_transport` for the hardening kwarg + semantics. """ hostname, resolved_ip, _port = resolve_and_validate_host( uri, diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index a5a8a38b..0d054a18 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -44,7 +44,10 @@ load_private_key_pem, private_key_from_jwk, ) -from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport +from adcp.signing.ip_pinned_transport import ( + AsyncIpPinnedTransport, + build_async_ip_pinned_transport, +) from adcp.signing.webhook_signer import sign_webhook from adcp.types import GeneratedTaskStatus from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData @@ -505,7 +508,26 @@ async def _send_bytes( with mTLS to a known buyer set, or an ASGI transport for testing), the sender trusts the operator's transport completely. Pin-and-bind is skipped; the operator's transport owns SSRF. + + On the owned-client path, SSRF validation runs **before** signing + so a hostile URL is rejected without first generating an + Ed25519/ES256 signature over the body. That signature would + otherwise sit in process memory until the SSRF rejection — + anything that snapshots locals on exception (faulthandler, + custom logging) could capture it. Validate first, sign second. """ + # Build the pinned transport up-front for the owned-client path. 
+ # This runs SSRF + port validation against the URL before any + # signing happens; a hostile URL raises SSRFValidationError here + # and the body never gets signed. + transport: AsyncIpPinnedTransport | None = None + if self._owns_client: + transport = build_async_ip_pinned_transport( + url, + allow_private=self._allow_private_destinations, + allowed_ports=self._allowed_destination_ports, + ) + base_headers = {"Content-Type": "application/json"} signed = sign_webhook( method="POST", @@ -529,27 +551,26 @@ async def _send_bytes( for k, v in extra_headers.items(): headers[k] = v - if self._owns_client: - # Per-request pinned transport. Building one client per delivery - # is the security-correct choice: keeping a pinned transport - # alive across deliveries to the same hostname would defeat the - # rebinding defense (the IP would be frozen at first delivery). - transport = build_async_ip_pinned_transport( - url, - allow_private=self._allow_private_destinations, - allowed_ports=self._allowed_destination_ports, - ) + if transport is not None: + # Owned-client path. ``trust_env=False`` prevents httpx from + # routing the request through ``HTTPS_PROXY`` / ``HTTP_PROXY`` + # env vars — every other pinned-transport callsite in the + # codebase sets this for the same reason (default_jwks_fetcher, + # async_default_jwks_fetcher, revocation_fetcher). Without it, + # an attacker who controls process env can route the signed + # webhook through their endpoint, defeating the IP pin entirely. async with httpx.AsyncClient( transport=transport, timeout=self._timeout, follow_redirects=False, + trust_env=False, ) as client: response = await client.post(url, content=body, headers=headers) else: # Operator-supplied client — they own the SSRF guarantees on # their transport (proxy allowlist, mTLS, etc.). Reachable as - # None after aclose(); a runtime check beats an assert that - # python -O strips to silently NoneType.post(). 
+ # None after aclose(); explicit raise survives ``python -O`` + # which would strip an assert. if self._client is None: raise RuntimeError( "WebhookSender's operator-supplied client was already " diff --git a/tests/conformance/signing/test_jwks.py b/tests/conformance/signing/test_jwks.py index af156fd6..a893a490 100644 --- a/tests/conformance/signing/test_jwks.py +++ b/tests/conformance/signing/test_jwks.py @@ -101,12 +101,7 @@ def test_ssrf_caps_resolved_address_scan() -> None: # ---- Port allowlist (opt-in operator hardening) ---- -# -# AdCP doesn't constrain webhook ports in the spec, so the default validator -# imposes no port filter. Adopters who want a hardening posture pass -# ``allowed_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set); rejection of -# non-standard ports closes a smuggle vector for buyers bouncing traffic to -# internal services on the same routable IP. +# Rationale lives in adcp.signing.jwks.DEFAULT_ALLOWED_PORTS docstring. @pytest.mark.parametrize( diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index 00db7e3f..beda5211 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -487,3 +487,163 @@ async def echo(request: Request, op_id: str) -> JSONResponse: assert result.ok assert captured_payloads[0]["operation_id"] == "op_abc123" + + +@pytest.mark.asyncio +async def test_owned_client_default_allows_non_standard_ports() -> None: + """The default ``WebhookSender`` (no operator client, no + ``allowed_destination_ports``) accepts AdCP-spec-compliant buyers on + non-standard ports — :9443 (Tomcat default), :4443 (Spring Boot + default), path-routed multi-tenant gateways. The IP-range check still + applies; we just don't impose a port filter unless the operator + explicitly opts in. 
+ + Sender-level positive analog of test_ssrf_default_imposes_no_port_filter + in test_jwks.py — confirms the permissive default reaches the actual + delivery path, not just the underlying validator.""" + from unittest.mock import patch + + captured: list[tuple[str, int]] = [] + app = FastAPI() + + @app.post("/webhooks/adcp") + async def echo(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}, status_code=200) + + asgi_transport = httpx.ASGITransport(app=app) + + # Stub the pinned-transport build so we don't open a real socket to + # a public IP; capture that the build was attempted for the + # non-standard port and route the actual POST through ASGI. + def fake_build(uri: str, **_kwargs: Any) -> Any: + from urllib.parse import urlparse + + parsed = urlparse(uri) + captured.append((parsed.hostname or "", parsed.port or 443)) + return asgi_transport + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + side_effect=fake_build, + ): + async with sender: + result = await sender.send_mcp( + url="http://test:9443/webhooks/adcp", + task_id="task_nonstd", + task_type="create_media_buy", + status="completed", + ) + + assert result.ok + assert captured == [("test", 9443)] + + +@pytest.mark.asyncio +async def test_operator_supplied_client_bypasses_ssrf_guard() -> None: + """When the operator passes their own httpx client (vetted egress + proxy, ASGI test transport, etc.), the framework trusts them + completely — pin-and-bind is skipped and the SSRF range check does + NOT fire. The operator owns SSRF on their transport. 
+ + Named regression test for the documented contract; without this, a + future refactor that mistakenly applies pin-and-bind to both + branches breaks ASGI-based unit tests and any vetted-proxy + deployments that route via private networks.""" + app = FastAPI() + + @app.post("/webhooks/adcp") + async def echo(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}, status_code=200) + + transport = httpx.ASGITransport(app=app) + # base_url is loopback-equivalent. With the SSRF guard active this + # would raise SSRFValidationError; under the operator-trust contract + # it must succeed. + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + client=client, + ) + result = await sender.send_mcp( + url="http://test/webhooks/adcp", + task_id="task_op_trust", + task_type="create_media_buy", + status="completed", + ) + + assert result.ok + + +@pytest.mark.asyncio +async def test_owned_client_ignores_https_proxy_env() -> None: + """``HTTPS_PROXY`` / ``HTTP_PROXY`` env vars MUST NOT defeat the IP + pin on the owned-client path. httpx's default ``trust_env=True`` + routes requests through proxy env vars, which would bypass the + AsyncIpPinnedTransport's network_backend entirely — an attacker who + controls process env (sidecar config, dotenv, malicious cluster + egress policy) could otherwise pivot to receiving the signed + webhook body. + + The sender constructs its per-request ``httpx.AsyncClient`` with + ``trust_env=False`` to close this. 
Regression guard: if a future + refactor drops the kwarg, this test catches it by setting a proxy + env var that points at an unreachable address; the pinned transport + must still route via the resolved IP and reach the test ASGI app + (which we can't directly observe under a real socket, so we assert + the proxy var is ignored by checking the constructor config).""" + import os + from unittest.mock import MagicMock, patch + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + + captured_kwargs: dict[str, Any] = {} + + class _FakeAsyncClient: + def __init__(self, **kwargs: Any) -> None: + # Capture kwargs only from the per-request construction in + # _send_bytes; __aenter__'s eager _get_client() also flows + # through here but its kwargs don't affect the per-request + # delivery path. The last writer wins; the per-request call + # is the one we care about. + captured_kwargs.update(kwargs) + self._response = MagicMock(status_code=200, headers={}, content=b"{}") + + async def __aenter__(self) -> _FakeAsyncClient: + return self + + async def __aexit__(self, *_args: Any) -> None: + return None + + async def aclose(self) -> None: + return None + + async def post(self, *_args: Any, **_kwargs: Any) -> Any: + return self._response + + # Set HTTPS_PROXY pointing at an unreachable address. With + # trust_env=True (the httpx default), this would override the + # transport. The sender MUST pass trust_env=False to ignore it. 
+ with patch.dict(os.environ, {"HTTPS_PROXY": "http://attacker.invalid:9999"}): + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + return_value=MagicMock(), # transport itself isn't used here + ): + with patch("adcp.webhook_sender.httpx.AsyncClient", _FakeAsyncClient): + async with sender: + await sender.send_mcp( + url="https://buyer.example.com/webhooks/adcp", + task_id="task_proxy", + task_type="create_media_buy", + status="completed", + ) + + assert captured_kwargs.get("trust_env") is False, ( + "WebhookSender's per-request httpx.AsyncClient must construct with " + "trust_env=False — otherwise HTTPS_PROXY env vars defeat the IP pin" + ) + assert captured_kwargs.get("follow_redirects") is False From faeb479d98edb776642cbaf84c0dcb859d647dd2 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 29 Apr 2026 13:26:45 -0400 Subject: [PATCH 4/4] test(signing): tighten test claims and add validate-before-sign assertion Final-pass review on 50ae54d9 surfaced three test-quality nits, all addressed. 1. test_owned_client_default_allows_non_standard_ports docstring overclaim: the test patches build_async_ip_pinned_transport with a fake, so the IP-range check inside the real builder doesn't fire. Soften the claim and point at test_owned_client_rejects_loopback_destination for the IP-range coverage. 2. test_owned_client_ignores_https_proxy_env now also asserts "transport" in captured_kwargs. Without this, a future refactor that moves trust_env=False to the eager __aenter__ construction (away from the per-request construction where the proxy-bypass guard actually lives) would pass the test while leaving the per-request client vulnerable. 3. test_owned_client_rejects_hostile_url_before_signing: new test for the validate-before-sign claim made in the _send_bytes docstring. Patches sign_webhook to a MagicMock, points the URL at 127.0.0.1, asserts SSRFValidationError raises AND mock_sign.called is False. 
No Ed25519/ES256 signature ever materializes for a URL that fails the SSRF guard. Tests: 2258 passing locally (up from 2257). Pre-commit clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../signing/test_webhook_sender_e2e.py | 55 +++++++++++++++++-- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index beda5211..114dfa69 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -494,13 +494,13 @@ async def test_owned_client_default_allows_non_standard_ports() -> None: """The default ``WebhookSender`` (no operator client, no ``allowed_destination_ports``) accepts AdCP-spec-compliant buyers on non-standard ports — :9443 (Tomcat default), :4443 (Spring Boot - default), path-routed multi-tenant gateways. The IP-range check still - applies; we just don't impose a port filter unless the operator - explicitly opts in. + default), path-routed multi-tenant gateways. - Sender-level positive analog of test_ssrf_default_imposes_no_port_filter - in test_jwks.py — confirms the permissive default reaches the actual - delivery path, not just the underlying validator.""" + Sender-level positive analog of ``test_ssrf_default_imposes_no_port_filter`` + in ``test_jwks.py`` — confirms the permissive default reaches the + actual delivery path, not just the underlying validator. The IP-range + check is enforced by the validator and covered separately by + ``test_owned_client_rejects_loopback_destination``.""" from unittest.mock import patch captured: list[tuple[str, int]] = [] @@ -642,8 +642,51 @@ async def post(self, *_args: Any, **_kwargs: Any) -> Any: status="completed", ) + # The per-request httpx.AsyncClient construction passes a `transport` + # kwarg; the eager __aenter__ construction does not. 
Asserting both + # `transport` is present AND `trust_env=False` is set proves the + # captured kwargs are from the per-request construction, not the + # eager-init that has nothing to do with HTTPS_PROXY hardening. + assert "transport" in captured_kwargs, ( + "captured kwargs do not include `transport` — the assertion below " + "is reading the eager __aenter__ construction, not the per-request " + "construction the proxy-bypass guard lives on" + ) assert captured_kwargs.get("trust_env") is False, ( "WebhookSender's per-request httpx.AsyncClient must construct with " "trust_env=False — otherwise HTTPS_PROXY env vars defeat the IP pin" ) assert captured_kwargs.get("follow_redirects") is False + + +@pytest.mark.asyncio +async def test_owned_client_rejects_hostile_url_before_signing() -> None: + """Validate-before-sign defense in depth: a hostile URL raises + SSRFValidationError synchronously inside ``build_async_ip_pinned_transport``, + BEFORE ``sign_webhook`` is called. No Ed25519/ES256 signature ever + materializes in process memory for a URL that fails the SSRF guard — + anything that snapshots locals on exception (faulthandler, custom + logging) cannot capture a signature that wasn't generated. + + Regression guard for the validate-before-sign reorder in _send_bytes.""" + from unittest.mock import patch + + from adcp.signing import SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + with patch("adcp.webhook_sender.sign_webhook") as mock_sign: + async with sender: + with pytest.raises(SSRFValidationError): + await sender.send_mcp( + url="https://127.0.0.1/webhooks/adcp", + task_id="task_no_sign", + task_type="create_media_buy", + status="completed", + ) + assert mock_sign.called is False, ( + "sign_webhook was called even though SSRF validation rejected the URL — " + "the signature would sit in process memory until the rejection. 
" + "Validate-before-sign ordering is broken; check _send_bytes." + )