Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,8 @@ If an SSE connection drops, use `GET /v1/tasks/{task_id}:subscribe` to re-subscr
- Idempotency contract: repeated `tasks/cancel` on an already `canceled` task returns the current terminal task state without error.
- Terminal subscribe contract: calling `subscribe` on a terminal task replays one terminal `Task` snapshot and then closes the stream.
- These two semantics are also declared as machine-readable `service_behaviors` in the compatibility profile and wire contract extensions.
- The service emits lightweight metric log records (`logger=opencode_a2a.execution.executor`):
- At `A2A_LOG_LEVEL=DEBUG`, the service emits lightweight metric log records
(`logger=opencode_a2a.execution.executor`):
- `a2a_stream_requests_total`
- `a2a_stream_active` (`value=1` when a stream starts, `value=-1` when it closes)
- `opencode_stream_retries_total`
Expand Down
18 changes: 13 additions & 5 deletions src/opencode_a2a/execution/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ async def run(self) -> None:
break

except httpx.HTTPStatusError as exc:
logger.exception("OpenCode request failed with HTTP error")
logger.warning(
"OpenCode request failed with HTTP status=%s",
exc.response.status_code,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
error_type, state, message = _format_upstream_error(
exc,
request="send_message",
Expand All @@ -190,7 +194,11 @@ async def run(self) -> None:
streaming_request=self._prepared.streaming_request,
)
except httpx.TimeoutException as exc:
logger.exception("OpenCode request timed out")
logger.warning(
"OpenCode request timed out: %s",
exc,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
await self._executor._emit_error(
self._event_queue,
task_id=self._task_id,
Expand All @@ -212,7 +220,7 @@ async def run(self) -> None:
streaming_request=self._prepared.streaming_request,
)
except UpstreamConcurrencyLimitError as exc:
logger.warning("OpenCode request rejected by concurrency budget: %s", exc)
logger.debug("OpenCode request rejected by concurrency budget: %s", exc)
await self._executor._emit_error(
self._event_queue,
task_id=self._task_id,
Expand Down Expand Up @@ -304,11 +312,11 @@ async def _handle_response(self, response: Any) -> None:
)

logger.debug(
"OpenCode response task_id=%s session_id=%s message_id=%s text=%s",
"OpenCode response task_id=%s session_id=%s message_id=%s text_len=%s",
self._task_id,
response.session_id,
resolved_message_id,
response_text,
len(response_text),
)

if response_error is not None:
Expand Down
4 changes: 2 additions & 2 deletions src/opencode_a2a/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
(
"Received message identity=%s credential_id=%s auth_scheme=%s trace_id=%s "
"task_id=%s context_id=%s "
"streaming=%s text=%s part_count=%s"
"streaming=%s text_len=%s part_count=%s"
),
identity,
credential_id,
Expand All @@ -319,7 +319,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
task_id,
context_id,
streaming_request,
user_text,
len(user_text),
len(request_parts),
)
prepared = PreparedExecution(
Expand Down
8 changes: 4 additions & 4 deletions src/opencode_a2a/execution/stream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ async def _emit_chunks(chunks: list[_NormalizedStreamChunk]) -> None:
)
logger.debug(
"Stream chunk task_id=%s session_id=%s block_type=%s append=%s "
"shared_source=%s internal_source=%s text=%s",
"shared_source=%s internal_source=%s content_len=%s",
task_id,
session_id,
chunk.block_type,
effective_append,
chunk.shared_source,
chunk.internal_source,
chunk.content_key,
len(chunk.content_key),
)
if chunk.block_type == BlockType.TOOL_CALL:
self._emit_metric("tool_call_chunks_emitted_total")
Expand Down Expand Up @@ -343,7 +343,7 @@ def _snapshot_chunks(
)
]
state.buffer = snapshot
logger.warning(
logger.debug(
"Suppressing non-prefix snapshot rewrite "
"task_id=%s session_id=%s part_id=%s block_type=%s had_delta=%s",
task_id,
Expand Down Expand Up @@ -593,7 +593,7 @@ def _tool_chunks(
if stop_event.is_set():
break
self._emit_metric("opencode_stream_retries_total")
logger.exception("OpenCode event stream failed; retrying")
logger.debug("OpenCode event stream failed; retrying", exc_info=True)
await self._sleep(backoff)
backoff = min(backoff * 2, max_backoff)
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion src/opencode_a2a/opencode_upstream_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def reserve(self, *, operation: str) -> AsyncIterator[None]:
async with self._lock:
inflight = self._inflight
if inflight >= self._limit:
logger.warning(
logger.debug(
"OpenCode upstream concurrency limit exceeded "
"category=%s operation=%s limit=%s inflight=%s",
self._category,
Expand Down
4 changes: 2 additions & 2 deletions src/opencode_a2a/server/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ async def on_message_send_stream(self, params, context=None):
):
yield event
except (asyncio.CancelledError, GeneratorExit):
logger.warning("Client disconnected. Cancelling producer task %s", task_id)
logger.debug("Client disconnected. Cancelling producer task %s", task_id)
producer_task.cancel()
await queue.close(immediate=True)
raise
Expand Down Expand Up @@ -504,7 +504,7 @@ async def push_notification_callback() -> None:
try:
current_task = asyncio.current_task()
if current_task is not None and current_task.cancelled():
logger.warning(
logger.debug(
"Client disconnected from message request. Cancelling task %s", task_id
)
producer_task.cancel()
Expand Down
28 changes: 24 additions & 4 deletions tests/execution/test_agent_errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from unittest.mock import AsyncMock, MagicMock

import httpx
Expand Down Expand Up @@ -173,7 +174,7 @@ async def create_session(title: str | None = None, *, directory: str | None = No


@pytest.mark.asyncio
async def test_streaming_execute_http_error_emits_status_update_with_metadata() -> None:
async def test_streaming_execute_http_error_emits_status_update_with_metadata(caplog) -> None:
request = httpx.Request("POST", "http://127.0.0.1:4096/message")
response = httpx.Response(
status_code=429,
Expand Down Expand Up @@ -204,7 +205,8 @@ async def create_session(title: str | None = None, *, directory: str | None = No
)
event_queue = AsyncMock(spec=EventQueue)

await executor.execute(context, event_queue)
with caplog.at_level(logging.INFO, logger="opencode_a2a.execution.coordinator"):
await executor.execute(context, event_queue)

status = None
for call in event_queue.enqueue_event.call_args_list:
Expand All @@ -223,10 +225,20 @@ async def create_session(title: str | None = None, *, directory: str | None = No
assert status.status.state == TaskState.failed
assert status.metadata["opencode"]["error"]["type"] == "UPSTREAM_QUOTA_EXCEEDED"
assert status.metadata["opencode"]["error"]["upstream_status"] == 429
http_logs = [
record
for record in caplog.records
if "OpenCode request failed with HTTP status=429" in record.message
]
assert len(http_logs) == 1
assert http_logs[0].levelno == logging.WARNING
assert not http_logs[0].exc_info


@pytest.mark.asyncio
async def test_streaming_execute_upstream_backpressure_emits_status_update_with_metadata() -> None:
async def test_streaming_execute_upstream_backpressure_emits_status_update_with_metadata(
caplog,
) -> None:
client = AsyncMock()

async def create_session(title: str | None = None, *, directory: str | None = None) -> str:
Expand Down Expand Up @@ -255,7 +267,8 @@ async def create_session(title: str | None = None, *, directory: str | None = No
)
event_queue = AsyncMock(spec=EventQueue)

await executor.execute(context, event_queue)
with caplog.at_level(logging.DEBUG, logger="opencode_a2a.execution.coordinator"):
await executor.execute(context, event_queue)

status = None
for call in event_queue.enqueue_event.call_args_list:
Expand All @@ -273,6 +286,13 @@ async def create_session(title: str | None = None, *, directory: str | None = No
assert status is not None
assert status.status.state == TaskState.failed
assert status.metadata["opencode"]["error"]["type"] == "UPSTREAM_BACKPRESSURE"
backpressure_logs = [
record
for record in caplog.records
if "OpenCode request rejected by concurrency budget" in record.message
]
assert len(backpressure_logs) == 1
assert backpressure_logs[0].levelno == logging.DEBUG


@pytest.mark.asyncio
Expand Down
7 changes: 7 additions & 0 deletions tests/execution/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,10 @@ async def _fast_sleep(_seconds: float) -> None:

messages = [record.message for record in caplog.records]
assert sum("metric=opencode_stream_retries_total" in message for message in messages) == 1
retry_logs = [
record
for record in caplog.records
if "OpenCode event stream failed; retrying" in record.message
]
assert len(retry_logs) == 1
assert retry_logs[0].levelno == logging.DEBUG
44 changes: 44 additions & 0 deletions tests/execution/test_streaming_output_contract_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,50 @@
)


@pytest.mark.asyncio
async def test_execute_debug_log_records_text_length_without_raw_user_text(caplog) -> None:
raw_text = "do not write this prompt to logs"
response_text = "do not write this response to logs"
client = DummyStreamingClient(
stream_events_payload=[],
response_text=response_text,
)
client.settings = make_settings(
test_bearer_token="test",
opencode_base_url="http://localhost",
a2a_log_body_limit=64,
)
executor = OpencodeAgentExecutor(client, streaming_enabled=True)
executor._should_stream = lambda context: True # type: ignore[method-assign]
queue = DummyEventQueue()

with caplog.at_level(logging.DEBUG, logger="opencode_a2a.execution"):
await executor.execute(
make_request_context(
task_id="task-debug-input",
context_id="ctx-debug-input",
text=raw_text,
),
queue,
)

received_messages = [
record.message for record in caplog.records if record.message.startswith("Received message")
]
assert len(received_messages) == 1
assert f"text_len={len(raw_text)}" in received_messages[0]
opencode_response_messages = [
record.message
for record in caplog.records
if record.message.startswith("OpenCode response")
]
assert len(opencode_response_messages) == 1
assert f"text_len={len(response_text)}" in opencode_response_messages[0]
all_messages = "\n".join(record.message for record in caplog.records)
assert raw_text not in all_messages
assert response_text not in all_messages


@pytest.mark.asyncio
async def test_streaming_logs_raw_upstream_events_at_debug(caplog) -> None:
client = DummyStreamingClient(
Expand Down
21 changes: 16 additions & 5 deletions tests/upstream/test_opencode_upstream_client_params.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json as json_module
import logging

import httpx
import pytest
Expand Down Expand Up @@ -593,6 +594,7 @@ async def fake_post(path: str, *, params=None, json=None, **_kwargs):
@pytest.mark.asyncio
async def test_send_message_raises_concurrency_limit_error_when_request_budget_exhausted(
monkeypatch,
caplog,
):
client = OpencodeUpstreamClient(
make_settings(
Expand All @@ -618,11 +620,20 @@ async def fake_post(path: str, *, params=None, json=None, **_kwargs):
first_request = asyncio.create_task(client.send_message("ses-1", "hello"))
await started.wait()

with pytest.raises(
UpstreamConcurrencyLimitError,
match="request concurrency limit exceeded",
):
await client.send_message("ses-2", "blocked")
with caplog.at_level(logging.DEBUG, logger="opencode_a2a.opencode_upstream_client"):
with pytest.raises(
UpstreamConcurrencyLimitError,
match="request concurrency limit exceeded",
):
await client.send_message("ses-2", "blocked")

budget_logs = [
record
for record in caplog.records
if "OpenCode upstream concurrency limit exceeded" in record.message
]
assert len(budget_logs) == 1
assert budget_logs[0].levelno == logging.DEBUG

release.set()
await first_request
Expand Down