Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ Example with notification opt-out:

## API Overview

- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
Expand Down Expand Up @@ -272,10 +272,11 @@ When `nextCursor` is `null`, you’ve reached the final page.

### Example: Track thread status changes

`thread/status/changed` is emitted whenever a loaded thread's status changes:
`thread/status/changed` is emitted whenever a loaded thread's status changes after it has already been introduced to the client:

- Includes `threadId` and the new `status`.
- Status can be `notLoaded`, `idle`, `systemError`, or `active` (with `activeFlags`; `active` implies running).
- `thread/start`, `thread/fork`, and detached review threads do not emit a separate initial `thread/status/changed`; their `thread/started` notification already carries the current `thread.status`.

```json
{ "method": "thread/status/changed", "params": {
Expand Down
6 changes: 3 additions & 3 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2142,7 +2142,7 @@ impl CodexMessageProcessor {
}

self.thread_watch_manager
.upsert_thread(thread.clone())
.upsert_thread_silently(thread.clone())
.await;

thread.status = resolve_thread_status(
Expand Down Expand Up @@ -3596,7 +3596,7 @@ impl CodexMessageProcessor {
}

self.thread_watch_manager
.upsert_thread(thread.clone())
.upsert_thread_silently(thread.clone())
.await;

thread.status = resolve_thread_status(
Expand Down Expand Up @@ -6176,7 +6176,7 @@ impl CodexMessageProcessor {
Ok(summary) => {
let mut thread = summary_to_thread(summary);
self.thread_watch_manager
.upsert_thread(thread.clone())
.upsert_thread_silently(thread.clone())
.await;
thread.status = resolve_thread_status(
self.thread_watch_manager
Expand Down
63 changes: 59 additions & 4 deletions codex-rs/app-server/src/thread_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ impl ThreadWatchManager {
}

pub(crate) async fn upsert_thread(&self, thread: Thread) {
self.mutate_and_publish(move |state| state.upsert_thread(thread.id))
.await;
self.upsert_thread_with_notification(thread, true).await;
}

pub(crate) async fn upsert_thread_silently(&self, thread: Thread) {
self.upsert_thread_with_notification(thread, false).await;
}

pub(crate) async fn remove_thread(&self, thread_id: &str) {
Expand Down Expand Up @@ -256,6 +259,11 @@ impl ThreadWatchManager {
.await;
}

async fn upsert_thread_with_notification(&self, thread: Thread, emit_notification: bool) {
self.mutate_and_publish(move |state| state.upsert_thread(thread.id, emit_notification))
.await;
}

fn pending_counter(
runtime: &mut RuntimeFacts,
guard_type: ThreadWatchActiveGuardType,
Expand Down Expand Up @@ -289,14 +297,22 @@ struct ThreadWatchState {
}

impl ThreadWatchState {
fn upsert_thread(&mut self, thread_id: String) -> Option<ThreadStatusChangedNotification> {
fn upsert_thread(
&mut self,
thread_id: String,
emit_notification: bool,
) -> Option<ThreadStatusChangedNotification> {
let previous_status = self.status_for(&thread_id);
let runtime = self
.runtime_by_thread_id
.entry(thread_id.clone())
.or_default();
runtime.is_loaded = true;
self.status_changed_notification(thread_id, previous_status)
if emit_notification {
self.status_changed_notification(thread_id, previous_status)
} else {
None
}
}

fn remove_thread(&mut self, thread_id: &str) -> Option<ThreadStatusChangedNotification> {
Expand Down Expand Up @@ -692,6 +708,45 @@ mod tests {
);
}

#[tokio::test]
async fn silent_upsert_skips_initial_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
)));

manager
.upsert_thread_silently(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;

assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Idle,
);
assert!(
timeout(Duration::from_millis(100), outgoing_rx.recv())
.await
.is_err(),
"silent upsert should not emit thread/status/changed"
);

manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(
recv_status_changed_notification(&mut outgoing_rx).await,
ThreadStatusChangedNotification {
thread_id: INTERACTIVE_THREAD_ID.to_string(),
status: ThreadStatus::Active {
active_flags: vec![],
},
},
);
}

async fn wait_for_status(
manager: &ThreadWatchManager,
thread_id: &str,
Expand Down
34 changes: 34 additions & 0 deletions codex-rs/app-server/tests/suite/v2/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use app_test_support::to_response;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
Expand All @@ -19,9 +20,12 @@ use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
Expand Down Expand Up @@ -301,6 +305,31 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<(
"detached review should run on a different thread"
);

let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
let notification = loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let message = timeout(remaining, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notification) = message else {
continue;
};
if notification.method == "thread/status/changed" {
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(notification.params.expect("params must be present"))?;
if status_changed.thread_id == review_thread_id {
anyhow::bail!(
"detached review threads should be introduced without a preceding thread/status/changed"
);
}
continue;
}
if notification.method == "thread/started" {
break notification;
}
};
let started: ThreadStartedNotification =
serde_json::from_value(notification.params.expect("params must be present"))?;
assert_eq!(started.thread.id, review_thread_id);

Ok(())
}

Expand Down Expand Up @@ -389,6 +418,11 @@ async fn start_default_thread(mcp: &mut McpProcess) -> Result<String> {
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
Ok(thread.id)
}

Expand Down
29 changes: 23 additions & 6 deletions codex-rs/app-server/tests/suite/v2/thread_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
Expand All @@ -15,6 +15,7 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -124,11 +125,27 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
}

// A corresponding thread/started notification should arrive.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
let notif = loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let message = timeout(remaining, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notif) = message else {
continue;
};
if notif.method == "thread/status/changed" {
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
if status_changed.thread_id == thread.id {
anyhow::bail!(
"thread/fork should introduce the thread without a preceding thread/status/changed"
);
}
continue;
}
if notif.method == "thread/started" {
break notif;
}
};
let started_params = notif.params.clone().expect("params must be present");
let started_thread_json = started_params
.get("thread")
Expand Down
30 changes: 24 additions & 6 deletions codex-rs/app-server/tests/suite/v2/thread_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_core::config::set_project_trust_level;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::openai_models::ReasoningEffort;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use tempfile::TempDir;
Expand Down Expand Up @@ -83,11 +85,27 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
assert_eq!(thread.name, None);

// A corresponding thread/started notification should arrive.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
let notif = loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let message = timeout(remaining, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notif) = message else {
continue;
};
if notif.method == "thread/status/changed" {
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
if status_changed.thread_id == thread.id {
anyhow::bail!(
"thread/start should introduce the thread without a preceding thread/status/changed"
);
}
continue;
}
if notif.method == "thread/started" {
break notif;
}
};
let started_params = notif.params.clone().expect("params must be present");
let started_thread_json = started_params
.get("thread")
Expand Down
Loading