Skip to content

Commit 55fc075

Browse files
authored
Send events to realtime api (#12423)
- Send assistant messages, ExecCommandBegin, and PatchApplyBegin/PatchApplyEnd
1 parent 85b00ae commit 55fc075

File tree

2 files changed

+146
-0
lines changed

2 files changed

+146
-0
lines changed

codex-rs/core/src/codex.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ use crate::protocol::ModelRerouteEvent;
184184
use crate::protocol::ModelRerouteReason;
185185
use crate::protocol::NetworkApprovalContext;
186186
use crate::protocol::Op;
187+
use crate::protocol::PatchApplyStatus;
187188
use crate::protocol::PlanDeltaEvent;
188189
use crate::protocol::RateLimitSnapshot;
189190
use crate::protocol::ReasoningContentDeltaEvent;
@@ -2206,6 +2207,8 @@ impl Session {
22062207
msg,
22072208
};
22082209
self.send_event_raw(event).await;
2210+
self.maybe_mirror_event_text_to_realtime(&legacy_source)
2211+
.await;
22092212

22102213
let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
22112214
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
@@ -2217,6 +2220,18 @@ impl Session {
22172220
}
22182221
}
22192222

2223+
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
2224+
let Some(text) = realtime_text_for_event(msg) else {
2225+
return;
2226+
};
2227+
if self.conversation.running_state().await.is_none() {
2228+
return;
2229+
}
2230+
if let Err(err) = self.conversation.text_in(text).await {
2231+
debug!("failed to mirror event text to realtime conversation: {err}");
2232+
}
2233+
}
2234+
22202235
pub(crate) async fn send_event_raw(&self, event: Event) {
22212236
// Record the last known agent status.
22222237
if let Some(status) = agent_status_from_event(&event.msg) {
@@ -5451,6 +5466,56 @@ fn agent_message_text(item: &codex_protocol::items::AgentMessageItem) -> String
54515466
.collect()
54525467
}
54535468

5469+
fn realtime_text_for_event(msg: &EventMsg) -> Option<String> {
5470+
match msg {
5471+
EventMsg::AgentMessage(event) => Some(event.message.clone()),
5472+
EventMsg::ItemCompleted(event) => match &event.item {
5473+
TurnItem::AgentMessage(item) => Some(agent_message_text(item)),
5474+
_ => None,
5475+
},
5476+
EventMsg::ExecCommandBegin(event) => {
5477+
let command = event.command.join(" ");
5478+
Some(format!(
5479+
"Exec command started: {command}\nWorking directory: {}",
5480+
event.cwd.display()
5481+
))
5482+
}
5483+
EventMsg::PatchApplyBegin(event) => {
5484+
let mut files: Vec<String> = event
5485+
.changes
5486+
.keys()
5487+
.map(|path| path.display().to_string())
5488+
.collect();
5489+
files.sort();
5490+
let file_list = if files.is_empty() {
5491+
"none".to_string()
5492+
} else {
5493+
files.join(", ")
5494+
};
5495+
Some(format!(
5496+
"apply_patch started ({count} file change(s))\nFiles: {file_list}",
5497+
count = files.len()
5498+
))
5499+
}
5500+
EventMsg::PatchApplyEnd(event) => {
5501+
let status = match event.status {
5502+
PatchApplyStatus::Completed => "completed",
5503+
PatchApplyStatus::Failed => "failed",
5504+
PatchApplyStatus::Declined => "declined",
5505+
};
5506+
let mut text = format!("apply_patch {status}");
5507+
if !event.stdout.is_empty() {
5508+
text.push_str(&format!("\nstdout:\n{}", event.stdout));
5509+
}
5510+
if !event.stderr.is_empty() {
5511+
text.push_str(&format!("\nstderr:\n{}", event.stderr));
5512+
}
5513+
Some(text)
5514+
}
5515+
_ => None,
5516+
}
5517+
}
5518+
54545519
/// Split the stream into normal assistant text vs. proposed plan content.
54555520
/// Normal text becomes AgentMessage deltas; plan content becomes PlanDelta +
54565521
/// TurnItem::Plan.

codex-rs/core/tests/suite/realtime_conversation.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use codex_protocol::protocol::Op;
99
use codex_protocol::protocol::RealtimeAudioFrame;
1010
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
1111
use codex_protocol::protocol::RealtimeEvent;
12+
use core_test_support::responses;
13+
use core_test_support::responses::start_mock_server;
1214
use core_test_support::responses::start_websocket_server;
1315
use core_test_support::skip_if_no_network;
1416
use core_test_support::test_codex::test_codex;
@@ -459,3 +461,82 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
459461
server.shutdown().await;
460462
Ok(())
461463
}
464+
465+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
466+
async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> Result<()> {
467+
skip_if_no_network!(Ok(()));
468+
469+
let api_server = start_mock_server().await;
470+
let _response_mock = responses::mount_sse_once(
471+
&api_server,
472+
responses::sse(vec![
473+
responses::ev_response_created("resp_1"),
474+
responses::ev_assistant_message("msg_1", "assistant says hi"),
475+
responses::ev_completed("resp_1"),
476+
]),
477+
)
478+
.await;
479+
480+
let realtime_server = start_websocket_server(vec![vec![
481+
vec![json!({
482+
"type": "session.created",
483+
"session": { "id": "sess_1" }
484+
})],
485+
vec![],
486+
]])
487+
.await;
488+
489+
let mut builder = test_codex().with_config({
490+
let realtime_base_url = realtime_server.uri().to_string();
491+
move |config| {
492+
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
493+
}
494+
});
495+
let test = builder.build(&api_server).await?;
496+
497+
test.codex
498+
.submit(Op::RealtimeConversationStart(ConversationStartParams {
499+
prompt: "backend prompt".to_string(),
500+
session_id: None,
501+
}))
502+
.await?;
503+
504+
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
505+
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
506+
payload: RealtimeEvent::SessionCreated { session_id },
507+
}) => Some(session_id.clone()),
508+
_ => None,
509+
})
510+
.await;
511+
assert_eq!(session_created, "sess_1");
512+
513+
test.submit_turn("hello").await?;
514+
515+
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
516+
while tokio::time::Instant::now() < deadline {
517+
let connections = realtime_server.connections();
518+
if connections.len() == 1 && connections[0].len() >= 2 {
519+
break;
520+
}
521+
tokio::time::sleep(Duration::from_millis(10)).await;
522+
}
523+
524+
let realtime_connections = realtime_server.connections();
525+
assert_eq!(realtime_connections.len(), 1);
526+
assert_eq!(realtime_connections[0].len(), 2);
527+
assert_eq!(
528+
realtime_connections[0][0].body_json()["type"].as_str(),
529+
Some("session.create")
530+
);
531+
assert_eq!(
532+
realtime_connections[0][1].body_json()["type"].as_str(),
533+
Some("conversation.item.create")
534+
);
535+
assert_eq!(
536+
realtime_connections[0][1].body_json()["item"]["content"][0]["text"].as_str(),
537+
Some("assistant says hi")
538+
);
539+
540+
realtime_server.shutdown().await;
541+
Ok(())
542+
}

0 commit comments

Comments
 (0)