Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/adcp/signing/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,26 @@ class VerifiedSigner:
class VerifierCapability:
"""The `request_signing` block a verifier advertises on get_adcp_capabilities.

Defaults to `covers_content_digest="required"`: body integrity must be
authenticated end-to-end. With `"either"`, a MITM inside TLS termination
(reverse proxy, service mesh) can swap bodies freely on unsigned-digest
requests — only pick `"either"` if you've weighed that tradeoff.
Defaults to ``covers_content_digest="either"`` per the AdCP 3.0 schema
(`get-adcp-capabilities-response.json` declares this as the default
explicitly). The schema rationale recommends `"required"` for
spend-committing operations in production, and AdCP 4.0 recommends
`"required"` more broadly.

Operators who want body-integrity authentication end-to-end on
every request — closing the MITM-inside-TLS-termination case where
a reverse proxy or service mesh can swap bodies on unsigned-digest
requests — opt INTO ``covers_content_digest="required"`` explicitly,
or use ``required_for=frozenset({"create_media_buy", ...})`` to
promote spend-committing operations selectively.

The webhook-signing profile (``adcp.signing.webhook_verifier``) hard-
codes ``"required"`` regardless of this default — webhook bodies
always carry signed digests.
"""

supported: bool = True
covers_content_digest: CoversDigestPolicy = "required"
covers_content_digest: CoversDigestPolicy = "either"
required_for: frozenset[str] = field(default_factory=frozenset)
supported_for: frozenset[str] = field(default_factory=frozenset)

Expand Down
67 changes: 51 additions & 16 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,8 @@ async def deliver(
extra_headers: Mapping[str, str] | None = None,
timeout_seconds: float | None = None,
token_field: str | None = None,
allow_private: bool = False,
allowed_ports: frozenset[int] | None = None,
) -> httpx.Response:
"""Dispatch one legacy-auth webhook in a single call.

Expand All @@ -728,11 +730,23 @@ async def deliver(
:func:`create_mcp_webhook_payload` / :func:`create_a2a_webhook_payload`),
an a2a ``Task`` / ``TaskStatusUpdateEvent``, or a plain dict.
Models are dumped with ``mode="json", exclude_none=True``.
client: Optional shared ``httpx.AsyncClient``. Recommended in
production for connection pooling and egress-policy enforcement
(a custom ``httpx.BaseTransport`` is the right place to block
SSRF to private IPs — the helper validates scheme but cannot
see post-DNS resolution without racing TOCTOU).
client: Optional shared ``httpx.AsyncClient``. When supplied, the
caller owns SSRF guarantees — the helper trusts the operator's
transport completely (typically a vetted egress proxy with
mTLS, or an ASGI transport for testing). When omitted, the
helper builds a per-request :class:`adcp.signing.IpPinnedTransport`
so the URL is resolved, SSRF-validated, and pinned to the
resolved IP — same defense applied to :class:`WebhookSender`.
allow_private: Forwarded to the per-request pinned transport
(owned-client path only). ``False`` (default) rejects URLs
whose resolved IP is in a private / loopback / link-local
range. Set ``True`` for dev/CI fixtures that post to internal
endpoints; production should leave it ``False``.
allowed_ports: Forwarded to the per-request pinned transport
(owned-client path only). ``None`` (default) imposes no port
filter — AdCP doesn't constrain webhook ports. Hardened
deployments pass :data:`adcp.signing.DEFAULT_ALLOWED_PORTS`
(`{443, 8443}`) or a custom set.
extra_headers: Merged last. May not override any of
``Content-Type``, ``Content-Digest``, ``Content-Length``,
``Host``, ``Authorization``, ``Signature``, ``Signature-Input``,
Expand Down Expand Up @@ -768,10 +782,13 @@ async def deliver(
DeprecationWarning (fires once): ``authentication`` is a 3.x fallback.

Security notes:
* ``config.url`` is buyer-controlled. The helper enforces HTTPS and
rejects control characters but does NOT block private / link-local
destinations — wire an egress policy via ``client.transport`` to
stop SSRF into your VPC or cloud metadata service.
* ``config.url`` is buyer-controlled. The helper enforces HTTPS,
rejects control characters, AND (on the owned-client path)
builds a per-request IP-pinned transport that runs the full
SSRF range check (loopback / RFC 1918 / link-local / CGNAT /
IPv6 ULA / multicast / cloud metadata) and pins the connection
to the validated IP. Operator-supplied clients skip the SSRF
guard — they own egress policy on their transport.
* ``config.token`` sits in the request body, so any receiver that
logs bodies retains it indefinitely. Treat the token as a
medium-sensitivity correlator, not a long-lived secret.
Expand Down Expand Up @@ -862,14 +879,32 @@ async def deliver(
_validate_header_value(f"extra_headers[{key!r}]", value)
headers[key] = value

owns_client = client is None
effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS
http_client = client or httpx.AsyncClient(timeout=effective_timeout)
try:
return await http_client.post(url, content=body_bytes, headers=headers)
finally:
if owns_client:
await http_client.aclose()
if client is None:
# Owned-client path: build a per-request IP-pinned transport so
# the URL is SSRF-validated and pinned to the resolved IP, with
# follow_redirects=False (rebinding-via-redirect defense) and
# trust_env=False (HTTPS_PROXY env vars cannot route the request
# away from the pinned target). Mirrors the WebhookSender pattern
# — see adcp.webhook_sender._send_bytes for the same shape.
from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport

transport = build_async_ip_pinned_transport(
url,
allow_private=allow_private,
allowed_ports=allowed_ports,
)
async with httpx.AsyncClient(
transport=transport,
timeout=effective_timeout,
follow_redirects=False,
trust_env=False,
) as http_client:
return await http_client.post(url, content=body_bytes, headers=headers)
# Operator-supplied client: trust them completely; they own SSRF
# guarantees on their transport (vetted egress proxy, ASGI test
# transport, etc.).
return await client.post(url, content=body_bytes, headers=headers)


def _extract_config_fields(
Expand Down
9 changes: 7 additions & 2 deletions tests/conformance/signing/test_verifier_behaviors.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ def test_verify_options_rejects_positional() -> None:
# ---- 6a: VerifierCapability default ----


def test_verifier_capability_defaults_to_required_digest() -> None:
def test_verifier_capability_defaults_to_either_digest() -> None:
"""The AdCP 3.0 schema declares ``covers_content_digest`` default as
``"either"`` (``get-adcp-capabilities-response.json``); ``"required"``
is opt-in for spend-committing operations. AdCP 4.0 is expected to
recommend ``"required"`` more broadly. Operators promote operations
selectively via ``required_for=frozenset({"create_media_buy", ...})``."""
cap = VerifierCapability()
assert cap.covers_content_digest == "required"
assert cap.covers_content_digest == "either"
98 changes: 98 additions & 0 deletions tests/test_webhooks_deliver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import hmac
import json
from typing import Any
from unittest.mock import patch

import httpx
import pytest
from a2a.types import TaskState # TaskState is the proto enum; still exported

from adcp.signing import SSRFValidationError
from adcp.types.generated_poc.core.push_notification_config import (
Authentication as PNAuthentication,
)
Expand Down Expand Up @@ -546,3 +548,99 @@ def test_normalize_passthrough_for_unknown_enum_prefixes() -> None:
assert out["status"]["state"] == "completed"
assert out["status"]["message"]["role"] == "user"
assert out["role"] == "user"


# -- SSRF guard regression tests (parity with WebhookSender) ---------------


@pytest.mark.asyncio
async def test_owned_client_rejects_loopback_destination() -> None:
"""The legacy ``deliver()`` helper now wires the same per-request
IP-pinned transport that ``WebhookSender._send_bytes`` uses (see #299).
A buyer-supplied URL pointing at 127.0.0.1 must reject before the
POST hits a real socket — the previous unguarded ``httpx.AsyncClient``
would have delivered to the loopback address.

Operator-supplied clients (``client=...``) skip the SSRF guard by
design; they own egress policy on their transport."""
config = PushNotificationConfig(
url="https://127.0.0.1/webhooks/adcp",
authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN),
)
payload = create_mcp_webhook_payload(
task_id="task_ssrf",
task_type="create_media_buy",
status="completed",
)

with pytest.raises(SSRFValidationError):
await deliver(config, payload)


@pytest.mark.asyncio
async def test_allow_private_dev_escape_hatch() -> None:
"""Adopters with dev/CI fixtures posting to internal endpoints
pass ``allow_private=True`` to disable the IP-range check. Pin-and-
bind still applies (URL gets resolved + connection pinned), and the
rest of the contract stands — bytes signed == bytes posted."""
config = PushNotificationConfig(
url="https://10.0.0.1/webhooks/adcp",
authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN),
)
payload = create_mcp_webhook_payload(
task_id="task_priv",
task_type="create_media_buy",
status="completed",
)

# Stub the pinned-transport build with a transport that actually
# accepts a body without opening a socket. We're testing the kwarg
# plumbing, not the network round-trip.
#
# Patch target is the source module (adcp.signing.ip_pinned_transport)
# because deliver() does a lazy in-function ``from ... import``. If
# the import is ever moved to module-level on adcp.webhooks, the
# patch target must follow — `patch("adcp.webhooks.build_async_ip_pinned_transport", ...)`.
captured: dict[str, Any] = {}

def fake_build(uri: str, **kwargs: Any) -> Any:
captured.update(kwargs)
captured["uri"] = uri
return httpx.MockTransport(lambda _req: httpx.Response(200))

with patch(
"adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport",
side_effect=fake_build,
):
response = await deliver(config, payload, allow_private=True)

assert response.status_code == 200
assert captured["allow_private"] is True
assert captured["uri"] == "https://10.0.0.1/webhooks/adcp"


@pytest.mark.asyncio
async def test_operator_supplied_client_skips_ssrf_guard() -> None:
"""When the operator provides their own ``httpx.AsyncClient``,
``deliver()`` does NOT build a pinned transport — the operator owns
SSRF on their transport (vetted egress proxy, ASGI test transport).
A request that would fail the SSRF range check under the owned-client
path (resolves to a private/loopback IP) succeeds via the operator's
MockTransport without any DNS lookup."""
# https:// is required by the scheme check; MockTransport captures
# the request without a real socket so SSRF range never fires.
config = PushNotificationConfig(
url="https://operator-trusted.test/webhooks/adcp",
authentication=PNAuthentication(schemes=["Bearer"], credentials=_BEARER_TOKEN),
)
payload = create_mcp_webhook_payload(
task_id="task_op",
task_type="create_media_buy",
status="completed",
)

transport = httpx.MockTransport(lambda _req: httpx.Response(200))
async with httpx.AsyncClient(transport=transport) as client:
response = await deliver(config, payload, client=client)

assert response.status_code == 200
Loading