From ad8b616536f4e8b54d65237c8eab575924b807e5 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Fri, 27 Feb 2026 16:28:11 -0800 Subject: [PATCH] app-server: Silence thread status changes caused by thread being created --- codex-rs/app-server/README.md | 7 ++- .../app-server/src/codex_message_processor.rs | 6 +- codex-rs/app-server/src/thread_status.rs | 63 +++++++++++++++++-- codex-rs/app-server/tests/suite/v2/review.rs | 34 ++++++++++ .../app-server/tests/suite/v2/thread_fork.rs | 29 +++++++-- .../app-server/tests/suite/v2/thread_start.rs | 30 +++++++-- 6 files changed, 147 insertions(+), 22 deletions(-) diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index b7341801de9..ca1798e2d8e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -119,9 +119,9 @@ Example with notification opt-out: ## API Overview -- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread. +- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. -- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for the new thread. - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. @@ -272,10 +272,11 @@ When `nextCursor` is `null`, you’ve reached the final page. ### Example: Track thread status changes -`thread/status/changed` is emitted whenever a loaded thread's status changes: +`thread/status/changed` is emitted whenever a loaded thread's status changes after it has already been introduced to the client: - Includes `threadId` and the new `status`. - Status can be `notLoaded`, `idle`, `systemError`, or `active` (with `activeFlags`; `active` implies running). +- `thread/start`, `thread/fork`, and detached review threads do not emit a separate initial `thread/status/changed`; their `thread/started` notification already carries the current `thread.status`. ```json { "method": "thread/status/changed", "params": { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 959cc925209..215482a3c84 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2142,7 +2142,7 @@ impl CodexMessageProcessor { } self.thread_watch_manager - .upsert_thread(thread.clone()) + .upsert_thread_silently(thread.clone()) .await; thread.status = resolve_thread_status( @@ -3596,7 +3596,7 @@ impl CodexMessageProcessor { } self.thread_watch_manager - .upsert_thread(thread.clone()) + .upsert_thread_silently(thread.clone()) .await; thread.status = resolve_thread_status( @@ -6176,7 +6176,7 @@ impl CodexMessageProcessor { Ok(summary) => { let mut thread = summary_to_thread(summary); self.thread_watch_manager - .upsert_thread(thread.clone()) + .upsert_thread_silently(thread.clone()) .await; thread.status = resolve_thread_status( self.thread_watch_manager diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index f3a7c1fd83a..38122789f01 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -91,8 +91,11 @@ impl ThreadWatchManager { } pub(crate) async fn upsert_thread(&self, thread: Thread) { - self.mutate_and_publish(move |state| state.upsert_thread(thread.id)) - .await; + self.upsert_thread_with_notification(thread, true).await; + } + + pub(crate) async fn upsert_thread_silently(&self, thread: Thread) { + self.upsert_thread_with_notification(thread, false).await; } pub(crate) async fn remove_thread(&self, thread_id: &str) { @@ -256,6 +259,11 @@ impl ThreadWatchManager { .await; } + async fn upsert_thread_with_notification(&self, thread: Thread, emit_notification: bool) { + self.mutate_and_publish(move |state| state.upsert_thread(thread.id, emit_notification)) + .await; + } + fn pending_counter( runtime: &mut RuntimeFacts, guard_type: ThreadWatchActiveGuardType, @@ -289,14 +297,22 @@ struct ThreadWatchState { } impl ThreadWatchState { - fn upsert_thread(&mut self, thread_id: String) -> Option { + fn upsert_thread( + &mut self, + thread_id: String, + emit_notification: bool, + ) -> Option { let previous_status = self.status_for(&thread_id); let runtime = self .runtime_by_thread_id .entry(thread_id.clone()) .or_default(); runtime.is_loaded = true; - self.status_changed_notification(thread_id, previous_status) + if emit_notification { + self.status_changed_notification(thread_id, previous_status) + } else { + None + } } fn remove_thread(&mut self, thread_id: &str) -> Option { @@ -692,6 +708,45 @@ mod tests { ); } + #[tokio::test] + async fn silent_upsert_skips_initial_notification() { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); + let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( + outgoing_tx, + ))); + + manager + .upsert_thread_silently(test_thread( + INTERACTIVE_THREAD_ID, + codex_app_server_protocol::SessionSource::Cli, + )) + .await; + + assert_eq!( + manager + .loaded_status_for_thread(INTERACTIVE_THREAD_ID) + .await, + ThreadStatus::Idle, + ); + assert!( + timeout(Duration::from_millis(100), outgoing_rx.recv()) + .await + .is_err(), + "silent upsert should not emit thread/status/changed" + ); + + manager.note_turn_started(INTERACTIVE_THREAD_ID).await; + assert_eq!( + recv_status_changed_notification(&mut outgoing_rx).await, + ThreadStatusChangedNotification { + thread_id: INTERACTIVE_THREAD_ID.to_string(), + status: ThreadStatus::Active { + active_flags: vec![], + }, + }, + ); + } + async fn wait_for_status( manager: &ThreadWatchManager, thread_id: &str, diff --git a/codex-rs/app-server/tests/suite/v2/review.rs b/codex-rs/app-server/tests/suite/v2/review.rs index 262b13969e7..6febbaf52ac 100644 --- a/codex-rs/app-server/tests/suite/v2/review.rs +++ b/codex-rs/app-server/tests/suite/v2/review.rs @@ -8,6 +8,7 @@ use app_test_support::to_response; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -19,9 +20,12 @@ use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStartedNotification; +use codex_app_server_protocol::ThreadStatusChangedNotification; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; +use pretty_assertions::assert_eq; use serde_json::json; use tempfile::TempDir; use tokio::time::timeout; @@ -301,6 +305,31 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<( "detached review should run on a different thread" ); + let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT; + let notification = loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let message = timeout(remaining, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + if notification.method == "thread/status/changed" { + let status_changed: ThreadStatusChangedNotification = + serde_json::from_value(notification.params.expect("params must be present"))?; + if status_changed.thread_id == review_thread_id { + anyhow::bail!( + "detached review threads should be introduced without a preceding thread/status/changed" + ); + } + continue; + } + if notification.method == "thread/started" { + break notification; + } + }; + let started: ThreadStartedNotification = + serde_json::from_value(notification.params.expect("params must be present"))?; + assert_eq!(started.thread.id, review_thread_id); + Ok(()) } @@ -389,6 +418,11 @@ async fn start_default_thread(mcp: &mut McpProcess) -> Result { ) .await??; let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/started"), + ) + .await??; Ok(thread.id) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index a6f9942171f..1f19ae8b975 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -4,7 +4,7 @@ use app_test_support::create_fake_rollout; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SessionSource; @@ -15,6 +15,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadStatusChangedNotification; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use pretty_assertions::assert_eq; @@ -124,11 +125,27 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { } // A corresponding thread/started notification should arrive. - let notif: JSONRPCNotification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("thread/started"), - ) - .await??; + let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT; + let notif = loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let message = timeout(remaining, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notif) = message else { + continue; + }; + if notif.method == "thread/status/changed" { + let status_changed: ThreadStatusChangedNotification = + serde_json::from_value(notif.params.expect("params must be present"))?; + if status_changed.thread_id == thread.id { + anyhow::bail!( + "thread/fork should introduce the thread without a preceding thread/status/changed" + ); + } + continue; + } + if notif.method == "thread/started" { + break notif; + } + }; let started_params = notif.params.clone().expect("params must be present"); let started_thread_json = started_params .get("thread") diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index c0b84fe42f5..8525f792043 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -3,16 +3,18 @@ use app_test_support::McpProcess; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadStatusChangedNotification; use codex_core::config::set_project_trust_level; use codex_protocol::config_types::TrustLevel; use codex_protocol::openai_models::ReasoningEffort; +use pretty_assertions::assert_eq; use serde_json::Value; use std::path::Path; use tempfile::TempDir; @@ -83,11 +85,27 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { assert_eq!(thread.name, None); // A corresponding thread/started notification should arrive. - let notif: JSONRPCNotification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("thread/started"), - ) - .await??; + let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT; + let notif = loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let message = timeout(remaining, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notif) = message else { + continue; + }; + if notif.method == "thread/status/changed" { + let status_changed: ThreadStatusChangedNotification = + serde_json::from_value(notif.params.expect("params must be present"))?; + if status_changed.thread_id == thread.id { + anyhow::bail!( + "thread/start should introduce the thread without a preceding thread/status/changed" + ); + } + continue; + } + if notif.method == "thread/started" { + break notif; + } + }; let started_params = notif.params.clone().expect("params must be present"); let started_thread_json = started_params .get("thread")