Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/signing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
build_ip_pinned_transport,
)
from adcp.signing.jwks import (
DEFAULT_ALLOWED_PORTS,
AsyncCachingJwksResolver,
AsyncJwksFetcher,
AsyncJwksResolver,
Expand Down Expand Up @@ -251,6 +252,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"CachingJwksResolver",
"CachingRevocationChecker",
"CapabilityProvider",
"DEFAULT_ALLOWED_PORTS",
"DEFAULT_EXPIRES_IN_SECONDS",
"DEFAULT_GRACE_MULTIPLIER",
"DEFAULT_SKEW_SECONDS",
Expand Down
36 changes: 31 additions & 5 deletions src/adcp/signing/ip_pinned_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,44 +289,65 @@ def build_ip_pinned_transport(
uri: str,
*,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
verify: bool = True,
) -> IpPinnedTransport:
"""Resolve ``uri`` once and return a transport pinned to the validated IP.

Raises :class:`SSRFValidationError` if the URI's scheme isn't
``http``/``https``, the host doesn't resolve, or every resolved
IP is in a blocked range.
``http``/``https``, ``allowed_ports`` is set and the URI's port is
outside it, the host doesn't resolve, or every resolved IP is in a
blocked range.

``allowed_ports`` defaults to ``None`` (no port filter — AdCP
doesn't constrain webhook ports). Hardened deployments pass
:data:`adcp.signing.jwks.DEFAULT_ALLOWED_PORTS` (`{443, 8443}`)
or a custom set.

Typical use inside a fetcher::

transport = build_ip_pinned_transport(uri)
with httpx.Client(transport=transport, timeout=10.0) as client:
response = client.get(uri)
"""
hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private)
hostname, resolved_ip, _port = resolve_and_validate_host(
uri,
allow_private=allow_private,
allowed_ports=allowed_ports,
)
return IpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify)


def build_async_ip_pinned_transport(
uri: str,
*,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
verify: bool = True,
) -> AsyncIpPinnedTransport:
"""Build an :class:`AsyncIpPinnedTransport` for ``uri``.

Resolve + validate run synchronously (``socket.getaddrinfo``); this
function itself is not awaitable. The returned transport plugs
into :class:`httpx.AsyncClient`.

``allowed_ports`` defaults to ``None`` (no port filter); see
:func:`build_ip_pinned_transport` for the hardening kwarg
semantics.
"""
hostname, resolved_ip, _port = resolve_and_validate_host(uri, allow_private=allow_private)
hostname, resolved_ip, _port = resolve_and_validate_host(
uri,
allow_private=allow_private,
allowed_ports=allowed_ports,
)
return AsyncIpPinnedTransport(hostname=hostname, resolved_ip=resolved_ip, verify=verify)


def abuild_ip_pinned_transport(
uri: str,
*,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
verify: bool = True,
) -> AsyncIpPinnedTransport:
"""Deprecated alias for :func:`build_async_ip_pinned_transport`.
Expand All @@ -343,4 +364,9 @@ def abuild_ip_pinned_transport(
DeprecationWarning,
stacklevel=2,
)
return build_async_ip_pinned_transport(uri, allow_private=allow_private, verify=verify)
return build_async_ip_pinned_transport(
uri,
allow_private=allow_private,
allowed_ports=allowed_ports,
verify=verify,
)
47 changes: 39 additions & 8 deletions src/adcp/signing/jwks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@
}
)

# Recommended destination ports for hardened SSRF-validated outbound HTTP
# deployments. AdCP itself does not constrain ``pushNotificationConfig.url``
# ports (see ``schemas/cache/core/push-notification-config.json``), so the
# default port-allowlist is permissive — adopters who want a hardening posture
# pass ``allowed_ports=DEFAULT_ALLOWED_PORTS`` (or a custom set) explicitly.
# Rejecting non-standard ports closes a smuggle vector for buyers bouncing
# traffic to internal services on the same routable IP — :25 (SMTP relay),
# :6379 (Redis), :11211 (Memcached), etc. — but that's an operator choice,
# not a framework default that breaks legitimate :9443 / :4443 buyers.
DEFAULT_ALLOWED_PORTS: frozenset[int] = frozenset({443, 8443})

# Upper bound on the number of resolved addresses examined per validation call.
# A malicious DNS server can return thousands of records as a mild amplification
# vector against the validator's inner loop.
Expand Down Expand Up @@ -104,20 +115,26 @@ class AsyncJwksResolver(Protocol):
async def __call__(self, keyid: str) -> dict[str, Any] | None: ...


def validate_jwks_uri(uri: str, *, allow_private: bool = False) -> None:
"""Raise SSRFValidationError if `uri` resolves to a blocked IP or has a bad scheme.
def validate_jwks_uri(
uri: str,
*,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
) -> None:
"""Raise SSRFValidationError on blocked IP, bad scheme, or disallowed port.

This is kept as a standalone no-return helper for callers that only
want validation — :func:`resolve_and_validate_host` returns the
accepted IP when the caller needs it for IP-pinned connects.
Standalone no-return helper for callers that only want validation —
:func:`resolve_and_validate_host` returns the accepted IP when the
caller needs it for IP-pinned connects.
"""
resolve_and_validate_host(uri, allow_private=allow_private)
resolve_and_validate_host(uri, allow_private=allow_private, allowed_ports=allowed_ports)


def resolve_and_validate_host(
uri: str,
*,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
) -> tuple[str, str, int]:
"""Resolve the URI's hostname once and return ``(hostname, ip, port)``.

Expand All @@ -138,6 +155,13 @@ def resolve_and_validate_host(
allow_private:
Skip the reserved-range check. For tests only; cloud-metadata
IPs remain blocked unconditionally.
allowed_ports:
Optional destination-port allowlist. ``None`` (default) imposes
no port filter — the URL's port is unrestricted. Hardened
deployments pass :data:`DEFAULT_ALLOWED_PORTS` (`{443, 8443}`)
or a custom set; the validator then rejects URIs whose port
is outside the set. AdCP doesn't constrain webhook ports in
the spec, so this is operator policy, not a framework default.

Returns
-------
Expand All @@ -148,8 +172,9 @@ def resolve_and_validate_host(
Raises
------
SSRFValidationError
Scheme is not ``http``/``https``, the hostname doesn't resolve,
or every resolved IP is in a blocked range.
Scheme is not ``http``/``https``, ``allowed_ports`` is set and
the URI's port is outside it, the hostname doesn't resolve, or
every resolved IP is in a blocked range.
"""
parts = urlsplit(uri)
if parts.scheme not in ("http", "https"):
Expand Down Expand Up @@ -177,6 +202,11 @@ def resolve_and_validate_host(
except (UnicodeError, UnicodeEncodeError) as exc:
raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc
port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80)
if allowed_ports is not None and port not in allowed_ports:
raise SSRFValidationError(
f"port {port} not allowed for SSRF-validated fetch "
f"(allowed: {sorted(allowed_ports) if allowed_ports else '<empty>'})"
)

try:
infos = socket.getaddrinfo(host, None)
Expand Down Expand Up @@ -461,6 +491,7 @@ async def resolve(keyid: str) -> dict[str, Any] | None:
"AsyncJwksFetcher",
"AsyncJwksResolver",
"CachingJwksResolver",
"DEFAULT_ALLOWED_PORTS",
"DEFAULT_JWKS_COOLDOWN_SECONDS",
"DEFAULT_JWKS_TIMEOUT_SECONDS",
"JwksFetcher",
Expand Down
87 changes: 84 additions & 3 deletions src/adcp/webhook_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
load_private_key_pem,
private_key_from_jwk,
)
from adcp.signing.ip_pinned_transport import (
AsyncIpPinnedTransport,
build_async_ip_pinned_transport,
)
from adcp.signing.webhook_signer import sign_webhook
from adcp.types import GeneratedTaskStatus
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
Expand Down Expand Up @@ -112,13 +116,17 @@ def __init__(
alg: str,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
allow_private_destinations: bool = False,
allowed_destination_ports: frozenset[int] | None = None,
) -> None:
self._private_key = private_key
self._key_id = key_id
self._alg = alg
self._timeout = timeout_seconds
self._client = client
self._owns_client = client is None
self._allow_private_destinations = allow_private_destinations
self._allowed_destination_ports = allowed_destination_ports

@classmethod
def from_jwk(
Expand All @@ -128,13 +136,18 @@ def from_jwk(
d_field: str = "d",
client: httpx.AsyncClient | None = None,
timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
allow_private_destinations: bool = False,
allowed_destination_ports: frozenset[int] | None = None,
) -> WebhookSender:
"""Construct from a JWK that includes the private scalar.

The JWK MUST have ``adcp_use == "webhook-signing"`` — the sender
doesn't validate this (you're signing with your own key; validation
happens at the receiver), but a key whose adcp_use is wrong will be
rejected by every conformant verifier.

``allow_private_destinations`` and ``allowed_destination_ports``
forward to :meth:`__init__` — see that signature for semantics.
"""
# Snapshot the mapping once — a live Mapping could otherwise return
# different values across the adcp_use / kid / d / alg reads.
Expand Down Expand Up @@ -163,6 +176,8 @@ def from_jwk(
alg=alg,
client=client,
timeout_seconds=timeout_seconds,
allow_private_destinations=allow_private_destinations,
allowed_destination_ports=allowed_destination_ports,
)

@classmethod
Expand All @@ -175,6 +190,8 @@ def from_pem(
passphrase: bytes | None = None,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
allow_private_destinations: bool = False,
allowed_destination_ports: frozenset[int] | None = None,
) -> WebhookSender:
"""Load a private key from a PEM file and bind it as a webhook sender.

Expand All @@ -194,6 +211,8 @@ def from_pem(
client: Optional pre-built :class:`httpx.AsyncClient` to share
across the SDK; the sender owns its own client when omitted.
timeout_seconds: Per-request timeout for the owned client.
allow_private_destinations: Forwarded to :meth:`__init__`.
allowed_destination_ports: Forwarded to :meth:`__init__`.

Raises:
ValueError: ``alg`` is not ed25519 / es256, or the PEM contains
Expand Down Expand Up @@ -239,6 +258,8 @@ def from_pem(
alg=alg,
client=client,
timeout_seconds=timeout_seconds,
allow_private_destinations=allow_private_destinations,
allowed_destination_ports=allowed_destination_ports,
)

def __repr__(self) -> str:
Expand Down Expand Up @@ -471,7 +492,42 @@ async def _send_bytes(
idempotency_key: str,
extra_headers: Mapping[str, str] | None,
) -> WebhookDeliveryResult:
"""Sign + POST a pre-serialized body. Shared by send_raw and resend."""
"""Sign + POST a pre-serialized body through an SSRF-validated transport.

When the sender owns its httpx client (the default — ``client=None``
was passed to ``__init__``), every delivery builds a per-request
:class:`adcp.signing.ip_pinned_transport.AsyncIpPinnedTransport`
that resolves the destination, runs the full SSRF range check
(loopback / RFC 1918 / link-local / CGNAT / IPv6 ULA / multicast /
cloud metadata), enforces the port allowlist, and pins the
connection to the validated IP. This closes the DNS-rebinding
TOCTOU between validate and connect.

When the operator supplied their own client
(``WebhookSender(client=...)`` — typically a vetted egress proxy
with mTLS to a known buyer set, or an ASGI transport for testing),
the sender trusts the operator's transport completely. Pin-and-bind
is skipped; the operator's transport owns SSRF.

On the owned-client path, SSRF validation runs **before** signing
so a hostile URL is rejected without first generating an
Ed25519/ES256 signature over the body. That signature would
otherwise sit in process memory until the SSRF rejection —
anything that snapshots locals on exception (faulthandler,
custom logging) could capture it. Validate first, sign second.
"""
# Build the pinned transport up-front for the owned-client path.
# This runs SSRF + port validation against the URL before any
# signing happens; a hostile URL raises SSRFValidationError here
# and the body never gets signed.
transport: AsyncIpPinnedTransport | None = None
if self._owns_client:
transport = build_async_ip_pinned_transport(
url,
allow_private=self._allow_private_destinations,
allowed_ports=self._allowed_destination_ports,
)

base_headers = {"Content-Type": "application/json"}
signed = sign_webhook(
method="POST",
Expand All @@ -495,8 +551,33 @@ async def _send_bytes(
for k, v in extra_headers.items():
headers[k] = v

client = await self._get_client()
response = await client.post(url, content=body, headers=headers)
if transport is not None:
# Owned-client path. ``trust_env=False`` prevents httpx from
# routing the request through ``HTTPS_PROXY`` / ``HTTP_PROXY``
# env vars — every other pinned-transport callsite in the
# codebase sets this for the same reason (default_jwks_fetcher,
# async_default_jwks_fetcher, revocation_fetcher). Without it,
# an attacker who controls process env can route the signed
# webhook through their endpoint, defeating the IP pin entirely.
async with httpx.AsyncClient(
transport=transport,
timeout=self._timeout,
follow_redirects=False,
trust_env=False,
) as client:
response = await client.post(url, content=body, headers=headers)
else:
# Operator-supplied client — they own the SSRF guarantees on
# their transport (proxy allowlist, mTLS, etc.). Reachable as
# None after aclose(); explicit raise survives ``python -O``
# which would strip an assert.
if self._client is None:
raise RuntimeError(
"WebhookSender's operator-supplied client was already "
"closed. Construct a new sender or pass a fresh client."
)
response = await self._client.post(url, content=body, headers=headers)

return WebhookDeliveryResult(
status_code=response.status_code,
idempotency_key=idempotency_key,
Expand Down
8 changes: 6 additions & 2 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ def create_mcp_webhook_payload(
task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
timestamp: When the webhook was generated (defaults to current UTC time)
result: Task-specific payload (AdCP response data)
operation_id: Publisher-defined operation identifier (deprecated from payload,
should be in URL routing, but included for backward compatibility)
operation_id: Client-generated identifier the buyer embedded in the
webhook URL when registering push-notification config. Publishers
MUST echo this back in the payload so buyers correlate notifications
without parsing URL paths (per ``mcp-webhook-payload.json``).
Senders extracting the value from the URL path on emission populate
this field; callers constructing payloads directly pass it through.
message: Human-readable summary of task state
context_id: Session/conversation identifier
domain: AdCP domain this task belongs to
Expand Down
Loading
Loading