Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/adcp/server/mcp_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@
"inputSchema": {
"type": "object",
"properties": {
"account": {"type": "object"},
"scenario": {
"type": "string",
"enum": [
Expand All @@ -849,6 +850,11 @@
"force_session_status",
"simulate_delivery",
"simulate_budget_spend",
"seed_product",
"seed_pricing_option",
"seed_creative",
"seed_plan",
"seed_media_buy",
],
},
"params": {"type": "object"},
Expand Down
11 changes: 11 additions & 0 deletions src/adcp/server/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ async def get_products():

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from adcp.server.helpers import valid_actions_for_status

_logger = logging.getLogger("adcp.server")


def _serialize(items: list[Any]) -> list[Any]:
"""Serialize a list of dicts or Pydantic models to plain dicts."""
Expand Down Expand Up @@ -79,6 +82,14 @@ def capabilities_response(
idempotency=store.capability(),
)
"""
if compliance_testing is not None and not idempotency:
_logger.warning(
"capabilities_response: adcp.idempotency not declared. "
"The AdCP 3.0.1 storyboard runner may downgrade to v2 mode and "
"cascade failures across idempotency-dependent tracks. "
"Pass idempotency={'supported': False} to declare non-support, "
"or idempotency=store.capability() to declare support."
)
adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]}
if idempotency:
adcp_info["idempotency"] = idempotency
Expand Down
46 changes: 46 additions & 0 deletions src/adcp/server/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def serve(
*,
name: str = "adcp-agent",
port: int | None = None,
host: str | None = None,
transport: str = "streamable-http",
instructions: str | None = None,
test_controller: TestControllerStore | None = None,
Expand All @@ -315,6 +316,7 @@ def serve(
message_parser: MessageParser | None = None,
advertise_all: bool = False,
max_request_size: int | None = None,
streaming_responses: bool = False,
) -> None:
"""Start an MCP or A2A server from an ADCP handler or server builder.

Expand Down Expand Up @@ -371,6 +373,23 @@ def serve(
entirely (not recommended — the cap is the only guard
against adversarial payloads exhausting Pydantic validation
CPU/memory). See :mod:`adcp.server._size_limit`.
host: Network interface to bind to (MCP transports only). Defaults
to the ``ADCP_HOST`` environment variable, then ``"0.0.0.0"``
(all interfaces). Use ``"127.0.0.1"`` for local-only
development. Container deployments (Fly.io, k8s, Cloud Run)
require ``"0.0.0.0"`` so the process listens on the
container's external interface.
streaming_responses: When ``False`` (default), the streamable-http
transport returns one ``application/json`` response per
request. AdCP tools today don't emit progress events, and
FastMCP's SSE-internal streaming default has an upstream bug
that drops the ASGI response without completing — making the
storyboard runner report ``overall_status: "unreachable"``.
Set to ``True`` only if your tools genuinely emit progress
notifications and your clients consume the SSE stream
(MCP transports only). Note: the legacy ``transport="sse"``
is a separate (deprecated) MCP transport, unrelated to this
flag.

Security:
This function does NOT configure authentication. In production,
Expand Down Expand Up @@ -428,13 +447,15 @@ async def force_account_status(self, account_id, status):
handler,
name=name,
port=port,
host=host,
transport=transport,
instructions=instructions,
test_controller=test_controller,
context_factory=context_factory,
middleware=middleware,
advertise_all=advertise_all,
max_request_size=max_request_size,
streaming_responses=streaming_responses,
)
else:
valid = ", ".join(sorted(("a2a", "streamable-http", "sse", "stdio")))
Expand Down Expand Up @@ -523,24 +544,28 @@ def _serve_mcp(
*,
name: str,
port: int | None,
host: str | None = None,
transport: str,
instructions: str | None,
test_controller: TestControllerStore | None,
context_factory: ContextFactory | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
advertise_all: bool = False,
max_request_size: int | None = None,
streaming_responses: bool = False,
) -> None:
"""Start an MCP server."""
mcp = create_mcp_server(
handler,
name=name,
port=port,
host=host,
instructions=instructions,
include_test_controller=test_controller is not None,
context_factory=context_factory,
middleware=middleware,
advertise_all=advertise_all,
streaming_responses=streaming_responses,
)

if test_controller is not None:
Expand Down Expand Up @@ -644,11 +669,13 @@ def create_mcp_server(
*,
name: str = "adcp-agent",
port: int | None = None,
host: str | None = None,
instructions: str | None = None,
include_test_controller: bool = False,
context_factory: ContextFactory | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
advertise_all: bool = False,
streaming_responses: bool = False,
) -> Any:
"""Create a FastMCP server from an ADCP handler without starting it.

Expand Down Expand Up @@ -692,6 +719,17 @@ def create_mcp_server(
:func:`~adcp.server.get_tools_for_handler` for semantics;
use ``True`` for spec-compliance storyboards or when you
deliberately want to expose a ``not_supported`` tool.
host: Network interface to bind to. Defaults to the ``ADCP_HOST``
environment variable, then ``"0.0.0.0"`` (all interfaces).
Use ``"127.0.0.1"`` for local-only development.
streaming_responses: When ``False`` (default), the streamable-http
transport returns one ``application/json`` response per
request — the right shape for AdCP tools today (none of which
emit progress events). The FastMCP SSE-internal streaming
default also has an upstream bug that drops the ASGI response
without completing, blocking the storyboard runner. Set to
``True`` only if your tools genuinely emit progress
notifications and your clients consume the SSE stream.

Returns:
A configured FastMCP server instance. Call ``mcp.run()`` to start,
Expand Down Expand Up @@ -744,7 +782,15 @@ def create_mcp_server(
from mcp.server.fastmcp import FastMCP

resolved_port = port or int(os.environ.get("PORT", "3001"))
resolved_host = host if host is not None else (os.environ.get("ADCP_HOST") or "0.0.0.0")
mcp = FastMCP(name, instructions=instructions, port=resolved_port)
mcp.settings.host = resolved_host
if not streaming_responses:
# FastMCP's SSE-internal default has an upstream bug; switching to
# stateless JSON-response mode is also semantically correct for
# AdCP tools, which return one complete envelope per request.
mcp.settings.stateless_http = True
mcp.settings.json_response = True
_register_handler_tools(
mcp,
handler,
Expand Down
114 changes: 114 additions & 0 deletions src/adcp/server/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ async def force_account_status(self, account_id, status):
"force_session_status",
"simulate_delivery",
"simulate_budget_spend",
# seed_* scenarios pre-populate storyboard fixtures (AdCP 3.0.1)
"seed_product",
"seed_pricing_option",
"seed_creative",
"seed_plan",
"seed_media_buy",
]


Expand Down Expand Up @@ -191,6 +197,77 @@ async def simulate_budget_spend(
"""
raise NotImplementedError

async def seed_product(
self,
fixture: dict[str, Any] | None = None,
product_id: str | None = None,
*,
context: ToolContext | None = None,
) -> dict[str, Any]:
"""Pre-populate a product fixture for storyboard tests (AdCP 3.0.1).

Returns:
{"product_id": str}
"""
raise NotImplementedError

async def seed_pricing_option(
self,
fixture: dict[str, Any] | None = None,
product_id: str | None = None,
pricing_option_id: str | None = None,
*,
context: ToolContext | None = None,
) -> dict[str, Any]:
"""Pre-populate a pricing option fixture for storyboard tests (AdCP 3.0.1).

Returns:
{"pricing_option_id": str}
"""
raise NotImplementedError

async def seed_creative(
self,
fixture: dict[str, Any] | None = None,
creative_id: str | None = None,
*,
context: ToolContext | None = None,
) -> dict[str, Any]:
"""Pre-populate a creative fixture for storyboard tests (AdCP 3.0.1).

Returns:
{"creative_id": str}
"""
raise NotImplementedError

async def seed_plan(
self,
fixture: dict[str, Any] | None = None,
plan_id: str | None = None,
*,
context: ToolContext | None = None,
) -> dict[str, Any]:
"""Pre-populate a plan fixture for storyboard tests (AdCP 3.0.1).

Returns:
{"plan_id": str}
"""
raise NotImplementedError

async def seed_media_buy(
self,
fixture: dict[str, Any] | None = None,
media_buy_id: str | None = None,
*,
context: ToolContext | None = None,
) -> dict[str, Any]:
"""Pre-populate a media buy fixture for storyboard tests (AdCP 3.0.1).

Returns:
{"media_buy_id": str}
"""
raise NotImplementedError


def _list_scenarios(store: TestControllerStore) -> list[str]:
"""Detect which scenarios a store actually implements.
Expand Down Expand Up @@ -353,6 +430,37 @@ async def _handle_test_controller(
media_buy_id=scenario_params.get("media_buy_id"),
**extra,
)
elif scenario == "seed_product":
result = await method(
fixture=scenario_params.get("fixture"),
product_id=scenario_params.get("product_id"),
**extra,
)
elif scenario == "seed_pricing_option":
result = await method(
fixture=scenario_params.get("fixture"),
product_id=scenario_params.get("product_id"),
pricing_option_id=scenario_params.get("pricing_option_id"),
**extra,
)
elif scenario == "seed_creative":
result = await method(
fixture=scenario_params.get("fixture"),
creative_id=scenario_params.get("creative_id"),
**extra,
)
elif scenario == "seed_plan":
result = await method(
fixture=scenario_params.get("fixture"),
plan_id=scenario_params.get("plan_id"),
**extra,
)
elif scenario == "seed_media_buy":
result = await method(
fixture=scenario_params.get("fixture"),
media_buy_id=scenario_params.get("media_buy_id"),
**extra,
)
else:
return _controller_error("UNKNOWN_SCENARIO", f"Unknown scenario: {scenario}")
except TestControllerError as e:
Expand Down Expand Up @@ -442,6 +550,7 @@ async def comply_test_controller(**kwargs: Any) -> str:
tool.parameters = {
"type": "object",
"properties": {
"account": {"type": "object"},
"scenario": {
"type": "string",
"enum": [
Expand All @@ -452,6 +561,11 @@ async def comply_test_controller(**kwargs: Any) -> str:
"force_session_status",
"simulate_delivery",
"simulate_budget_spend",
"seed_product",
"seed_pricing_option",
"seed_creative",
"seed_plan",
"seed_media_buy",
],
},
"params": {"type": "object"},
Expand Down
Loading