diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 6761cf0e..4d85fe39 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -157,6 +157,7 @@ build_ip_pinned_transport, ) from adcp.signing.jwks import ( + DEFAULT_ALLOWED_PORTS, AsyncCachingJwksResolver, AsyncJwksFetcher, AsyncJwksResolver, @@ -251,6 +252,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "CachingJwksResolver", "CachingRevocationChecker", "CapabilityProvider", + "DEFAULT_ALLOWED_PORTS", "DEFAULT_EXPIRES_IN_SECONDS", "DEFAULT_GRACE_MULTIPLIER", "DEFAULT_SKEW_SECONDS", diff --git a/src/adcp/signing/ip_pinned_transport.py b/src/adcp/signing/ip_pinned_transport.py index 0a8df62d..92e3b830 100644 --- a/src/adcp/signing/ip_pinned_transport.py +++ b/src/adcp/signing/ip_pinned_transport.py @@ -289,13 +289,20 @@ def build_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> IpPinnedTransport: """Resolve ``uri`` once and return a transport pinned to the validated IP. Raises :class:`SSRFValidationError` if the URI's scheme isn't - ``http``/``https``, the host doesn't resolve, or every resolved - IP is in a blocked range. + ``http``/``https``, ``allowed_ports`` is set and the URI's port is + outside it, the host doesn't resolve, or every resolved IP is in a + blocked range. + + ``allowed_ports`` defaults to ``None`` (no port filter — AdCP + doesn't constrain webhook ports). Hardened deployments pass + :data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`) + or a custom set. Typical use inside a fetcher:: @@ -303,7 +310,11 @@ def build_ip_pinned_transport( with httpx.Client(transport=transport, timeout=10.0) as client: response = client.get(uri) """ - hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private) + hostname, resolved_ip, _port = resolve_and_validate_host( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + ) return IpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify) @@ -311,6 +322,7 @@ def build_async_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> AsyncIpPinnedTransport: """Build an :class:`AsyncIpPinnedTransport` for ``uri``. @@ -318,8 +330,16 @@ def build_async_ip_pinned_transport( Resolve + validate run synchronously (``socket.getaddrinfo``); this function itself is not awaitable. The returned transport plugs into :class:`httpx.AsyncClient`. + + ``allowed_ports`` defaults to ``None`` (no port filter); see + :func:`build_ip_pinned_transport` for the hardening kwarg + semantics. """ - hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private) + hostname, resolved_ip, _port = resolve_and_validate_host( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + ) return AsyncIpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify) @@ -327,6 +347,7 @@ def abuild_ip_pinned_transport( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, verify: bool = True, ) -> AsyncIpPinnedTransport: """Deprecated alias for :func:`build_async_ip_pinned_transport`. @@ -343,4 +364,9 @@ def abuild_ip_pinned_transport( DeprecationWarning, stacklevel=2, ) - return build_async_ip_pinned_transport(uri, allow_private=allow_private, verify=verify) + return build_async_ip_pinned_transport( + uri, + allow_private=allow_private, + allowed_ports=allowed_ports, + verify=verify, + ) diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index 79fcbc9c..d4fa75b4 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -53,6 +53,17 @@ } ) +# Recommended destination ports for hardened SSRF-validated outbound HTTP +# deployments. AdCP itself does not constrain ``pushNotificationConfig.url`` +# ports (see ``schemas/cache/core/push-notification-config.json``), so the +# default port-allowlist is permissive — adopters who want a hardening posture +# pass ``allowed_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set) explicitly. +# Rejecting non-standard ports closes a smuggle vector for buyers bouncing +# traffic to internal services on the same routable IP — :25 (SMTP relay), +# :6379 (Redis), :11211 (Memcached), etc. — but that's an operator choice, +# not a framework default that breaks legitimate :9443 / :4443 buyers. +DEFAULT_ALLOWED_PORTS: frozenset[int] = frozenset({443, 8443}) + # Upper bound on the number of resolved addresses examined per validation call. # A malicious DNS server can return thousands of records as a mild amplification # vector against the validator's inner loop. @@ -104,20 +115,26 @@ class AsyncJwksResolver(Protocol): async def __call__(self, keyid: str) -> dict[str, Any] | None: ... -def validate_jwks_uri(uri: str, *, allow_private: bool = False) -> None: - """Raise SSRFValidationError if `uri` resolves to a blocked IP or has a bad scheme. +def validate_jwks_uri( + uri: str, + *, + allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, +) -> None: + """Raise SSRFValidationError on blocked IP, bad scheme, or disallowed port. - This is kept as a standalone no-return helper for callers that only - want validation — :func:`resolve_and_validate_host` returns the - accepted IP when the caller needs it for IP-pinned connects. + Standalone no-return helper for callers that only want validation — + :func:`resolve_and_validate_host` returns the accepted IP when the + caller needs it for IP-pinned connects. """ - resolve_and_validate_host(uri, allow_private=allow_private) + resolve_and_validate_host(uri, allow_private=allow_private, allowed_ports=allowed_ports) def resolve_and_validate_host( uri: str, *, allow_private: bool = False, + allowed_ports: frozenset[int] | None = None, ) -> tuple[str, str, int]: """Resolve the URI's hostname once and return ``(hostname, ip, port)``. @@ -138,6 +155,13 @@ def resolve_and_validate_host( allow_private: Skip the reserved-range check. For tests only; cloud-metadata IPs remain blocked unconditionally. + allowed_ports: + Optional destination-port allowlist. ``None`` (default) imposes + no port filter — the URL's port is unrestricted. Hardened + deployments pass :data:`DEFAULT_ALLOWED_PORTS` (`{443, 8443}`) + or a custom set; the validator then rejects URIs whose port + is outside the set. AdCP doesn't constrain webhook ports in + the spec, so this is operator policy, not a framework default. Returns ------- @@ -148,8 +172,9 @@ def resolve_and_validate_host( Raises ------ SSRFValidationError - Scheme is not ``http``/``https``, the hostname doesn't resolve, - or every resolved IP is in a blocked range. + Scheme is not ``http``/``https``, ``allowed_ports`` is set and + the URI's port is outside it, the hostname doesn't resolve, or + every resolved IP is in a blocked range. """ parts = urlsplit(uri) if parts.scheme not in ("http", "https"): @@ -177,6 +202,11 @@ def resolve_and_validate_host( except (UnicodeError, UnicodeEncodeError) as exc: raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80) + if allowed_ports is not None and port not in allowed_ports: + raise SSRFValidationError( + f"port {port} not allowed for SSRF-validated fetch " + f"(allowed: {sorted(allowed_ports) if allowed_ports else ''})" + ) try: infos = socket.getaddrinfo(host, None) @@ -461,6 +491,7 @@ async def resolve(keyid: str) -> dict[str, Any] | None: "AsyncJwksFetcher", "AsyncJwksResolver", "CachingJwksResolver", + "DEFAULT_ALLOWED_PORTS", "DEFAULT_JWKS_COOLDOWN_SECONDS", "DEFAULT_JWKS_TIMEOUT_SECONDS", "JwksFetcher", diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index eaeb4019..0d054a18 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -44,6 +44,10 @@ load_private_key_pem, private_key_from_jwk, ) +from adcp.signing.ip_pinned_transport import ( + AsyncIpPinnedTransport, + build_async_ip_pinned_transport, +) from adcp.signing.webhook_signer import sign_webhook from adcp.types import GeneratedTaskStatus from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData @@ -112,6 +116,8 @@ def __init__( alg: str, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> None: self._private_key = private_key self._key_id = key_id @@ -119,6 +125,8 @@ def __init__( self._timeout = timeout_seconds self._client = client self._owns_client = client is None + self._allow_private_destinations = allow_private_destinations + self._allowed_destination_ports = allowed_destination_ports @classmethod def from_jwk( @@ -128,6 +136,8 @@ def from_jwk( d_field: str = "d", client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> WebhookSender: """Construct from a JWK that includes the private scalar. @@ -135,6 +145,9 @@ def from_jwk( doesn't validate this (you're signing with your own key; validation happens at the receiver), but a key whose adcp_use is wrong will be rejected by every conformant verifier. + + ``allow_private_destinations`` and ``allowed_destination_ports`` + forward to :meth:`__init__` — see that signature for semantics. """ # Snapshot the mapping once — a live Mapping could otherwise return # different values across the adcp_use / kid / d / alg reads. @@ -163,6 +176,8 @@ def from_jwk( alg=alg, client=client, timeout_seconds=timeout_seconds, + allow_private_destinations=allow_private_destinations, + allowed_destination_ports=allowed_destination_ports, ) @classmethod @@ -175,6 +190,8 @@ def from_pem( passphrase: bytes | None = None, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, + allow_private_destinations: bool = False, + allowed_destination_ports: frozenset[int] | None = None, ) -> WebhookSender: """Load a private key from a PEM file and bind it as a webhook sender. @@ -194,6 +211,8 @@ def from_pem( client: Optional pre-built :class:`httpx.AsyncClient` to share across the SDK; the sender owns its own client when omitted. timeout_seconds: Per-request timeout for the owned client. + allow_private_destinations: Forwarded to :meth:`__init__`. + allowed_destination_ports: Forwarded to :meth:`__init__`. Raises: ValueError: ``alg`` is not ed25519 / es256, or the PEM contains @@ -239,6 +258,8 @@ def from_pem( alg=alg, client=client, timeout_seconds=timeout_seconds, + allow_private_destinations=allow_private_destinations, + allowed_destination_ports=allowed_destination_ports, ) def __repr__(self) -> str: @@ -471,7 +492,42 @@ async def _send_bytes( idempotency_key: str, extra_headers: Mapping[str, str] | None, ) -> WebhookDeliveryResult: - """Sign + POST a pre-serialized body. Shared by send_raw and resend.""" + """Sign + POST a pre-serialized body through an SSRF-validated transport. + + When the sender owns its httpx client (the default — ``client=None`` + was passed to ``__init__``), every delivery builds a per-request + :class:`adcp.signing.ip_pinned_transport.AsyncIpPinnedTransport` + that resolves the destination, runs the full SSRF range check + (loopback / RFC 1918 / link-local / CGNAT / IPv6 ULA / multicast / + cloud metadata), enforces the port allowlist, and pins the + connection to the validated IP. This closes the DNS-rebinding + TOCTOU between validate and connect. + + When the operator supplied their own client + (``WebhookSender(client=...)`` — typically a vetted egress proxy + with mTLS to a known buyer set, or an ASGI transport for testing), + the sender trusts the operator's transport completely. Pin-and-bind + is skipped; the operator's transport owns SSRF. + + On the owned-client path, SSRF validation runs **before** signing + so a hostile URL is rejected without first generating an + Ed25519/ES256 signature over the body. That signature would + otherwise sit in process memory until the SSRF rejection — + anything that snapshots locals on exception (faulthandler, + custom logging) could capture it. Validate first, sign second. + """ + # Build the pinned transport up-front for the owned-client path. + # This runs SSRF + port validation against the URL before any + # signing happens; a hostile URL raises SSRFValidationError here + # and the body never gets signed. + transport: AsyncIpPinnedTransport | None = None + if self._owns_client: + transport = build_async_ip_pinned_transport( + url, + allow_private=self._allow_private_destinations, + allowed_ports=self._allowed_destination_ports, + ) + base_headers = {"Content-Type": "application/json"} signed = sign_webhook( method="POST", @@ -495,8 +551,33 @@ async def _send_bytes( for k, v in extra_headers.items(): headers[k] = v - client = await self._get_client() - response = await client.post(url, content=body, headers=headers) + if transport is not None: + # Owned-client path. ``trust_env=False`` prevents httpx from + # routing the request through ``HTTPS_PROXY`` / ``HTTP_PROXY`` + # env vars — every other pinned-transport callsite in the + # codebase sets this for the same reason (default_jwks_fetcher, + # async_default_jwks_fetcher, revocation_fetcher). Without it, + # an attacker who controls process env can route the signed + # webhook through their endpoint, defeating the IP pin entirely. + async with httpx.AsyncClient( + transport=transport, + timeout=self._timeout, + follow_redirects=False, + trust_env=False, + ) as client: + response = await client.post(url, content=body, headers=headers) + else: + # Operator-supplied client — they own the SSRF guarantees on + # their transport (proxy allowlist, mTLS, etc.). Reachable as + # None after aclose(); explicit raise survives ``python -O`` + # which would strip an assert. + if self._client is None: + raise RuntimeError( + "WebhookSender's operator-supplied client was already " + "closed. Construct a new sender or pass a fresh client." + ) + response = await self._client.post(url, content=body, headers=headers) + return WebhookDeliveryResult( status_code=response.status_code, idempotency_key=idempotency_key, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index e61b097f..4ee80c1a 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -107,8 +107,12 @@ def create_mcp_webhook_payload( task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy") timestamp: When the webhook was generated (defaults to current UTC time) result: Task-specific payload (AdCP response data) - operation_id: Publisher-defined operation identifier (deprecated from payload, - should be in URL routing, but included for backward compatibility) + operation_id: Client-generated identifier the buyer embedded in the + webhook URL when registering push-notification config. Publishers + MUST echo this back in the payload so buyers correlate notifications + without parsing URL paths (per ``mcp-webhook-payload.json``). + Senders extracting the value from the URL path on emission populate + this field; callers constructing payloads directly pass it through. message: Human-readable summary of task state context_id: Session/conversation identifier domain: AdCP domain this task belongs to diff --git a/tests/conformance/signing/test_async_revocation.py b/tests/conformance/signing/test_async_revocation.py index 250a981b..f3bc01de 100644 --- a/tests/conformance/signing/test_async_revocation.py +++ b/tests/conformance/signing/test_async_revocation.py @@ -176,9 +176,7 @@ async def test_averify_jws_rejects_wrong_typ() -> None: token = b64_header + "." + b64_payload + "." + b64url_encode(signature) with pytest.raises(JwsMalformedError, match="typ"): - await averify_jws_document( - token, jwks_resolver=resolver, expected_typ=REVOCATION_LIST_TYP - ) + await averify_jws_document(token, jwks_resolver=resolver, expected_typ=REVOCATION_LIST_TYP) # -- as_async_resolver -------------------------------------------------- @@ -282,12 +280,12 @@ async def test_async_replay_older_list_rejected() -> None: revoked_kids=[], ) fetcher = _ScriptedAsyncFetcher() - fetcher.enqueue(FetchResult( - body=_sign_compact(newer, private=private), etag='"v2"', not_modified=False - )) - fetcher.enqueue(FetchResult( - body=_sign_compact(older, private=private), etag='"v1"', not_modified=False - )) + fetcher.enqueue( + FetchResult(body=_sign_compact(newer, private=private), etag='"v2"', not_modified=False) + ) + fetcher.enqueue( + FetchResult(body=_sign_compact(older, private=private), etag='"v1"', not_modified=False) + ) wall_clock, mono_clock, advance = _controllable_clock( datetime(2026, 4, 18, 14, 15, tzinfo=timezone.utc) @@ -349,9 +347,7 @@ async def test_async_aprime_fails_fast() -> None: async def test_async_is_jti_revoked() -> None: private, resolver = _operator_key_and_resolver() - token = _sign_compact( - _make_payload(revoked_jtis=["jti-abc"]), private=private - ) + token = _sign_compact(_make_payload(revoked_jtis=["jti-abc"]), private=private) fetcher = _ScriptedAsyncFetcher() fetcher.enqueue(FetchResult(body=token, etag=None, not_modified=False)) @@ -414,9 +410,7 @@ async def slow_fetcher( wall_clock=wall_clock, clock=mono_clock, ) - results = await asyncio.gather( - checker("rev"), checker("rev"), checker("rev"), checker("rev") - ) + results = await asyncio.gather(checker("rev"), checker("rev"), checker("rev"), checker("rev")) assert all(r is True for r in results) assert fetch_count[0] == 1 @@ -506,9 +500,7 @@ async def fetch( headers["If-Modified-Since"] = if_modified_since async with httpx.AsyncClient(transport=transport, base_url=ISSUER) as client: - response = await client.get( - "/.well-known/governance-revocations.json", headers=headers - ) + response = await client.get("/.well-known/governance-revocations.json", headers=headers) if response.status_code == 304: return FetchResult( body="", diff --git a/tests/conformance/signing/test_jwks.py b/tests/conformance/signing/test_jwks.py index bd0692c5..a893a490 100644 --- a/tests/conformance/signing/test_jwks.py +++ b/tests/conformance/signing/test_jwks.py @@ -8,6 +8,7 @@ import pytest from adcp.signing import ( + DEFAULT_ALLOWED_PORTS, CachingJwksResolver, SignatureVerificationError, SSRFValidationError, @@ -99,6 +100,98 @@ def test_ssrf_caps_resolved_address_scan() -> None: validate_jwks_uri("https://example.com/jwks.json") +# ---- Port allowlist (opt-in operator hardening) ---- +# Rationale lives in adcp.signing.jwks.DEFAULT_ALLOWED_PORTS docstring. + + +@pytest.mark.parametrize( + "uri", + [ + "https://example.com:9443/jwks.json", # Tomcat default — legitimate + "https://example.com:4443/jwks.json", # Spring Boot default — legitimate + "https://example.com:8080/jwks.json", # buyer's path-routed gateway + "http://example.com:80/jwks.json", # plain HTTP — scheme check is separate + ], +) +def test_ssrf_default_imposes_no_port_filter(uri: str) -> None: + """Without explicit ``allowed_ports``, any port that satisfies the + scheme check passes. AdCP doesn't restrict ``pushNotificationConfig.url`` + to standard ports — :8443/:9443/:4443 are all legitimate buyer + deployments.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri(uri) + + +@pytest.mark.parametrize( + "uri,port", + [ + ("https://example.com:25/jwks.json", 25), # SMTP + ("https://example.com:6379/jwks.json", 6379), # Redis + ("https://example.com:11211/jwks.json", 11211), # Memcached + ("https://example.com:8080/jwks.json", 8080), # generic HTTP-alt + ], +) +def test_ssrf_rejects_disallowed_ports_when_hardening(uri: str, port: int) -> None: + """Operators opt into the hardening posture by passing + ``DEFAULT_ALLOWED_PORTS`` or a custom set; non-allowlisted ports + then reject.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + with pytest.raises(SSRFValidationError, match=f"port {port} not allowed"): + validate_jwks_uri(uri, allowed_ports=DEFAULT_ALLOWED_PORTS) + + +@pytest.mark.parametrize( + "uri", + [ + "https://example.com/jwks.json", # implicit :443 + "https://example.com:443/jwks.json", + "https://example.com:8443/jwks.json", + ], +) +def test_ssrf_default_allowlist_passes_canonical_https_ports(uri: str) -> None: + """``DEFAULT_ALLOWED_PORTS = {443, 8443}`` is the recommended hardening + set; both canonical-https and HTTPS-alt pass.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri(uri, allowed_ports=DEFAULT_ALLOWED_PORTS) + + +def test_ssrf_allowed_ports_custom_set() -> None: + """Adopters with trusted on-prem deployments can permit non-standard ports.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + validate_jwks_uri( + "https://example.com:9000/jwks.json", + allowed_ports=frozenset({443, 9000}), + ) + + +def test_ssrf_empty_allowlist_rejects_every_port() -> None: + """``allowed_ports=frozenset()`` is meaningful: no port satisfies the + set. Distinct from ``allowed_ports=None`` (no filter at all). Used + by deployments that want to fail closed unless a port is explicitly + permitted.""" + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + with pytest.raises(SSRFValidationError, match="port 443 not allowed"): + validate_jwks_uri( + "https://example.com/jwks.json", + allowed_ports=frozenset(), + ) + + # ---- CachingJwksResolver ---- diff --git a/tests/conformance/signing/test_webhook_sender_e2e.py b/tests/conformance/signing/test_webhook_sender_e2e.py index fd90baa2..114dfa69 100644 --- a/tests/conformance/signing/test_webhook_sender_e2e.py +++ b/tests/conformance/signing/test_webhook_sender_e2e.py @@ -397,3 +397,296 @@ async def test_receiver_rejects_when_sender_key_wrong() -> None: ) assert not result.ok assert result.status_code == 401 + + +@pytest.mark.asyncio +async def test_owned_client_rejects_loopback_destination() -> None: + """Default sender (no operator client) must reject loopback URLs via SSRF + guard. Without pin-and-bind, a buyer-supplied webhook URL pointing at + 127.0.0.1 lets a public agent reach in-cluster services.""" + from adcp.signing import SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + async with sender: + with pytest.raises(SSRFValidationError): + await sender.send_mcp( + url="https://127.0.0.1/webhooks/adcp", + task_id="task_ssrf", + task_type="create_media_buy", + status="completed", + ) + + +@pytest.mark.asyncio +async def test_owned_client_rejects_disallowed_port_when_hardening_configured() -> None: + """Operators opt in to the port-allowlist hardening posture by passing + ``allowed_destination_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set). + The sender then rejects buyer URLs on ports outside the set, closing + the smuggle vector to internal Redis/SMTP/Memcached on the same + routable IP. Without the kwarg, AdCP-spec-compliant ports like + :9443/:4443 still pass.""" + from unittest.mock import patch + + from adcp.signing import DEFAULT_ALLOWED_PORTS, SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + allowed_destination_ports=DEFAULT_ALLOWED_PORTS, + ) + # Mock DNS so we don't hit a live host; the port check fires before + # resolution even runs (in resolve_and_validate_host). + with patch( + "adcp.signing.jwks.socket.getaddrinfo", + return_value=[(2, 1, 6, "", ("93.184.216.34", 0))], + ): + async with sender: + with pytest.raises(SSRFValidationError, match="port 6379 not allowed"): + await sender.send_mcp( + url="https://example.com:6379/webhooks/adcp", + task_id="task_port", + task_type="create_media_buy", + status="completed", + ) + + +@pytest.mark.asyncio +async def test_send_mcp_threads_operation_id_into_payload() -> None: + """``operation_id`` is buyer-supplied and embedded by the buyer in the + webhook URL when registering pushNotificationConfig. Per the schema at + ``mcp-webhook-payload.json``, publishers MUST echo it in the payload so + buyers correlate notifications without parsing URL paths. + + Regression guard for the docstring-vs-schema mismatch noted in the + DecisioningPlatform foundations audit (the prior docstring discouraged + populating the field, contradicting the schema's MUST).""" + captured_payloads: list[dict[str, Any]] = [] + app = FastAPI() + + @app.post("/webhooks/adcp/{op_id}") + async def echo(request: Request, op_id: str) -> JSONResponse: + body = await request.body() + captured_payloads.append(json.loads(body)) + return JSONResponse({"ok": True}, status_code=200) + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + client=client, + ) + result = await sender.send_mcp( + url="http://test/webhooks/adcp/op_abc123", + task_id="task_op_id_test", + task_type="create_media_buy", + status="completed", + operation_id="op_abc123", + result={"media_buy_id": "mb_1"}, + ) + + assert result.ok + assert captured_payloads[0]["operation_id"] == "op_abc123" + + +@pytest.mark.asyncio +async def test_owned_client_default_allows_non_standard_ports() -> None: + """The default ``WebhookSender`` (no operator client, no + ``allowed_destination_ports``) accepts AdCP-spec-compliant buyers on + non-standard ports — :9443 (Tomcat default), :4443 (Spring Boot + default), path-routed multi-tenant gateways. + + Sender-level positive analog of ``test_ssrf_default_imposes_no_port_filter`` + in ``test_jwks.py`` — confirms the permissive default reaches the + actual delivery path, not just the underlying validator. The IP-range + check is enforced by the validator and covered separately by + ``test_owned_client_rejects_loopback_destination``.""" + from unittest.mock import patch + + captured: list[tuple[str, int]] = [] + app = FastAPI() + + @app.post("/webhooks/adcp") + async def echo(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}, status_code=200) + + asgi_transport = httpx.ASGITransport(app=app) + + # Stub the pinned-transport build so we don't open a real socket to + # a public IP; capture that the build was attempted for the + # non-standard port and route the actual POST through ASGI. + def fake_build(uri: str, **_kwargs: Any) -> Any: + from urllib.parse import urlparse + + parsed = urlparse(uri) + captured.append((parsed.hostname or "", parsed.port or 443)) + return asgi_transport + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + side_effect=fake_build, + ): + async with sender: + result = await sender.send_mcp( + url="http://test:9443/webhooks/adcp", + task_id="task_nonstd", + task_type="create_media_buy", + status="completed", + ) + + assert result.ok + assert captured == [("test", 9443)] + + +@pytest.mark.asyncio +async def test_operator_supplied_client_bypasses_ssrf_guard() -> None: + """When the operator passes their own httpx client (vetted egress + proxy, ASGI test transport, etc.), the framework trusts them + completely — pin-and-bind is skipped and the SSRF range check does + NOT fire. The operator owns SSRF on their transport. + + Named regression test for the documented contract; without this, a + future refactor that mistakenly applies pin-and-bind to both + branches breaks ASGI-based unit tests and any vetted-proxy + deployments that route via private networks.""" + app = FastAPI() + + @app.post("/webhooks/adcp") + async def echo(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}, status_code=200) + + transport = httpx.ASGITransport(app=app) + # base_url is loopback-equivalent. With the SSRF guard active this + # would raise SSRFValidationError; under the operator-trust contract + # it must succeed. + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + client=client, + ) + result = await sender.send_mcp( + url="http://test/webhooks/adcp", + task_id="task_op_trust", + task_type="create_media_buy", + status="completed", + ) + + assert result.ok + + +@pytest.mark.asyncio +async def test_owned_client_ignores_https_proxy_env() -> None: + """``HTTPS_PROXY`` / ``HTTP_PROXY`` env vars MUST NOT defeat the IP + pin on the owned-client path. httpx's default ``trust_env=True`` + routes requests through proxy env vars, which would bypass the + AsyncIpPinnedTransport's network_backend entirely — an attacker who + controls process env (sidecar config, dotenv, malicious cluster + egress policy) could otherwise pivot to receiving the signed + webhook body. + + The sender constructs its per-request ``httpx.AsyncClient`` with + ``trust_env=False`` to close this. Regression guard: if a future + refactor drops the kwarg, this test catches it by setting a proxy + env var that points at an unreachable address; the pinned transport + must still route via the resolved IP and reach the test ASGI app + (which we can't directly observe under a real socket, so we assert + the proxy var is ignored by checking the constructor config).""" + import os + from unittest.mock import MagicMock, patch + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + + captured_kwargs: dict[str, Any] = {} + + class _FakeAsyncClient: + def __init__(self, **kwargs: Any) -> None: + # Capture kwargs only from the per-request construction in + # _send_bytes; __aenter__'s eager _get_client() also flows + # through here but its kwargs don't affect the per-request + # delivery path. The last writer wins; the per-request call + # is the one we care about. + captured_kwargs.update(kwargs) + self._response = MagicMock(status_code=200, headers={}, content=b"{}") + + async def __aenter__(self) -> _FakeAsyncClient: + return self + + async def __aexit__(self, *_args: Any) -> None: + return None + + async def aclose(self) -> None: + return None + + async def post(self, *_args: Any, **_kwargs: Any) -> Any: + return self._response + + # Set HTTPS_PROXY pointing at an unreachable address. With + # trust_env=True (the httpx default), this would override the + # transport. The sender MUST pass trust_env=False to ignore it. + with patch.dict(os.environ, {"HTTPS_PROXY": "http://attacker.invalid:9999"}): + with patch( + "adcp.webhook_sender.build_async_ip_pinned_transport", + return_value=MagicMock(), # transport itself isn't used here + ): + with patch("adcp.webhook_sender.httpx.AsyncClient", _FakeAsyncClient): + async with sender: + await sender.send_mcp( + url="https://buyer.example.com/webhooks/adcp", + task_id="task_proxy", + task_type="create_media_buy", + status="completed", + ) + + # The per-request httpx.AsyncClient construction passes a `transport` + # kwarg; the eager __aenter__ construction does not. Asserting both + # `transport` is present AND `trust_env=False` is set proves the + # captured kwargs are from the per-request construction, not the + # eager-init that has nothing to do with HTTPS_PROXY hardening. + assert "transport" in captured_kwargs, ( + "captured kwargs do not include `transport` — the assertion below " + "is reading the eager __aenter__ construction, not the per-request " + "construction the proxy-bypass guard lives on" + ) + assert captured_kwargs.get("trust_env") is False, ( + "WebhookSender's per-request httpx.AsyncClient must construct with " + "trust_env=False — otherwise HTTPS_PROXY env vars defeat the IP pin" + ) + assert captured_kwargs.get("follow_redirects") is False + + +@pytest.mark.asyncio +async def test_owned_client_rejects_hostile_url_before_signing() -> None: + """Validate-before-sign defense in depth: a hostile URL raises + SSRFValidationError synchronously inside ``build_async_ip_pinned_transport``, + BEFORE ``sign_webhook`` is called. No Ed25519/ES256 signature ever + materializes in process memory for a URL that fails the SSRF guard — + anything that snapshots locals on exception (faulthandler, custom + logging) cannot capture a signature that wasn't generated. + + Regression guard for the validate-before-sign reorder in _send_bytes.""" + from unittest.mock import patch + + from adcp.signing import SSRFValidationError + + sender = WebhookSender.from_jwk( + {**WEBHOOK_JWK, "d": WEBHOOK_JWK["_private_d_for_test_only"]}, + ) + with patch("adcp.webhook_sender.sign_webhook") as mock_sign: + async with sender: + with pytest.raises(SSRFValidationError): + await sender.send_mcp( + url="https://127.0.0.1/webhooks/adcp", + task_id="task_no_sign", + task_type="create_media_buy", + status="completed", + ) + assert mock_sign.called is False, ( + "sign_webhook was called even though SSRF validation rejected the URL — " + "the signature would sit in process memory until the rejection. " + "Validate-before-sign ordering is broken; check _send_bytes." + )