Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 56 additions & 11 deletions examples/a2a_db_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
from __future__ import annotations

import contextlib
import json
import os
import sqlite3
import uuid
Expand All @@ -90,12 +89,21 @@
from pathlib import Path
from typing import Any

from a2a import types as pb
from a2a.server.context import ServerCallContext
from a2a.server.tasks.push_notification_config_store import (
PushNotificationConfigStore,
)
from a2a.server.tasks.task_store import TaskStore
from a2a.types import PushNotificationConfig, Task

# 1.0 folded ``PushNotificationConfig`` into
# :class:`a2a.types.TaskPushNotificationConfig`; the example's
# :meth:`~SqlitePushNotificationConfigStore.set_info` signature still
# accepts a notification config object — the caller passes a
# :class:`TaskPushNotificationConfig` instance.
from a2a.types import Task
from a2a.types import TaskPushNotificationConfig as PushNotificationConfig
from google.protobuf.json_format import MessageToJson, Parse

from adcp.server import ADCPHandler, serve
from adcp.server.responses import capabilities_response, products_response
Expand Down Expand Up @@ -192,7 +200,10 @@ async def _conn(self):

async def save(self, task: Task, context: ServerCallContext | None = None) -> None:
scope = self._scope_from_context(context)
task_json = task.model_dump_json(exclude_none=True)
# Proto messages serialize via ``MessageToJson``; fields stay in
# the canonical proto JSON shape so a different reader on the
# same DB (gRPC bridge, future 1.x client) sees the same bytes.
task_json = MessageToJson(task, preserving_proto_field_name=True)
async with self._conn() as conn:
# NOTE: ``INSERT OR REPLACE`` is last-writer-wins. Production
# stores should guard with a version column or
Expand All @@ -214,8 +225,7 @@ async def get(self, task_id: str, context: ServerCallContext | None = None) -> T
).fetchone()
if row is None:
return None
payload: dict[str, Any] = json.loads(row[0])
return Task.model_validate(payload)
return Parse(row[0], pb.Task())

async def delete(self, task_id: str, context: ServerCallContext | None = None) -> None:
scope = self._scope_from_context(context)
Expand All @@ -225,6 +235,27 @@ async def delete(self, task_id: str, context: ServerCallContext | None = None) -
(scope, task_id),
)

async def list(
self,
params: pb.ListTasksRequest | None = None,
context: ServerCallContext | None = None,
) -> pb.ListTasksResponse:
"""Return tasks owned by the current scope.

``params.page_token`` / ``params.page_size`` support is left as
an exercise for the seller — the reference impl returns every
task in one response to keep the example compact. Real deployments
should implement keyset pagination on ``(updated_at, task_id)``.
"""
scope = self._scope_from_context(context)
async with self._conn() as conn:
rows = conn.execute(
"SELECT task_json FROM a2a_tasks WHERE scope = ? ORDER BY updated_at DESC",
(scope,),
).fetchall()
tasks = [Parse(row[0], pb.Task()) for row in rows]
return pb.ListTasksResponse(tasks=tasks)


# ----------------------------------------------------------------------
# SQLite-backed PushNotificationConfigStore
Expand Down Expand Up @@ -410,7 +441,12 @@ async def _conn(self):
finally:
conn.close()

async def set_info(self, task_id: str, notification_config: PushNotificationConfig) -> None:
async def set_info(
self,
task_id: str,
notification_config: PushNotificationConfig,
context: ServerCallContext | None = None,
) -> None:
scope = self._scope()
# PushNotificationConfig.id is optional on the wire; when the
# client didn't supply one we synthesise a UUID so two clients
Expand All @@ -421,7 +457,7 @@ async def set_info(self, task_id: str, notification_config: PushNotificationConf
# config they just created unless they round-trip the
# server-assigned id.
config_id = notification_config.id or f"auto-{uuid.uuid4()}"
config_json = notification_config.model_dump_json(exclude_none=True)
config_json = MessageToJson(notification_config, preserving_proto_field_name=True)
async with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO a2a_push_configs "
Expand All @@ -430,16 +466,25 @@ async def set_info(self, task_id: str, notification_config: PushNotificationConf
(scope, task_id, config_id, config_json),
)

async def get_info(self, task_id: str) -> list[PushNotificationConfig]:
async def get_info(
self,
task_id: str,
context: ServerCallContext | None = None,
) -> list[PushNotificationConfig]:
scope = self._scope()
async with self._conn() as conn:
rows = conn.execute(
"SELECT config_json FROM a2a_push_configs " "WHERE scope = ? AND task_id = ?",
"SELECT config_json FROM a2a_push_configs WHERE scope = ? AND task_id = ?",
(scope, task_id),
).fetchall()
return [PushNotificationConfig.model_validate(json.loads(r[0])) for r in rows]
return [Parse(r[0], PushNotificationConfig()) for r in rows]

async def delete_info(self, task_id: str, config_id: str | None = None) -> None:
async def delete_info(
self,
task_id: str,
context: ServerCallContext | None = None,
config_id: str | None = None,
) -> None:
scope = self._scope()
async with self._conn() as conn:
if config_id is None:
Expand Down
20 changes: 12 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ dependencies = [
"httpcore>=1.0,<2.0",
"pydantic>=2.0.0",
"typing-extensions>=4.5.0",
# Cap at <1.0 — a2a-sdk 1.0.0 (released 2026-04-20) is a breaking
# rewrite that moves types to a2a.types.a2a_pb2, renames
# DefaultRequestHandler, removes ServerError from a2a.utils.errors,
# and changes Part/Message construction away from ``root=`` kwargs.
# Migration is non-trivial (28+ mypy errors across webhooks, client,
# protocols/a2a, server/a2a_server, server/translate). Tracked as a
# separate compat PR.
"a2a-sdk>=0.3.0,<1.0",
# A2A protocol v1.0 (protobuf types, ProtoJSON on the wire). We run
# on the v1.0 Python SDK with ``enable_v0_3_compat=True`` on the
# server-side JSON-RPC route factory, which dual-serves the AgentCard
# and preserves 0.3 JSON shapes outbound for existing 0.3 clients.
# No coordinated buyer migration needed.
"a2a-sdk>=1.0.1,<2.0",
"sse-starlette>=2.0", # required by a2a-sdk v0.3 compat adapter
"mcp>=1.23.2",
"email-validator>=2.0.0",
"cryptography>=41.0.0",
Expand Down Expand Up @@ -80,6 +79,10 @@ dev = [
# tests/test_mcp_middleware_composition.py and future integration
# tests that exercise the streamable-HTTP ASGI app in-process.
"asgi-lifespan>=2.1.0",
# mypy stubs for the protobuf runtime we use via a2a-sdk 1.0
# (``google.protobuf.json_format``, ``struct_pb2``, ``timestamp_pb2``).
# Without these mypy flags every import as ``import-untyped``.
"types-protobuf>=7.34.1.20260408",
]
docs = [
"pdoc3>=0.10.0",
Expand Down Expand Up @@ -220,4 +223,5 @@ skips = ["B101"] # Allow assert in code (we're not using -O optimization)
dev = [
"datamodel-code-generator==0.56.1",
"pre-commit>=4.4.0",
"types-protobuf>=7.34.1.20260408",
]
3 changes: 2 additions & 1 deletion src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
build_synthetic_capabilities,
validate_capabilities,
)
from adcp.client import ADCPClient, ADCPMultiAgentClient
from adcp.client import ADCPClient, ADCPMultiAgentClient, Checkpoint
from adcp.exceptions import ( # noqa: F401
AdagentsNotFoundError,
AdagentsTimeoutError,
Expand Down Expand Up @@ -566,6 +566,7 @@ def get_adcp_version() -> str:
# Client classes
"ADCPClient",
"ADCPMultiAgentClient",
"Checkpoint",
"RegistryClient",
"PropertyRegistry",
"RegistrySync",
Expand Down
Loading
Loading