diff --git a/schemas/cache/compliance/comply-test-controller-request.json b/schemas/cache/compliance/comply-test-controller-request.json index a1a7774f..833b22ce 100644 --- a/schemas/cache/compliance/comply-test-controller-request.json +++ b/schemas/cache/compliance/comply-test-controller-request.json @@ -19,7 +19,9 @@ "seed_pricing_option", "seed_creative", "seed_plan", - "seed_media_buy" + "seed_media_buy", + "force_create_media_buy_arm", + "force_task_completion" ], "description": "Test scenario to execute. 'list_scenarios' discovers supported scenarios. 'force_*' and 'simulate_*' trigger state transitions. 'seed_*' scenarios pre-populate fixtures (product, pricing option, creative, plan, media buy) so storyboards can reference them by stable ID without the implementer having to guess which IDs the conformance suite expects." }, @@ -110,6 +112,21 @@ "minimum": 0, "maximum": 100, "description": "Spend to this percentage of budget (0\u2013100). Used by simulate_budget_spend." + }, + "arm": { + "type": "string", + "enum": ["submitted", "input-required"], + "description": "Target arm for the next create_media_buy call. Used by force_create_media_buy_arm." + }, + "task_id": { + "type": "string", + "maxLength": 128, + "description": "Buyer-supplied task ID. Required when arm='submitted' for force_create_media_buy_arm; required for force_task_completion." + }, + "result": { + "type": "object", + "description": "Completion result payload for force_task_completion. Must be non-empty; 256 KB soft cap.", + "additionalProperties": true } }, "additionalProperties": true @@ -396,6 +413,49 @@ } } } + }, + { + "if": { + "properties": { + "scenario": { + "const": "force_create_media_buy_arm" + } + } + }, + "then": { + "required": [ + "params" + ], + "properties": { + "params": { + "required": [ + "arm" + ] + } + } + } + }, + { + "if": { + "properties": { + "scenario": { + "const": "force_task_completion" + } + } + }, + "then": { + "required": [ + "params" + ], + "properties": { + "params": { + "required": [ + "task_id", + "result" + ] + } + } + } } ], "additionalProperties": true, diff --git a/schemas/cache/compliance/comply-test-controller-response.json b/schemas/cache/compliance/comply-test-controller-response.json index c5be4336..ffe88883 100644 --- a/schemas/cache/compliance/comply-test-controller-response.json +++ b/schemas/cache/compliance/comply-test-controller-response.json @@ -28,7 +28,9 @@ "seed_pricing_option", "seed_creative", "seed_plan", - "seed_media_buy" + "seed_media_buy", + "force_create_media_buy_arm", + "force_task_completion" ] }, "description": "Scenarios this seller has implemented. Runners and sellers MUST accept unknown scenario strings (open-for-extension) \u2014 new scenarios may be added in additive minor bumps." @@ -173,6 +175,65 @@ ] } }, + { + "title": "ForcedDirectiveSuccess", + "description": "A force_create_media_buy_arm directive was registered. The next create_media_buy from the same account will be driven into the specified arm.", + "type": "object", + "properties": { + "success": { + "type": "boolean", + "const": true + }, + "arm": { + "type": "string", + "enum": ["submitted", "input-required"], + "description": "Arm the next create_media_buy will be driven into" + }, + "task_id": { + "type": ["string", "null"], + "description": "Buyer-supplied task ID registered with the directive (null for input-required arm)" + }, + "message": { + "type": ["string", "null"], + "description": "Optional message for the input-required arm" + }, + "context": { + "$ref": "../core/context.json" + }, + "ext": { + "$ref": "../core/ext.json" + } + }, + "required": [ + "success", + "arm" + ], + "additionalProperties": true, + "not": { + "anyOf": [ + { + "required": [ + "error" + ] + }, + { + "required": [ + "scenarios" + ] + }, + { + "required": [ + "previous_state" + ] + }, + { + "required": [ + "simulated" + ] + } + ] + } + }, { "title": "ControllerError", "description": "The scenario failed \u2014 invalid transition, unknown entity, unsupported scenario, or invalid params", diff --git a/src/adcp/server/mcp_tools.py b/src/adcp/server/mcp_tools.py index e5a30635..3b338b61 100644 --- a/src/adcp/server/mcp_tools.py +++ b/src/adcp/server/mcp_tools.py @@ -849,6 +849,8 @@ "force_session_status", "simulate_delivery", "simulate_budget_spend", + "force_create_media_buy_arm", + "force_task_completion", ], }, "params": {"type": "object"}, diff --git a/src/adcp/server/test_controller.py b/src/adcp/server/test_controller.py index b52fb64b..35982f37 100644 --- a/src/adcp/server/test_controller.py +++ b/src/adcp/server/test_controller.py @@ -52,6 +52,8 @@ async def force_account_status(self, account_id, status): "force_session_status", "simulate_delivery", "simulate_budget_spend", + "force_create_media_buy_arm", + "force_task_completion", ] @@ -191,6 +193,119 @@ async def simulate_budget_spend( """ raise NotImplementedError + async def force_create_media_buy_arm( + self, + arm: str, + task_id: str | None = None, + message: str | None = None, + *, + context: ToolContext | None = None, + ) -> dict[str, Any]: + """Register a single-shot directive for the next create_media_buy call. + + Drives the next ``create_media_buy`` from the same authenticated + sandbox account into the specified arm (``submitted`` or + ``input-required``). A second registration before consumption + overwrites the pending directive. + + Args: + arm: Target arm — ``"submitted"`` or ``"input-required"``. + task_id: Buyer-supplied task ID. Required when arm=``"submitted"``; + max 128 characters. + message: Optional message for the input-required arm; max 2000 + characters. + context: Optional ToolContext threaded from the server's + context_factory. + + Returns: + ForcedDirectiveSuccess shape:: + + {"arm": arm, "task_id": task_id, "message": message} + + (``"success": True`` is auto-injected by the dispatcher.) + + Raises: + TestControllerError("INVALID_PARAMS", ...): Input validation failed. + The dispatcher validates arm values and the task_id/arm + constraint before calling this method. + + Example:: + + class MySeller(TestControllerStore): + def __init__(self): + # (account, principal) → pending directive + self._directives: dict[str, dict] = {} + + async def force_create_media_buy_arm( + self, arm, task_id=None, message=None, *, context=None + ): + key = context.caller_identity if context else "default" + self._directives[key] = {"arm": arm, "task_id": task_id, "message": message} + return {"arm": arm, "task_id": task_id, "message": message} + """ + raise NotImplementedError + + async def force_task_completion( + self, + task_id: str, + result: dict[str, Any], + *, + context: ToolContext | None = None, + ) -> dict[str, Any]: + """Resolve a previously-submitted task to completed. + + Records ``(task_id, result, ownerKey)`` and transitions the task + from ``submitted`` to ``completed``. Cross-account replays return + ``NOT_FOUND``; identical-params replays are idempotent; diverging + params against a terminal task return ``INVALID_TRANSITION``. + + Args: + task_id: Buyer-supplied task ID; max 128 characters. + result: Completion result payload (non-empty dict, 256 KB soft + cap). Validated against ``async-response-data.json`` by + spec-compliant runners. + context: Optional ToolContext threaded from the server's + context_factory. + + Returns: + StateTransitionSuccess shape:: + + {"previous_state": "submitted", "current_state": "completed"} + + (``"success": True`` is auto-injected by the dispatcher.) + + Raises: + TestControllerError("NOT_FOUND", ...): Cross-account replay — the + task_id exists but belongs to a different principal. + TestControllerError("INVALID_TRANSITION", ..., current_state="completed"): + Diverging-params replay — the task already completed with a + different result payload. + + Example:: + + class MySeller(TestControllerStore): + def __init__(self): + # task_id → {"result": dict, "owner_key": str} + self._completed: dict[str, dict] = {} + + async def force_task_completion(self, task_id, result, *, context=None): + owner = context.caller_identity if context else "default" + existing = self._completed.get(task_id) + if existing: + if existing["owner_key"] != owner: + raise TestControllerError("NOT_FOUND", f"Task {task_id!r} not found") + if existing["result"] == result: + return {"previous_state": "submitted", "current_state": "completed"} + raise TestControllerError( + "INVALID_TRANSITION", + f"Task {task_id!r} already completed with different result", + current_state="completed", + ) + self._completed[task_id] = {"result": result, "owner_key": owner} + return {"previous_state": "submitted", "current_state": "completed"} + """ + raise NotImplementedError + def _list_scenarios(store: TestControllerStore) -> list[str]: """Detect which scenarios a store actually implements. @@ -353,6 +468,38 @@ async def _handle_test_controller( media_buy_id=scenario_params.get("media_buy_id"), **extra, ) + elif scenario == "force_create_media_buy_arm": + arm = scenario_params.get("arm") + if not arm: + return _controller_error("INVALID_PARAMS", "Missing required parameter: 'arm'") + if arm not in ("submitted", "input-required"): + return _controller_error( + "INVALID_PARAMS", "arm must be 'submitted' or 'input-required'" + ) + task_id_val = scenario_params.get("task_id") + # Strip first so whitespace-only strings are treated as absent + task_id_val = task_id_val.strip() if task_id_val else task_id_val + if arm == "submitted" and not task_id_val: + return _controller_error( + "INVALID_PARAMS", "task_id is required when arm='submitted'" + ) + if task_id_val and len(task_id_val) > 128: + return _controller_error("INVALID_PARAMS", "task_id must be ≤128 characters") + message_val = scenario_params.get("message") + if message_val and len(message_val) > 2000: + return _controller_error("INVALID_PARAMS", "message must be ≤2000 characters") + result = await method(arm=arm, task_id=task_id_val, message=message_val, **extra) + elif scenario == "force_task_completion": + task_id_val = scenario_params.get("task_id") + task_id_val = task_id_val.strip() if task_id_val else task_id_val + if not task_id_val: + return _controller_error("INVALID_PARAMS", "Missing required parameter: 'task_id'") + if len(task_id_val) > 128: + return _controller_error("INVALID_PARAMS", "task_id must be ≤128 characters") + result_obj = scenario_params.get("result") + if not isinstance(result_obj, dict) or not result_obj: + return _controller_error("INVALID_PARAMS", "result must be a non-empty object") + result = await method(task_id=task_id_val, result=result_obj, **extra) else: return _controller_error("UNKNOWN_SCENARIO", f"Unknown scenario: {scenario}") except TestControllerError as e: @@ -438,21 +585,14 @@ async def comply_test_controller(**kwargs: Any) -> str: description="Compliance test controller. Sandbox only, not for production use.", ) - # Override schema with the proper comply_test_controller inputSchema + # Override schema with the proper comply_test_controller inputSchema. + # Derived from SCENARIOS so it stays in sync automatically. tool.parameters = { "type": "object", "properties": { "scenario": { "type": "string", - "enum": [ - "list_scenarios", - "force_creative_status", - "force_account_status", - "force_media_buy_status", - "force_session_status", - "simulate_delivery", - "simulate_budget_spend", - ], + "enum": ["list_scenarios"] + SCENARIOS, }, "params": {"type": "object"}, "context": {"type": "object"}, diff --git a/src/adcp/types/generated_poc/compliance/comply_test_controller_request.py b/src/adcp/types/generated_poc/compliance/comply_test_controller_request.py index cf2cd5cb..794e32fd 100644 --- a/src/adcp/types/generated_poc/compliance/comply_test_controller_request.py +++ b/src/adcp/types/generated_poc/compliance/comply_test_controller_request.py @@ -27,6 +27,9 @@ class Scenario(Enum): seed_creative = "seed_creative" seed_plan = "seed_plan" seed_media_buy = "seed_media_buy" + # Added for adcp#3104 / adcp#3138 parity with Node training-agent + force_create_media_buy_arm = "force_create_media_buy_arm" + force_task_completion = "force_task_completion" class ReportedSpend(AdCPBaseModel): @@ -117,6 +120,25 @@ class Params(AdCPBaseModel): le=100.0, ), ] = None + arm: Annotated[ + str | None, + Field( + description="Target arm for the next create_media_buy call. Used by force_create_media_buy_arm." + ), + ] = None + task_id: Annotated[ + str | None, + Field( + description="Buyer-supplied task ID. Required when arm='submitted' for force_create_media_buy_arm; required for force_task_completion.", + max_length=128, + ), + ] = None + result: Annotated[ + dict[str, Any] | None, + Field( + description="Completion result payload for force_task_completion. Must be non-empty; 256 KB soft cap." + ), + ] = None class ComplyTestControllerRequest(AdCPBaseModel): diff --git a/tests/test_force_create_media_buy_arm_and_force_task_completion.py b/tests/test_force_create_media_buy_arm_and_force_task_completion.py new file mode 100644 index 00000000..8a7ba314 --- /dev/null +++ b/tests/test_force_create_media_buy_arm_and_force_task_completion.py @@ -0,0 +1,367 @@ +"""Tests for force_create_media_buy_arm and force_task_completion scenarios. + +Covers the nine-test pattern from adcp-client-python issue #281: + - Registration with valid params + - INVALID_PARAMS branches + - Replay idempotency (force_task_completion) + - Diverging-replay INVALID_TRANSITION (force_task_completion) + - Cross-account isolation (force_task_completion) + - list_scenarios advertisement for both scenarios +""" + +from __future__ import annotations + +from typing import Any + +from adcp.server.test_controller import ( + TestControllerError, + TestControllerStore, + _handle_test_controller, +) + +# --------------------------------------------------------------------------- +# Concrete stores for testing +# --------------------------------------------------------------------------- + + +class _ArmStore(TestControllerStore): + """Minimal store that records the last directive registered.""" + + def __init__(self) -> None: + self.last: dict[str, Any] | None = None + + async def force_create_media_buy_arm( + self, + arm: str, + task_id: str | None = None, + message: str | None = None, + **kwargs: Any, + ) -> dict[str, Any]: + self.last = {"arm": arm, "task_id": task_id, "message": message} + return {"arm": arm, "task_id": task_id, "message": message} + + +class _CompletionStore(TestControllerStore): + """Minimal store implementing force_task_completion with cross-account isolation.""" + + def __init__(self) -> None: + # task_id → {"result": dict, "owner": str} + self._tasks: dict[str, dict[str, Any]] = {} + + async def force_task_completion( + self, + task_id: str, + result: dict[str, Any], + **kwargs: Any, + ) -> dict[str, Any]: + from adcp.server import ToolContext + + context = kwargs.get("context") + owner = context.caller_identity if isinstance(context, ToolContext) else "default" + existing = self._tasks.get(task_id) + if existing: + if existing["owner"] != owner: + raise TestControllerError("NOT_FOUND", f"Task {task_id!r} not found") + if existing["result"] == result: + return {"previous_state": "submitted", "current_state": "completed"} + raise TestControllerError( + "INVALID_TRANSITION", + f"Task {task_id!r} already completed with different result", + current_state="completed", + ) + self._tasks[task_id] = {"result": result, "owner": owner} + return {"previous_state": "submitted", "current_state": "completed"} + + +class _BothStore(_ArmStore, _CompletionStore, TestControllerStore): + """Store that implements both new scenarios — used for list_scenarios tests.""" + + def __init__(self) -> None: + _ArmStore.__init__(self) + _CompletionStore.__init__(self) + + +# --------------------------------------------------------------------------- +# force_create_media_buy_arm — valid registration +# --------------------------------------------------------------------------- + + +async def test_force_create_media_buy_arm_submitted_arm() -> None: + """Registration with arm='submitted' and task_id returns ForcedDirectiveSuccess.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_create_media_buy_arm", + "params": {"arm": "submitted", "task_id": "task-abc-123"}, + }, + ) + assert result["success"] is True + assert result["arm"] == "submitted" + assert result["task_id"] == "task-abc-123" + assert store.last == {"arm": "submitted", "task_id": "task-abc-123", "message": None} + + +async def test_force_create_media_buy_arm_input_required() -> None: + """arm='input-required' with optional message is accepted without task_id.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_create_media_buy_arm", + "params": {"arm": "input-required", "message": "Please provide targeting"}, + }, + ) + assert result["success"] is True + assert result["arm"] == "input-required" + assert result["message"] == "Please provide targeting" + assert result["task_id"] is None + + +async def test_force_create_media_buy_arm_overwrites_pending() -> None: + """A second registration before consumption overwrites the pending directive.""" + store = _ArmStore() + await _handle_test_controller( + store, + {"scenario": "force_create_media_buy_arm", "params": {"arm": "submitted", "task_id": "t1"}}, + ) + result = await _handle_test_controller( + store, + {"scenario": "force_create_media_buy_arm", "params": {"arm": "submitted", "task_id": "t2"}}, + ) + assert result["success"] is True + assert result["task_id"] == "t2" + assert store.last is not None and store.last["task_id"] == "t2" + + +# --------------------------------------------------------------------------- +# force_create_media_buy_arm — INVALID_PARAMS branches +# --------------------------------------------------------------------------- + + +async def test_force_create_media_buy_arm_missing_arm() -> None: + """Missing arm returns INVALID_PARAMS before the store is called.""" + store = _ArmStore() + result = await _handle_test_controller( + store, {"scenario": "force_create_media_buy_arm", "params": {}} + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + assert store.last is None # store was not called + + +async def test_force_create_media_buy_arm_invalid_arm_value() -> None: + """arm='pending' is not a valid arm; dispatcher returns INVALID_PARAMS.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + {"scenario": "force_create_media_buy_arm", "params": {"arm": "pending"}}, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +async def test_force_create_media_buy_arm_submitted_requires_task_id() -> None: + """arm='submitted' without task_id returns INVALID_PARAMS.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + {"scenario": "force_create_media_buy_arm", "params": {"arm": "submitted"}}, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + assert "task_id" in result["error_detail"] + + +async def test_force_create_media_buy_arm_task_id_too_long() -> None: + """task_id > 128 characters returns INVALID_PARAMS.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_create_media_buy_arm", + "params": {"arm": "submitted", "task_id": "x" * 129}, + }, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +async def test_force_create_media_buy_arm_message_too_long() -> None: + """message > 2000 characters returns INVALID_PARAMS.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_create_media_buy_arm", + "params": {"arm": "input-required", "message": "m" * 2001}, + }, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +async def test_force_create_media_buy_arm_whitespace_task_id_rejected() -> None: + """Whitespace-only task_id is treated as absent for arm='submitted'.""" + store = _ArmStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_create_media_buy_arm", + "params": {"arm": "submitted", "task_id": " "}, + }, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + assert store.last is None + + +# --------------------------------------------------------------------------- +# force_task_completion — valid registration + replay semantics +# --------------------------------------------------------------------------- + + +async def test_force_task_completion_valid() -> None: + """Registration with task_id and non-empty result returns StateTransitionSuccess.""" + store = _CompletionStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_task_completion", + "params": {"task_id": "task-xyz", "result": {"status": "ok", "impressions": 1000}}, + }, + ) + assert result["success"] is True + assert result["previous_state"] == "submitted" + assert result["current_state"] == "completed" + + +async def test_force_task_completion_idempotent_replay() -> None: + """Identical-params replay returns the same success without modifying state.""" + store = _CompletionStore() + params = {"task_id": "task-1", "result": {"impressions": 500}} + await _handle_test_controller(store, {"scenario": "force_task_completion", "params": params}) + result = await _handle_test_controller( + store, {"scenario": "force_task_completion", "params": params} + ) + assert result["success"] is True + assert result["current_state"] == "completed" + + +async def test_force_task_completion_diverging_replay_returns_invalid_transition() -> None: + """Diverging-params replay against a terminal task returns INVALID_TRANSITION.""" + store = _CompletionStore() + await _handle_test_controller( + store, + {"scenario": "force_task_completion", "params": {"task_id": "t1", "result": {"v": 1}}}, + ) + result = await _handle_test_controller( + store, + {"scenario": "force_task_completion", "params": {"task_id": "t1", "result": {"v": 2}}}, + ) + assert result["success"] is False + assert result["error"] == "INVALID_TRANSITION" + assert result.get("current_state") == "completed" + + +async def test_force_task_completion_cross_account_isolation() -> None: + """Cross-account replay (different caller_identity) returns NOT_FOUND.""" + from adcp.server import ToolContext + + store = _CompletionStore() + ctx_owner = ToolContext(caller_identity="principal-A") + ctx_other = ToolContext(caller_identity="principal-B") + + await _handle_test_controller( + store, + { + "scenario": "force_task_completion", + "params": {"task_id": "shared-task", "result": {"data": "ok"}}, + }, + context=ctx_owner, + ) + result = await _handle_test_controller( + store, + { + "scenario": "force_task_completion", + "params": {"task_id": "shared-task", "result": {"data": "ok"}}, + }, + context=ctx_other, + ) + assert result["success"] is False + assert result["error"] == "NOT_FOUND" + + +# --------------------------------------------------------------------------- +# force_task_completion — INVALID_PARAMS branches +# --------------------------------------------------------------------------- + + +async def test_force_task_completion_missing_task_id() -> None: + """Missing task_id returns INVALID_PARAMS.""" + store = _CompletionStore() + result = await _handle_test_controller( + store, + {"scenario": "force_task_completion", "params": {"result": {"v": 1}}}, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +async def test_force_task_completion_empty_result() -> None: + """Empty result dict returns INVALID_PARAMS.""" + store = _CompletionStore() + result = await _handle_test_controller( + store, + {"scenario": "force_task_completion", "params": {"task_id": "t1", "result": {}}}, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +async def test_force_task_completion_task_id_too_long() -> None: + """task_id > 128 characters returns INVALID_PARAMS.""" + store = _CompletionStore() + result = await _handle_test_controller( + store, + { + "scenario": "force_task_completion", + "params": {"task_id": "t" * 129, "result": {"v": 1}}, + }, + ) + assert result["success"] is False + assert result["error"] == "INVALID_PARAMS" + + +# --------------------------------------------------------------------------- +# list_scenarios — advertisement of both new scenarios +# --------------------------------------------------------------------------- + + +async def test_list_scenarios_advertises_both_new_scenarios() -> None: + """list_scenarios returns both force_create_media_buy_arm and force_task_completion + when the store implements them.""" + store = _BothStore() + result = await _handle_test_controller(store, {"scenario": "list_scenarios"}) + assert result["success"] is True + scenarios = result["scenarios"] + assert "force_create_media_buy_arm" in scenarios + assert "force_task_completion" in scenarios + + +async def test_list_scenarios_excludes_unimplemented_new_scenarios() -> None: + """Stores that don't override the new methods don't advertise them.""" + store = TestControllerStore() + result = await _handle_test_controller(store, {"scenario": "list_scenarios"}) + assert result["success"] is True + assert "force_create_media_buy_arm" not in result["scenarios"] + assert "force_task_completion" not in result["scenarios"] + + +async def test_list_scenarios_partial_implementation() -> None: + """A store that overrides only force_create_media_buy_arm advertises only that one.""" + store = _ArmStore() + result = await _handle_test_controller(store, {"scenario": "list_scenarios"}) + assert result["success"] is True + assert "force_create_media_buy_arm" in result["scenarios"] + assert "force_task_completion" not in result["scenarios"]