Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
use crate::shell_snapshot::ShellSnapshot;
use crate::state_db;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
Expand Down Expand Up @@ -83,6 +84,9 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let inherited_shell_snapshot = self
.inherited_shell_snapshot_for_source(&state, session_source.as_ref())
.await;
let session_source = match session_source {
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
Expand Down Expand Up @@ -161,6 +165,7 @@ impl AgentControl {
self.clone(),
session_source,
false,
inherited_shell_snapshot,
)
.await?
} else {
Expand All @@ -171,6 +176,7 @@ impl AgentControl {
session_source,
false,
None,
inherited_shell_snapshot,
)
.await?
}
Expand Down Expand Up @@ -235,6 +241,9 @@ impl AgentControl {
other => other,
};
let notification_source = session_source.clone();
let inherited_shell_snapshot = self
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
.await;
let rollout_path =
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
Expand All @@ -246,6 +255,7 @@ impl AgentControl {
rollout_path,
self.clone(),
session_source,
inherited_shell_snapshot,
)
.await?;
reservation.commit(resumed_thread.thread_id);
Expand Down Expand Up @@ -431,6 +441,22 @@ impl AgentControl {
.upgrade()
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
}

async fn inherited_shell_snapshot_for_source(
&self,
state: &Arc<ThreadManagerState>,
session_source: Option<&SessionSource>,
) -> Option<Arc<ShellSnapshot>> {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
return None;
};

let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
parent_thread.codex.session.user_shell().shell_snapshot()
}
}
#[cfg(test)]
mod tests {
Expand Down
63 changes: 53 additions & 10 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ impl Codex {
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
Expand Down Expand Up @@ -447,6 +448,7 @@ impl Codex {
session_source,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
};

// Generate a unique ID for the lifetime of this Codex session.
Expand Down Expand Up @@ -832,6 +834,7 @@ pub(crate) struct SessionConfiguration {
session_source: SessionSource,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
}

impl SessionConfiguration {
Expand Down Expand Up @@ -1345,13 +1348,19 @@ impl Session {
};
// Create the mutable state for the Session.
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
conversation_id,
session_configuration.cwd.clone(),
&mut default_shell,
otel_manager.clone(),
)
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
let (tx, rx) = watch::channel(Some(snapshot));
default_shell.shell_snapshot = rx;
tx
} else {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
conversation_id,
session_configuration.cwd.clone(),
&mut default_shell,
otel_manager.clone(),
)
}
} else {
let (tx, rx) = watch::channel(None);
default_shell.shell_snapshot = rx;
Expand Down Expand Up @@ -2062,6 +2071,7 @@ impl Session {
previous_cwd: &Path,
next_cwd: &Path,
codex_home: &Path,
session_source: &SessionSource,
) {
if previous_cwd == next_cwd {
return;
Expand All @@ -2071,6 +2081,13 @@ impl Session {
return;
}

if matches!(
session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
return;
}

ShellSnapshot::refresh_snapshot(
codex_home.to_path_buf(),
self.conversation_id,
Expand All @@ -2092,10 +2109,16 @@ impl Session {
let previous_cwd = state.session_configuration.cwd.clone();
let next_cwd = updated.cwd.clone();
let codex_home = updated.codex_home.clone();
let session_source = updated.session_source.clone();
state.session_configuration = updated;
drop(state);

self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &codex_home);
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&next_cwd,
&codex_home,
&session_source,
);

Ok(())
}
Expand All @@ -2111,16 +2134,29 @@ impl Session {
sub_id: String,
updates: SessionSettingsUpdate,
) -> ConstraintResult<Arc<TurnContext>> {
let (session_configuration, sandbox_policy_changed, previous_cwd, codex_home) = {
let (
session_configuration,
sandbox_policy_changed,
previous_cwd,
codex_home,
session_source,
) = {
let mut state = self.state.lock().await;
match state.session_configuration.clone().apply(&updates) {
Ok(next) => {
let previous_cwd = state.session_configuration.cwd.clone();
let sandbox_policy_changed =
state.session_configuration.sandbox_policy != next.sandbox_policy;
let codex_home = next.codex_home.clone();
let session_source = next.session_source.clone();
state.session_configuration = next.clone();
(next, sandbox_policy_changed, previous_cwd, codex_home)
(
next,
sandbox_policy_changed,
previous_cwd,
codex_home,
session_source,
)
}
Err(err) => {
drop(state);
Expand All @@ -2141,6 +2177,7 @@ impl Session {
&previous_cwd,
&session_configuration.cwd,
&codex_home,
&session_source,
);

Ok(self
Expand Down Expand Up @@ -7922,6 +7959,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};

let mut state = SessionState::new(session_configuration);
Expand Down Expand Up @@ -8015,6 +8053,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};

let mut state = SessionState::new(session_configuration);
Expand Down Expand Up @@ -8327,6 +8366,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
}
}

Expand Down Expand Up @@ -8381,6 +8421,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};

let (tx_event, _rx_event) = async_channel::unbounded();
Expand Down Expand Up @@ -8463,6 +8504,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(
Expand Down Expand Up @@ -8623,6 +8665,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools,
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/codex_delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub(crate) async fn run_codex_thread_interactive(
Vec::new(),
false,
None,
None,
)
.await?;
let codex = Arc::new(codex);
Expand Down
11 changes: 11 additions & 0 deletions codex-rs/core/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillsManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
Expand Down Expand Up @@ -453,6 +454,7 @@ impl ThreadManagerState {
self.session_source.clone(),
false,
None,
None,
)
.await
}
Expand All @@ -464,6 +466,7 @@ impl ThreadManagerState {
session_source: SessionSource,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
Expand All @@ -474,6 +477,7 @@ impl ThreadManagerState {
Vec::new(),
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
)
.await
}
Expand All @@ -484,6 +488,7 @@ impl ThreadManagerState {
rollout_path: PathBuf,
agent_control: AgentControl,
session_source: SessionSource,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
self.spawn_thread_with_source(
Expand All @@ -495,6 +500,7 @@ impl ThreadManagerState {
Vec::new(),
false,
None,
inherited_shell_snapshot,
)
.await
}
Expand All @@ -506,6 +512,7 @@ impl ThreadManagerState {
agent_control: AgentControl,
session_source: SessionSource,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
Expand All @@ -516,6 +523,7 @@ impl ThreadManagerState {
Vec::new(),
persist_extended_history,
None,
inherited_shell_snapshot,
)
.await
}
Expand All @@ -541,6 +549,7 @@ impl ThreadManagerState {
dynamic_tools,
persist_extended_history,
metrics_service_name,
None,
)
.await
}
Expand All @@ -556,6 +565,7 @@ impl ThreadManagerState {
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
let watch_registration = self.file_watcher.register_config(&config);
let CodexSpawnOk {
Expand All @@ -572,6 +582,7 @@ impl ThreadManagerState {
dynamic_tools,
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
)
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)
Expand Down
Loading