From 2367b3c16d679341598ce7e7d69d796dad4d61a4 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 12:03:59 -0700 Subject: [PATCH 01/14] Add non-interrupting active fork mode --- .../schema/json/ClientRequest.json | 7 + .../codex_app_server_protocol.schemas.json | 7 + .../codex_app_server_protocol.v2.schemas.json | 7 + .../schema/json/v2/ThreadForkParams.json | 7 + .../typescript/v2/ThreadForkActiveMode.ts | 5 + .../schema/typescript/v2/index.ts | 1 + .../src/protocol/thread_history.rs | 3 +- .../src/protocol/v2/thread.rs | 18 ++ codex-rs/app-server/README.md | 4 +- .../request_processors/thread_processor.rs | 32 ++- .../app-server/tests/suite/v2/thread_fork.rs | 197 ++++++++++++++++++ codex-rs/core/src/agent/control/spawn.rs | 5 +- codex-rs/core/src/agent/control_tests.rs | 1 + .../src/session/rollout_reconstruction.rs | 7 +- codex-rs/core/src/session/tests.rs | 2 + codex-rs/core/src/session/turn.rs | 7 + codex-rs/core/src/thread_manager.rs | 108 ++++++++-- codex-rs/core/src/thread_manager_tests.rs | 197 ++++++++++++++++++ .../core/src/thread_rollout_truncation.rs | 44 ++++ .../src/thread_rollout_truncation_tests.rs | 41 ++++ codex-rs/memories/write/src/phase1.rs | 1 + codex-rs/protocol/src/protocol.rs | 9 + codex-rs/rollout/src/list.rs | 2 + codex-rs/rollout/src/metadata.rs | 2 + codex-rs/rollout/src/persistence_metrics.rs | 1 + codex-rs/rollout/src/policy.rs | 3 +- codex-rs/rollout/src/recorder.rs | 1 + codex-rs/rollout/src/search.rs | 3 +- codex-rs/state/src/extract.rs | 4 +- codex-rs/state/src/runtime/threads.rs | 1 + .../thread-store/src/thread_metadata_sync.rs | 3 +- 31 files changed, 689 insertions(+), 41 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index f3983e31f41c..4e2b14bd6a5b 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3585,6 +3585,13 @@ ], "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index b77b512b36e3..48f8c0c0b4e2 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17232,6 +17232,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "$schema": "http://json-schema.org/draft-07/schema#", "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index bdd61d6b9436..6909545a996c 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15011,6 +15011,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "$schema": "http://json-schema.org/draft-07/schema#", "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index 76278b106ba3..b4560de465e5 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -71,6 +71,13 @@ ], "type": "string" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadSource": { "type": "string" } diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts new file mode 100644 index 000000000000..02c07db67437 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadForkActiveMode = "interrupt" | "nonInterrupting"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index c405082c4c8f..d4e07a8e6540 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -400,6 +400,7 @@ export type { ThreadDeleteParams } from "./ThreadDeleteParams"; export type { ThreadDeleteResponse } from "./ThreadDeleteResponse"; export type { ThreadDeletedNotification } from "./ThreadDeletedNotification"; export type { ThreadExtra } from "./ThreadExtra"; +export type { ThreadForkActiveMode } from "./ThreadForkActiveMode"; export type { ThreadForkParams } from "./ThreadForkParams"; export type { ThreadForkResponse } from "./ThreadForkResponse"; export type { ThreadGoal } from "./ThreadGoal"; diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 2d9c6b6a1a48..38a2c6892075 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -389,7 +389,8 @@ impl ThreadHistoryBuilder { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) - | RolloutItem::SessionMeta(_) => {} + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => {} } } diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 7cd03930d8a8..d6a79c3259c5 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -561,6 +561,24 @@ pub struct ThreadForkParams { #[experimental("thread/fork.excludeTurns")] #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub exclude_turns: bool, + /// How to fork a currently active source thread. + /// + /// Omitted preserves the legacy interrupted snapshot behavior. Use + /// `nonInterrupting` to fork from an existing stable source-owned sampling + /// boundary without cancelling or mutating the source thread. + #[experimental("thread/fork.activeForkMode")] + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional = nullable)] + pub active_fork_mode: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum ThreadForkActiveMode { + Interrupt, + NonInterrupting, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, ExperimentalApi)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index c9ac2f5fb27c..e9a2e3623f7a 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -139,7 +139,7 @@ Example with notification opt-out: - `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `allowProviderModelFallback` lets providers backed by an authoritative static model catalog replace an unavailable requested `model` with the catalog default; dynamic or cached catalogs preserve the requested model. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Deprecated experimental `multiAgentMode` is ignored; use Ultra reasoning effort for proactive multi-agent behavior. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots using environment-native absolute paths. Skills found below those roots are listed and read through the owning environment. Stdio MCP servers declared by selected plugins are started in that environment, and HTTP MCP connections use that environment's HTTP client. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. -- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the default active fork mode records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. Experimental clients can pass `activeForkMode: "nonInterrupting"` to fork from the latest existing stable sampling boundary without interrupting the source thread; if no boundary is available for a mid-turn source, the request fails. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. `instructionSources` lists loaded instruction files using each source environment's native absolute path syntax, including files loaded from remote environments. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. Their deprecated experimental `multiAgentMode` field, and the corresponding thread setting, always report `explicitRequestOnly`; Ultra reasoning effort is the source of proactive multi-agent behavior. - `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` for direct spawned children or `ancestorThreadId` for spawned descendants at any depth; the two filters are mutually exclusive. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known. - `thread/loaded/list` — list the thread ids currently loaded in memory. @@ -355,7 +355,7 @@ Example: } } ``` -To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. The returned `thread.sessionId` identifies the current live session tree root. Root threads use their own `thread.id` as `thread.sessionId`; stored threads that are not loaded also report their own `thread.id`, because resuming one makes it the root of a new live session tree. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only: +To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. The returned `thread.sessionId` identifies the current live session tree root. Root threads use their own `thread.id` as `thread.sessionId`; stored threads that are not loaded also report their own `thread.id`, because resuming one makes it the root of a new live session tree. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the default fork snapshots it as if the current turn had been interrupted first. Pass `activeForkMode: "nonInterrupting"` to fork from an existing stable boundary without interrupting the active source; a mid-turn source with no stable boundary returns an invalid-request error. Pass `ephemeral: true` when the fork should stay in-memory only: ```json { "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123", "ephemeral": true } } diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index b78c53a383a8..08269a9315fd 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -3404,6 +3404,7 @@ impl ThreadRequestProcessor { ephemeral, thread_source, exclude_turns, + active_fork_mode, } = params; let include_turns = !exclude_turns; if sandbox.is_some() && permissions.is_some() { @@ -3485,15 +3486,19 @@ impl ThreadRequestProcessor { let fallback_model_provider = config.model_provider_id.clone(); - let NewThread { - thread_id, - thread: forked_thread, - session_configured, - .. - } = self + let fork_snapshot = match active_fork_mode + .unwrap_or(codex_app_server_protocol::ThreadForkActiveMode::Interrupt) + { + codex_app_server_protocol::ThreadForkActiveMode::Interrupt => ForkSnapshot::Interrupted, + codex_app_server_protocol::ThreadForkActiveMode::NonInterrupting => { + ForkSnapshot::NonInterrupting + } + }; + + let forked = self .thread_manager - .fork_thread_from_history( - ForkSnapshot::Interrupted, + .fork_thread_from_history_with_selection( + fork_snapshot, config, InitialHistory::Resumed(ResumedHistory { conversation_id: source_thread_id, @@ -3512,6 +3517,13 @@ impl ThreadRequestProcessor { CodexErr::InvalidRequest(message) => invalid_request(message), err => internal_error(format!("error forking thread: {err}")), })?; + let fork_history_items = forked.selected_history; + let NewThread { + thread_id, + thread: forked_thread, + session_configured, + .. + } = forked.new_thread; Self::set_app_server_client_info( forked_thread.as_ref(), @@ -3569,12 +3581,12 @@ impl ThreadRequestProcessor { &config_snapshot, /*path*/ None, ); - thread.preview = preview_from_rollout_items(&history_items); + thread.preview = preview_from_rollout_items(&fork_history_items); thread.forked_from_id = Some(source_thread_id.to_string()); if include_turns { populate_thread_turns_from_history( &mut thread, - &history_items, + &fork_history_items, /*active_turn*/ None, ); } diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index d7c1707a3297..57243fbdf441 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -12,6 +12,8 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::SessionSource; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadForkActiveMode; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; @@ -31,8 +33,11 @@ use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::ThreadId; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use codex_protocol::protocol::MultiAgentVersion; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_rollout::append_rollout_item_to_path; use codex_rollout::append_thread_name; use codex_rollout::read_session_meta_line; @@ -40,6 +45,7 @@ use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::path::Path; +use std::path::PathBuf; use tempfile::TempDir; use tokio::time::timeout; use wiremock::Mock; @@ -83,6 +89,52 @@ async fn list_threads(mcp: &mut TestAppServer) -> Result { to_response::(list_resp) } +fn rollout_path(codex_home: &Path, timestamp: &str, conversation_id: &str) -> PathBuf { + codex_home + .join("sessions") + .join("2025") + .join("01") + .join("05") + .join(format!("rollout-{timestamp}-{conversation_id}.jsonl")) +} + +fn assistant_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + phase: None, + internal_chat_message_metadata_passthrough: None, + } +} + +async fn append_sampling_boundary(path: &Path) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: "turn-1".to_string(), + window_id: "window-1".to_string(), + }), + ) + .await?; + Ok(()) +} + +async fn append_partial_assistant_output(path: &Path, text: &str) -> Result<()> { + append_rollout_item_to_path(path, &RolloutItem::ResponseItem(assistant_message(text))).await?; + Ok(()) +} + +fn thread_contains_agent_text(thread: &Thread, needle: &str) -> bool { + thread.turns.iter().any(|turn| { + turn.items.iter().any( + |item| matches!(item, ThreadItem::AgentMessage { text, .. } if text.contains(needle)), + ) + }) +} + #[tokio::test] async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -459,6 +511,151 @@ async fn thread_fork_can_load_source_by_path() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_fork_non_interrupting_excludes_partial_output() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let preview = "Saved user message"; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + preview, + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "partial assistant output").await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let fork_result = fork_resp.result.clone(); + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert_eq!(fork_result.get("appliedSnapshotMode"), None); + assert_eq!(fork_result.get("snapshotFallbackReason"), None); + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, preview); + assert!( + !thread_contains_agent_text(&thread, "partial assistant output"), + "non-interrupting fork must exclude assistant output after the sampling boundary" + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_fork_non_interrupting_without_boundary_fails_closed() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_partial_assistant_output(&source_path, "unmarked partial output").await?; + let source_before = std::fs::read_to_string(&source_path)?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(fork_id)), + ) + .await??; + + assert!( + fork_err + .error + .message + .contains("no stable sampling boundary"), + "unexpected fork error: {}", + fork_err.error.message + ); + assert_eq!(std::fs::read_to_string(&source_path)?, source_before); + + Ok(()) +} + +#[tokio::test] +async fn thread_fork_ephemeral_non_interrupting_uses_selected_history() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let preview = "Saved user message"; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + preview, + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "ephemeral partial output").await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + ephemeral: true, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert!(thread.ephemeral); + assert_eq!(thread.path, None); + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, preview); + assert!( + !thread_contains_agent_text(&thread, "ephemeral partial output"), + "ephemeral fork response must use selected history, not raw source history" + ); + + Ok(()) +} + #[tokio::test] async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/agent/control/spawn.rs b/codex-rs/core/src/agent/control/spawn.rs index c4c40577ca4f..11dbb9bc1d11 100644 --- a/codex-rs/core/src/agent/control/spawn.rs +++ b/codex-rs/core/src/agent/control/spawn.rs @@ -63,7 +63,10 @@ fn keep_forked_rollout_item(item: &RolloutItem, preserve_reference_context_item: // from the parent's durable baseline. Truncated forks drop part of that prompt, // so they must rebuild context on their first child turn. RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) => preserve_reference_context_item, - RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => true, + RolloutItem::Compacted(_) + | RolloutItem::EventMsg(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => true, } } diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 9863bfb7e24c..413e6317a70a 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -172,6 +172,7 @@ async fn persisted_originator(thread: &CodexThread) -> String { | RolloutItem::EventMsg(_) | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::TurnContext(_) => None, }) .expect("session metadata should be persisted") diff --git a/codex-rs/core/src/session/rollout_reconstruction.rs b/codex-rs/core/src/session/rollout_reconstruction.rs index 0b89caaf0575..955adc38f83f 100644 --- a/codex-rs/core/src/session/rollout_reconstruction.rs +++ b/codex-rs/core/src/session/rollout_reconstruction.rs @@ -280,7 +280,8 @@ impl Session { } RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) - | RolloutItem::InterAgentCommunicationMetadata { .. } => {} + | RolloutItem::InterAgentCommunicationMetadata { .. } + | RolloutItem::SamplingBoundary(_) => {} } if base_replacement_history.is_some() @@ -337,7 +338,8 @@ impl Session { turn_context.model_info.truncation_policy.into(), ); } - RolloutItem::InterAgentCommunicationMetadata { .. } => {} + RolloutItem::InterAgentCommunicationMetadata { .. } + | RolloutItem::SamplingBoundary(_) => {} RolloutItem::Compacted(compacted) => { if let Some(replacement_history) = &compacted.replacement_history { // This should actually never happen, because the reverse loop above (to build rollout_suffix) @@ -415,6 +417,7 @@ impl Session { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => { unreachable!("only world-state replay items are collected") } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index ece9d446b57e..81ea464a3e4c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2758,6 +2758,7 @@ async fn start_new_context_window_assigns_and_persists_item_ids() { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }); assert_eq!( @@ -2817,6 +2818,7 @@ async fn record_initial_history_assigns_and_persists_id_for_forked_response_item | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }); assert_eq!(persisted_item_id, Some(live_item_id.as_str())); diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index f16b525002fc..9691e165388b 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -101,7 +101,9 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::PlanDeltaEvent; use codex_protocol::protocol::ReasoningContentDeltaEvent; use codex_protocol::protocol::ReasoningRawContentDeltaEvent; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SafetyBufferingEvent; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::TurnDiffEvent; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; @@ -275,6 +277,11 @@ pub(crate) async fn run_turn( } .instrument(trace_span!("run_turn.prepare_sampling_request_input")) .await; + sess.persist_rollout_items(&[RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_context.sub_id.clone(), + window_id: window_id.clone(), + })]) + .await; let responses_metadata = turn_context.turn_metadata_state.to_responses_metadata( sess.installation_id.clone(), diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 4ea86cb33d00..0c2fe26ac0ab 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -125,15 +125,6 @@ pub struct NewThread { pub session_configured: SessionConfiguredEvent, } -// TODO(ccunningham): Add an explicit non-interrupting live-turn snapshot once -// core can represent sampling boundaries directly instead of relying on -// whichever items happened to be persisted mid-turn. -// -// Two likely future variants: -// - `TruncateToLastSamplingBoundary` for callers that want a coherent fork from -// the last stable model boundary without synthesizing an interrupt. -// - `WaitUntilNextSamplingBoundary` (or similar) for callers that prefer to -// fork after the next sampling boundary rather than interrupting immediately. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ForkSnapshot { /// Fork a committed prefix ending strictly before the nth user message. @@ -154,6 +145,18 @@ pub enum ForkSnapshot { /// already at a turn boundary, this returns the current persisted history /// unchanged. Interrupted, + + /// Fork without interrupting the source thread. + /// + /// Mid-turn sources use the latest persisted sampling boundary. If no + /// boundary exists, this fails closed instead of falling back to an + /// interrupted snapshot. + NonInterrupting, +} + +pub struct ForkedThread { + pub new_thread: NewThread, + pub selected_history: Vec, } /// Preserve legacy `fork_thread(usize, ...)` callsites by mapping them to the @@ -738,7 +741,7 @@ impl ThreadManager { &options.config, inherited_multi_agent_version, ), - ); + )?; self.start_thread_with_options_and_fork_source(options, Some(forked_from_thread_id)) .await } @@ -988,6 +991,31 @@ impl ThreadManager { parent_trace: Option, supports_openai_form_elicitation: bool, ) -> CodexResult + where + S: Into, + { + Ok(self + .fork_thread_from_history_with_selection( + snapshot, + config, + history, + thread_source, + parent_trace, + supports_openai_form_elicitation, + ) + .await? + .new_thread) + } + + pub async fn fork_thread_from_history_with_selection( + &self, + snapshot: S, + config: Config, + history: InitialHistory, + thread_source: Option, + parent_trace: Option, + supports_openai_form_elicitation: bool, + ) -> CodexResult where S: Into, { @@ -1010,7 +1038,7 @@ impl ThreadManager { thread_source: Option, parent_trace: Option, supports_openai_form_elicitation: bool, - ) -> CodexResult { + ) -> CodexResult { // `forked_from_id()` describes this history's existing lineage. When // forking a resumed thread, the child copies the resumed thread itself. let source_thread_id = match &history { @@ -1030,13 +1058,14 @@ impl ThreadManager { .await; let interrupted_marker = InterruptedTurnHistoryMarker::from_config_and_version(&config, multi_agent_version); - let history = fork_history_from_snapshot(snapshot, history, interrupted_marker); + let history = fork_history_from_snapshot(snapshot, history, interrupted_marker)?; + let selected_history = history.get_rollout_items().to_vec(); let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), &config.cwd, ); let agent_control = self.agent_control_for_config(&config); - Box::pin(self.state.spawn_thread( + let new_thread = Box::pin(self.state.spawn_thread( config, history, Arc::clone(&self.state.auth_manager), @@ -1052,7 +1081,11 @@ impl ThreadManager { supports_openai_form_elicitation, /*user_shell_override*/ None, )) - .await + .await?; + Ok(ForkedThread { + new_thread, + selected_history, + }) } pub(crate) fn agent_control(&self) -> AgentControl { @@ -1832,12 +1865,12 @@ fn fork_history_from_snapshot( snapshot: ForkSnapshot, history: InitialHistory, interrupted_marker: InterruptedTurnHistoryMarker, -) -> InitialHistory { +) -> CodexResult { let snapshot_state = snapshot_turn_state(&history); match snapshot { - ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => { - truncate_before_nth_user_message(history, nth_user_message, &snapshot_state) - } + ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => Ok( + truncate_before_nth_user_message(history, nth_user_message, &snapshot_state), + ), ForkSnapshot::Interrupted => { let history = match history { InitialHistory::New => InitialHistory::New, @@ -1848,18 +1881,51 @@ fn fork_history_from_snapshot( } }; if snapshot_state.ends_mid_turn { - append_interrupted_boundary( + Ok(append_interrupted_boundary( history, snapshot_state.active_turn_id, interrupted_marker, - ) + )) } else { - history + Ok(history) } } + ForkSnapshot::NonInterrupting => { + truncate_to_last_sampling_boundary(history, &snapshot_state) + } } } +fn truncate_to_last_sampling_boundary( + history: InitialHistory, + snapshot_state: &SnapshotTurnState, +) -> CodexResult { + if !snapshot_state.ends_mid_turn { + return Ok(match history { + InitialHistory::New => InitialHistory::New, + InitialHistory::Cleared => InitialHistory::Cleared, + InitialHistory::Forked(history) => InitialHistory::Forked(history), + InitialHistory::Resumed(resumed) => { + InitialHistory::Forked(Arc::unwrap_or_clone(resumed.history)) + } + }); + } + + let items = history.get_rollout_items(); + let Some(last_boundary_position) = truncation::sampling_boundary_positions_in_rollout(items) + .last() + .copied() + else { + return Err(CodexErr::InvalidRequest( + "cannot fork active thread without interrupting: no stable sampling boundary is available" + .to_string(), + )); + }; + Ok(InitialHistory::Forked( + items[..=last_boundary_position].to_vec(), + )) +} + /// Append the same persisted interrupt boundary used by the live interrupt path /// to an existing fork snapshot after the source thread has been confirmed to /// be mid-turn. diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 4f0da0c9c7cf..82ddd9c158ec 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -20,6 +20,7 @@ use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; use codex_protocol::protocol::ResumedHistory; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; @@ -113,6 +114,13 @@ fn developer_interrupted_marker() -> ResponseItem { .expect("developer interrupted marker should be enabled") } +fn sampling_boundary(turn_id: &str, window_id: &str) -> RolloutItem { + RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_id.to_string(), + window_id: window_id.to_string(), + }) +} + #[test] fn effective_originator_prefers_thread_scoped_sources_before_env_originator() { for (metrics_service_name, persisted_originator, inherited_originator, expected_originator) in [ @@ -1878,3 +1886,192 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ 1, ); } + +#[tokio::test] +async fn non_interrupting_fork_snapshot_uses_stable_boundaries_and_fails_closed() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let state_db = init_state_db(&config).await; + let manager = ThreadManager::new( + &config, + auth_manager.clone(), + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + empty_extension_registry(), + Arc::new(crate::test_support::EmptyUserInstructionsProvider), + /*analytics_events_client*/ None, + thread_store_from_config(&config, state_db.clone()), + local_agent_graph_store_from_state_db(state_db.as_ref()), + TEST_INSTALLATION_ID.to_string(), + /*attestation_provider*/ None, + /*external_time_provider*/ None, + ); + + let marked_source_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-marked".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("marked user")), + sampling_boundary("turn-marked", "window-marked"), + RolloutItem::ResponseItem(assistant_msg("partial after boundary")), + ]; + let marked_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(marked_source_items.clone()), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create marked source thread"); + let marked_source_path = marked_source + .thread + .rollout_path() + .expect("marked source rollout path should exist"); + + let marked_fork = manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + marked_source_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + .expect("fork marked source without interrupting"); + let marked_fork_path = marked_fork + .thread + .rollout_path() + .expect("marked fork rollout path should exist"); + let marked_fork_history = RolloutRecorder::get_rollout_history(&marked_fork_path) + .await + .expect("read marked fork rollout"); + let marked_fork_items: Vec<_> = marked_fork_history + .get_rollout_items() + .iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .cloned() + .collect(); + + assert_eq!( + serde_json::to_value(&marked_fork_items).expect("serialize marked fork"), + serde_json::to_value(&marked_source_items[..3]).expect("serialize expected marked fork") + ); + assert!(!marked_fork_items.iter().any(|item| { + matches!( + item, + RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { + reason: TurnAbortReason::Interrupted, + .. + })) + ) + })); + + let unmarked_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(vec![ + RolloutItem::ResponseItem(user_msg("unmarked user")), + RolloutItem::ResponseItem(assistant_msg("unmarked partial")), + ]), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create unmarked source thread"); + let unmarked_source_path = unmarked_source + .thread + .rollout_path() + .expect("unmarked source rollout path should exist"); + let unmarked_before = + std::fs::read_to_string(&unmarked_source_path).expect("read unmarked source before fork"); + let err = match manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + unmarked_source_path.clone(), + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + { + Ok(_) => panic!("unmarked active source should fail closed"), + Err(err) => err, + }; + assert!( + matches!(err, CodexErr::InvalidRequest(message) if message.contains("no stable sampling boundary")) + ); + let unmarked_after = + std::fs::read_to_string(&unmarked_source_path).expect("read unmarked source after fork"); + assert_eq!(unmarked_after, unmarked_before); + + let inactive_source_items = vec![ + RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: "inactive user".to_string(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + ..Default::default() + })), + RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent { + message: "inactive done".to_string(), + phase: None, + memory_citation: None, + })), + ]; + let inactive_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(inactive_source_items.clone()), + auth_manager, + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create inactive source thread"); + let inactive_source_path = inactive_source + .thread + .rollout_path() + .expect("inactive source rollout path should exist"); + let inactive_fork = manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config, + inactive_source_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + .expect("fork inactive source without boundary"); + let inactive_fork_path = inactive_fork + .thread + .rollout_path() + .expect("inactive fork rollout path should exist"); + let inactive_fork_history = RolloutRecorder::get_rollout_history(&inactive_fork_path) + .await + .expect("read inactive fork rollout"); + let inactive_fork_items: Vec<_> = inactive_fork_history + .get_rollout_items() + .iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .cloned() + .collect(); + + assert_eq!( + serde_json::to_value(&inactive_fork_items).expect("serialize inactive fork"), + serde_json::to_value(&inactive_source_items).expect("serialize expected inactive fork") + ); +} diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index d7d6ad998cee..8053e616dfa2 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -125,6 +125,50 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec Vec { + let mut rollback_turn_positions = Vec::new(); + let mut boundary_positions = Vec::new(); + for (idx, item) in items.iter().enumerate() { + match item { + RolloutItem::ResponseItem(item) + if is_user_turn_boundary(item) => { + rollback_turn_positions.push(idx); + } + RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } => { + rollback_turn_positions.push(idx); + } + RolloutItem::SamplingBoundary(_) => { + boundary_positions.push(idx); + } + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { + let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX); + if num_turns == 0 { + continue; + } + let Some(rollback_start_idx) = rollback_turn_positions + .len() + .checked_sub(num_turns) + .map(|rollback_start| rollback_turn_positions[rollback_start]) + .or_else(|| rollback_turn_positions.first().copied()) + else { + continue; + }; + let new_rollback_len = rollback_turn_positions.len().saturating_sub(num_turns); + rollback_turn_positions.truncate(new_rollback_len); + boundary_positions.retain(|position| *position < rollback_start_idx); + } + _ => {} + } + } + boundary_positions +} + /// Return a prefix of `items` obtained by cutting strictly before the nth user message. /// /// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index 16d10a827728..9bc4e9e186cf 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -5,6 +5,7 @@ use codex_protocol::AgentPath; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; @@ -89,6 +90,13 @@ fn turn_completed(turn_id: &str) -> RolloutItem { })) } +fn sampling_boundary(turn_id: &str, window_id: &str) -> RolloutItem { + RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_id.to_string(), + window_id: window_id.to_string(), + }) +} + #[test] fn truncates_rollout_after_terminal_canonical_turn_id() { let rollout = vec![ @@ -443,6 +451,39 @@ fn fork_turn_positions_ignore_zero_turn_rollback_markers() { assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 1, 3]); } +#[test] +fn sampling_boundary_positions_apply_thread_rollback_markers() { + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + RolloutItem::ResponseItem(assistant_msg("a1")), + turn_started("turn-2"), + RolloutItem::ResponseItem(user_msg("u2")), + sampling_boundary("turn-2", "window-2"), + RolloutItem::ResponseItem(assistant_msg("partial")), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 1, + })), + RolloutItem::ResponseItem(user_msg("u3")), + sampling_boundary("turn-3", "window-3"), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![8]); +} + +#[test] +fn sampling_boundary_positions_ignore_zero_turn_rollback_markers() { + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + sampling_boundary("turn-1", "window-1"), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 0, + })), + RolloutItem::ResponseItem(assistant_msg("a1")), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![1]); +} + #[test] fn truncates_rollout_to_last_n_fork_turns_discards_trigger_boundaries_in_rolled_back_suffix() { let rollout = vec![ diff --git a/codex-rs/memories/write/src/phase1.rs b/codex-rs/memories/write/src/phase1.rs index 712e11d0ef87..087cf8c416f1 100644 --- a/codex-rs/memories/write/src/phase1.rs +++ b/codex-rs/memories/write/src/phase1.rs @@ -416,6 +416,7 @@ mod job { | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) .collect::>(); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 96a793bcd38f..ddf409c2b084 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2140,6 +2140,12 @@ pub struct TokenCountEvent { pub rate_limits: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema, TS)] +pub struct SamplingBoundaryEvent { + pub turn_id: String, + pub window_id: String, +} + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema, TS)] pub struct RateLimitSnapshot { pub limit_id: Option, @@ -2666,6 +2672,7 @@ impl InitialHistory { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) .and_then(|turn_context| turn_context.multi_agent_mode) @@ -3002,6 +3009,7 @@ fn multi_agent_version_from_items( | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) }) @@ -3164,6 +3172,7 @@ pub enum RolloutItem { Compacted(CompactedItem), TurnContext(TurnContextItem), WorldState(WorldStateItem), + SamplingBoundary(SamplingBoundaryEvent), EventMsg(EventMsg), } diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index 1154d36eeecc..981d400b5a92 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1173,6 +1173,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result {} + RolloutItem::SamplingBoundary(_) => {} RolloutItem::TurnContext(_) => { // Not included in `head`; skip. } @@ -1239,6 +1240,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result None, }) && let Some(builder) = builder_from_session_meta(session_meta, rollout_path) { @@ -130,6 +131,7 @@ pub async fn extract_metadata_from_rollout( | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }), parse_errors, diff --git a/codex-rs/rollout/src/persistence_metrics.rs b/codex-rs/rollout/src/persistence_metrics.rs index 8c2a7e18d238..a1dc6931957d 100644 --- a/codex-rs/rollout/src/persistence_metrics.rs +++ b/codex-rs/rollout/src/persistence_metrics.rs @@ -233,6 +233,7 @@ fn rollout_item_type(item: &RolloutItem) -> String { RolloutItem::Compacted(_) => "compacted".to_string(), RolloutItem::TurnContext(_) => "turn_context".to_string(), RolloutItem::WorldState(_) => "world_state".to_string(), + RolloutItem::SamplingBoundary(_) => "sampling_boundary".to_string(), RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => { format!("event.item_completed.{}", turn_item_type(&event.item)) } diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 2390a0fc61a1..c365b5a35d1c 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -13,7 +13,8 @@ pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool { RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) - | RolloutItem::SessionMeta(_) => true, + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => true, } } diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 87e97997a245..65c18dd0e444 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -1930,6 +1930,7 @@ async fn resume_candidate_matches_cwd( | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) { diff --git a/codex-rs/rollout/src/search.rs b/codex-rs/rollout/src/search.rs index 58b46887a98f..1ef66709a522 100644 --- a/codex-rs/rollout/src/search.rs +++ b/codex-rs/rollout/src/search.rs @@ -286,7 +286,8 @@ fn conversation_text_from_item(item: &RolloutItem) -> Option { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => None, + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => None, } } diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 48e308f627e1..4922192ef2d4 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -26,6 +26,7 @@ pub fn apply_rollout_item( | RolloutItem::InterAgentCommunicationMetadata { .. } => {} RolloutItem::Compacted(_) => {} RolloutItem::WorldState(_) => {} + RolloutItem::SamplingBoundary(_) => {} } if metadata.model_provider.is_empty() { metadata.model_provider = default_provider.to_string(); @@ -44,7 +45,8 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => false, + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => false, } } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index c80b2b671679..2a50e40de085 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1245,6 +1245,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option { | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) } diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 1057715a4a58..eeb80c18a60f 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -287,7 +287,8 @@ impl ThreadMetadataSync { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => {} + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => {} } } Some(update) From 3bff7fb3e378a248df7a16bce486814d9438cbef Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 12:50:13 -0700 Subject: [PATCH 02/14] Trigger fork CI From 8691c32999d4072a475f0c17a0c603931680253a Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 13:02:21 -0700 Subject: [PATCH 03/14] Fix non-interrupting fork boundary selection --- .../request_processors/thread_processor.rs | 2 +- .../app-server/tests/suite/v2/thread_fork.rs | 119 ++++++++++++++++++ codex-rs/core/src/thread_manager.rs | 28 ++++- codex-rs/core/src/thread_manager_tests.rs | 74 ++++++++++- .../core/src/thread_rollout_truncation.rs | 33 +++-- .../src/thread_rollout_truncation_tests.rs | 25 ++++ 6 files changed, 263 insertions(+), 18 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 08269a9315fd..1f47c7f8fb01 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -3647,7 +3647,7 @@ impl ThreadRequestProcessor { // instead of rebuilding history only to attribute a historical update. if let Some(token_usage_thread) = token_usage_thread { let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &history_items, + &fork_history_items, token_usage_thread.turns.as_slice(), ); // Mirror the resume contract for forks: the new thread is usable as soon diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 57243fbdf441..9f5221bfc3f0 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -35,9 +35,15 @@ use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::ThreadId; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::MultiAgentVersion; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SamplingBoundaryEvent; +use codex_protocol::protocol::TokenCountEvent; +use codex_protocol::protocol::TokenUsage; +use codex_protocol::protocol::TokenUsageInfo; +use codex_protocol::protocol::TurnStartedEvent; +use codex_protocol::protocol::UserMessageEvent; use codex_rollout::append_rollout_item_to_path; use codex_rollout::append_thread_name; use codex_rollout::read_session_meta_line; @@ -127,6 +133,61 @@ async fn append_partial_assistant_output(path: &Path, text: &str) -> Result<()> Ok(()) } +async fn append_active_user_turn_start(path: &Path, turn_id: &str, message: &str) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_id.to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + ) + .await?; + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: message.to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + ..Default::default() + })), + ) + .await?; + Ok(()) +} + +async fn append_token_count(path: &Path) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::TokenCount(TokenCountEvent { + info: Some(TokenUsageInfo { + total_token_usage: TokenUsage { + input_tokens: 200, + cached_input_tokens: 40, + output_tokens: 60, + reasoning_output_tokens: 20, + total_tokens: 260, + }, + last_token_usage: TokenUsage { + input_tokens: 80, + cached_input_tokens: 15, + output_tokens: 40, + reasoning_output_tokens: 10, + total_tokens: 120, + }, + model_context_window: Some(200_000), + }), + rate_limits: None, + })), + ) + .await?; + Ok(()) +} + fn thread_contains_agent_text(thread: &Thread, needle: &str) -> bool { thread.turns.iter().any(|turn| { turn.items.iter().any( @@ -527,6 +588,7 @@ async fn thread_fork_non_interrupting_excludes_partial_output() -> Result<()> { /*git_info*/ None, )?; let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; append_sampling_boundary(&source_path).await?; append_partial_assistant_output(&source_path, "partial assistant output").await?; @@ -575,6 +637,7 @@ async fn thread_fork_non_interrupting_without_boundary_fails_closed() -> Result< /*git_info*/ None, )?; let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; append_partial_assistant_output(&source_path, "unmarked partial output").await?; let source_before = std::fs::read_to_string(&source_path)?; @@ -623,6 +686,7 @@ async fn thread_fork_ephemeral_non_interrupting_uses_selected_history() -> Resul /*git_info*/ None, )?; let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; append_sampling_boundary(&source_path).await?; append_partial_assistant_output(&source_path, "ephemeral partial output").await?; @@ -710,6 +774,61 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_fork_non_interrupting_replays_token_usage_from_selected_history() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "source-only partial output").await?; + append_token_count(&source_path).await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + let note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await??; + let parsed: ServerNotification = note.try_into()?; + let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { + panic!("expected thread/tokenUsage/updated notification"); + }; + + assert_eq!(thread.turns.len(), 2); + assert_eq!(notification.thread_id, thread.id); + assert_eq!(notification.turn_id, thread.turns[0].id); + assert_ne!(notification.turn_id, thread.turns[1].id); + assert_eq!(notification.token_usage.total.total_tokens, 150); + + Ok(()) +} + #[tokio::test] async fn thread_fork_can_exclude_turns_and_skip_restored_token_usage() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 0c2fe26ac0ab..0043807a66ac 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -60,6 +60,7 @@ use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; +use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::state_db::StateDbHandle; @@ -1912,18 +1913,35 @@ fn truncate_to_last_sampling_boundary( } let items = history.get_rollout_items(); + let Some(active_turn_start_index) = snapshot_state.active_turn_start_index else { + return Err(CodexErr::InvalidRequest( + "cannot fork active thread without interrupting: no stable sampling boundary is available" + .to_string(), + )); + }; let Some(last_boundary_position) = truncation::sampling_boundary_positions_in_rollout(items) - .last() - .copied() + .into_iter() + .rev() + .find(|position| *position >= active_turn_start_index) else { return Err(CodexErr::InvalidRequest( "cannot fork active thread without interrupting: no stable sampling boundary is available" .to_string(), )); }; - Ok(InitialHistory::Forked( - items[..=last_boundary_position].to_vec(), - )) + let mut selected = items[..=last_boundary_position].to_vec(); + if let Some(turn_id) = snapshot_state.active_turn_id.clone() { + selected.push(RolloutItem::EventMsg(EventMsg::TurnComplete( + TurnCompleteEvent { + turn_id, + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }, + ))); + } + Ok(InitialHistory::Forked(selected)) } /// Append the same persisted interrupt boundary used by the live interrupt path diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 82ddd9c158ec..0ec2dcbff468 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -25,6 +25,7 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadSource; +use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_utils_path_uri::PathUri; @@ -1966,8 +1967,21 @@ async fn non_interrupting_fork_snapshot_uses_stable_boundaries_and_fails_closed( assert_eq!( serde_json::to_value(&marked_fork_items).expect("serialize marked fork"), - serde_json::to_value(&marked_source_items[..3]).expect("serialize expected marked fork") + serde_json::to_value(vec![ + marked_source_items[0].clone(), + marked_source_items[1].clone(), + marked_source_items[2].clone(), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-marked".to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })), + ]) + .expect("serialize expected marked fork") ); + assert!(!snapshot_turn_state(&InitialHistory::Forked(marked_fork_items.clone())).ends_mid_turn); assert!(!marked_fork_items.iter().any(|item| { matches!( item, @@ -1978,6 +1992,64 @@ async fn non_interrupting_fork_snapshot_uses_stable_boundaries_and_fails_closed( ) })); + let active_without_boundary_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-completed".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("completed user")), + sampling_boundary("turn-completed", "window-completed"), + RolloutItem::ResponseItem(assistant_msg("completed assistant")), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-completed".to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })), + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-active".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("active user")), + RolloutItem::ResponseItem(assistant_msg("active partial")), + ]), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create active source without boundary"); + let active_without_boundary_path = active_without_boundary_source + .thread + .rollout_path() + .expect("active source rollout path should exist"); + let err = match manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + active_without_boundary_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + { + Ok(_) => panic!("active source without active-turn boundary should fail closed"), + Err(err) => err, + }; + assert!( + matches!(err, CodexErr::InvalidRequest(message) if message.contains("no stable sampling boundary")) + ); + let unmarked_source = manager .resume_thread_with_history( config.clone(), diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index 8053e616dfa2..36d149e9e082 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -76,14 +76,9 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec { - let has_delivery_metadata = matches!(item, ResponseItem::AgentMessage { .. }) - && idx.checked_sub(1).is_some_and(|previous_idx| { - matches!( - items.get(previous_idx), - Some(RolloutItem::InterAgentCommunicationMetadata { .. }) - ) - }); - if is_user_turn_boundary(item) && !has_delivery_metadata { + if is_user_turn_boundary(item) + && !response_item_has_delivery_metadata(items, idx, item) + { rollback_turn_positions.push(idx); } if is_real_user_message_boundary(item) || is_trigger_turn_boundary(item) { @@ -136,9 +131,11 @@ pub(crate) fn sampling_boundary_positions_in_rollout(items: &[RolloutItem]) -> V for (idx, item) in items.iter().enumerate() { match item { RolloutItem::ResponseItem(item) - if is_user_turn_boundary(item) => { - rollback_turn_positions.push(idx); - } + if is_user_turn_boundary(item) + && !response_item_has_delivery_metadata(items, idx, item) => + { + rollback_turn_positions.push(idx); + } RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } => { rollback_turn_positions.push(idx); @@ -169,6 +166,20 @@ pub(crate) fn sampling_boundary_positions_in_rollout(items: &[RolloutItem]) -> V boundary_positions } +fn response_item_has_delivery_metadata( + items: &[RolloutItem], + idx: usize, + item: &ResponseItem, +) -> bool { + matches!(item, ResponseItem::AgentMessage { .. }) + && idx.checked_sub(1).is_some_and(|previous_idx| { + matches!( + items.get(previous_idx), + Some(RolloutItem::InterAgentCommunicationMetadata { .. }) + ) + }) +} + /// Return a prefix of `items` obtained by cutting strictly before the nth user message. /// /// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index 9bc4e9e186cf..c24c0d4c3a9f 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -470,6 +470,31 @@ fn sampling_boundary_positions_apply_thread_rollback_markers() { assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![8]); } +#[test] +fn sampling_boundary_positions_do_not_double_count_inter_agent_metadata_delivery_pair() { + let triggered = InterAgentCommunication::new( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + Vec::new(), + "triggered task".to_string(), + /*trigger_turn*/ true, + ); + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + sampling_boundary("turn-1", "window-1"), + RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true }, + RolloutItem::ResponseItem(triggered.to_model_input_item()), + sampling_boundary("inter-agent-turn", "window-2"), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 2, + })), + RolloutItem::ResponseItem(user_msg("u3")), + sampling_boundary("turn-3", "window-3"), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![7]); +} + #[test] fn sampling_boundary_positions_ignore_zero_turn_rollback_markers() { let rollout = vec![ From 64bd05b7a64d93af1745a9a573c9ffb3fda6b6c9 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 13:47:35 -0700 Subject: [PATCH 04/14] Preserve null active fork mode serialization --- codex-rs/app-server-protocol/src/protocol/v2/tests.rs | 5 +++++ codex-rs/app-server-protocol/src/protocol/v2/thread.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index 48f92bf785bc..462cd37fd773 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -871,6 +871,11 @@ fn thread_fork_last_turn_id_round_trips() { serde_json::Value::Null, "optional lastTurnId should serialize as null when omitted" ); + assert_eq!( + omitted["activeForkMode"], + serde_json::Value::Null, + "optional activeForkMode should serialize as null when omitted" + ); } #[test] diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index d6a79c3259c5..d1fb3cec6f8e 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -567,7 +567,7 @@ pub struct ThreadForkParams { /// `nonInterrupting` to fork from an existing stable source-owned sampling /// boundary without cancelling or mutating the source thread. #[experimental("thread/fork.activeForkMode")] - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(default)] #[ts(optional = nullable)] pub active_fork_mode: Option, } From e3babb6f564f18981724bf7a5a13be01bc82dcf0 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 18:54:44 -0700 Subject: [PATCH 05/14] Keep legacy user-only forks completed --- codex-rs/core/src/thread_manager.rs | 19 +++++++++++-------- codex-rs/core/src/thread_manager_tests.rs | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 0043807a66ac..0c385139e6c0 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -1848,15 +1848,18 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState { }; // Synthetic fork/resume histories can contain user/assistant response items - // without explicit turn lifecycle events. If the persisted snapshot has no - // terminating boundary after its last user message, treat it as mid-turn. + // without explicit turn lifecycle events. If the persisted snapshot has a + // non-empty suffix after its last user message but no terminating boundary, + // treat it as mid-turn. + let suffix_after_last_user = &rollout_items[last_user_position + 1..]; SnapshotTurnState { - ends_mid_turn: !rollout_items[last_user_position + 1..].iter().any(|item| { - matches!( - item, - RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) - ) - }), + ends_mid_turn: !suffix_after_last_user.is_empty() + && !suffix_after_last_user.iter().any(|item| { + matches!( + item, + RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) + ) + }), active_turn_id: None, active_turn_start_index: None, } diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 0ec2dcbff468..589dce807b0b 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -267,6 +267,23 @@ fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() { ); } +#[test] +fn user_only_legacy_snapshot_is_not_mid_turn() { + let snapshot_state = + snapshot_turn_state(&InitialHistory::Forked(vec![RolloutItem::ResponseItem( + user_msg("saved user message"), + )])); + + assert_eq!( + snapshot_state, + SnapshotTurnState { + ends_mid_turn: false, + active_turn_id: None, + active_turn_start_index: None, + }, + ); +} + #[test] fn fork_thread_accepts_legacy_usize_snapshot_argument() { fn assert_legacy_snapshot_callsite( From 95801298fa7cffcb64c528d6377a2f7e4ec99def Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 19:08:14 -0700 Subject: [PATCH 06/14] Handle legacy user-event fork snapshots --- codex-rs/core/src/thread_manager.rs | 11 +++++--- codex-rs/core/src/thread_manager_tests.rs | 31 ++++++++++++++++++++++- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 0c385139e6c0..2f7f8ac20bdf 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -1848,12 +1848,15 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState { }; // Synthetic fork/resume histories can contain user/assistant response items - // without explicit turn lifecycle events. If the persisted snapshot has a - // non-empty suffix after its last user message but no terminating boundary, - // treat it as mid-turn. + // without explicit turn lifecycle events. Legacy rollouts may also persist + // the same user message as an EventMsg after the raw response item, which is + // still user-only history. Any other non-terminal suffix after the last raw + // user message belongs to an unfinished turn. let suffix_after_last_user = &rollout_items[last_user_position + 1..]; SnapshotTurnState { - ends_mid_turn: !suffix_after_last_user.is_empty() + ends_mid_turn: suffix_after_last_user + .iter() + .any(|item| !matches!(item, RolloutItem::EventMsg(EventMsg::UserMessage(_)))) && !suffix_after_last_user.iter().any(|item| { matches!( item, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 589dce807b0b..84a5a7e68c42 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -1535,7 +1535,7 @@ fn completed_legacy_event_history_is_not_mid_turn() { } #[test] -fn mixed_response_and_legacy_user_event_history_is_mid_turn() { +fn mixed_response_and_legacy_user_event_history_is_not_mid_turn() { let mixed_history = InitialHistory::Forked(vec![ RolloutItem::ResponseItem(user_msg("hello")), RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { @@ -1548,6 +1548,35 @@ fn mixed_response_and_legacy_user_event_history_is_mid_turn() { })), ]); + assert_eq!( + snapshot_turn_state(&mixed_history), + SnapshotTurnState { + ends_mid_turn: false, + active_turn_id: None, + active_turn_start_index: None, + }, + ); +} + +#[test] +fn mixed_response_and_legacy_user_event_with_agent_output_is_mid_turn() { + let mixed_history = InitialHistory::Forked(vec![ + RolloutItem::ResponseItem(user_msg("hello")), + RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: "hello".to_string(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + ..Default::default() + })), + RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent { + message: "partial".to_string(), + phase: None, + memory_citation: None, + })), + ]); + assert_eq!( snapshot_turn_state(&mixed_history), SnapshotTurnState { From f2869d932039ca77f403e1fb57094c6cd03a8219 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 20:07:05 -0700 Subject: [PATCH 07/14] Fix CI runner and WebRTC Bazel setup --- .github/workflows/bazel.yml | 18 ++------ .github/workflows/rust-ci.yml | 7 +--- .github/workflows/sdk.yml | 10 ++--- MODULE.bazel | 8 ++++ patches/BUILD.bazel | 1 + ...brtc-sys-build_bazel_out_dir_scratch.patch | 41 +++++++++++++++++++ 6 files changed, 59 insertions(+), 26 deletions(-) create mode 100644 patches/webrtc-sys-build_bazel_out_dir_scratch.patch diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml index 10dd8670de0a..dde6dd224266 100644 --- a/.github/workflows/bazel.yml +++ b/.github/workflows/bazel.yml @@ -148,9 +148,7 @@ jobs: - 2 - 3 - 4 - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 + runs-on: windows-latest name: Bazel test on windows-latest for x86_64-pc-windows-gnullvm shard ${{ matrix.shard }}/4 environment: name: bazel @@ -267,9 +265,7 @@ jobs: # it a larger timeout. if: github.event_name == 'push' && github.ref == 'refs/heads/main' timeout-minutes: 40 - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 + runs-on: windows-latest name: Bazel test on windows-latest for x86_64-pc-windows-gnullvm (native main) environment: name: bazel @@ -358,10 +354,7 @@ jobs: target: aarch64-apple-darwin - os: windows-latest target: x86_64-pc-windows-gnullvm - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 - runs-on: ${{ matrix.runs_on || matrix.os }} + runs-on: ${{ matrix.os }} name: Bazel clippy on ${{ matrix.os }} for ${{ matrix.target }} environment: name: bazel @@ -458,10 +451,7 @@ jobs: target: aarch64-apple-darwin - os: windows-latest target: x86_64-pc-windows-gnullvm - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 - runs-on: ${{ matrix.runs_on || matrix.os }} + runs-on: ${{ matrix.os }} name: Verify release build on ${{ matrix.os }} for ${{ matrix.target }} environment: name: bazel diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index cf1afdfe2de1..1a72609b683a 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -172,7 +172,7 @@ jobs: argument_comment_lint_prebuilt: name: Argument comment lint - ${{ matrix.name }} - runs-on: ${{ matrix.runs_on || matrix.runner }} + runs-on: ${{ matrix.runner }} timeout-minutes: ${{ matrix.timeout_minutes }} needs: changed environment: @@ -189,11 +189,8 @@ jobs: runner: macos-15-xlarge timeout_minutes: 30 - name: Windows - runner: windows-x64 + runner: windows-latest timeout_minutes: 30 - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 steps: - name: Check whether argument comment lint should run id: argument_comment_lint_gate diff --git a/.github/workflows/sdk.yml b/.github/workflows/sdk.yml index 41f887471220..06acd3bee03c 100644 --- a/.github/workflows/sdk.yml +++ b/.github/workflows/sdk.yml @@ -5,9 +5,7 @@ on: jobs: python-sdk: - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-linux-x64 + runs-on: ubuntu-24.04 timeout-minutes: 10 steps: - name: Checkout repository @@ -44,9 +42,7 @@ jobs: uses: ./.github/actions/check-clean-worktree sdks: - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-linux-x64 + runs-on: ubuntu-24.04 timeout-minutes: 10 environment: name: bazel @@ -156,7 +152,7 @@ jobs: run: pnpm -r --filter ./sdk/typescript run test - name: Save bazel repository cache - if: always() && !cancelled() && steps.setup_bazel.outputs.cache-hit != 'true' + if: always() && !cancelled() continue-on-error: true uses: actions/cache/save@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4 with: diff --git a/MODULE.bazel b/MODULE.bazel index 20d80f396260..b553d8179661 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -403,6 +403,7 @@ crate.annotation( ], build_script_env = { "WEBRTC_SYS_DARWIN_SDK_PATH": "$(location @macos_sdk//sysroot)", + "WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH": "1", "WEBRTC_SYS_LINK_OUT_DIR": "1", }, crate = "webrtc-sys", @@ -412,6 +413,13 @@ crate.annotation( "//patches:webrtc-sys_hermetic_darwin_sysroot.patch", ], ) +crate.annotation( + crate = "webrtc-sys-build", + patch_args = ["-p1"], + patches = [ + "//patches:webrtc-sys-build_bazel_out_dir_scratch.patch", + ], +) # Fix readme inclusions crate.annotation( diff --git a/patches/BUILD.bazel b/patches/BUILD.bazel index ed1413717007..b6fe38618d75 100644 --- a/patches/BUILD.bazel +++ b/patches/BUILD.bazel @@ -23,6 +23,7 @@ exports_files([ "v8_bazel_rules.patch", "v8_module_deps.patch", "v8_source_portability.patch", + "webrtc-sys-build_bazel_out_dir_scratch.patch", "webrtc-sys_hermetic_darwin_sysroot.patch", "windows-link.patch", "xz_windows_stack_args.patch", diff --git a/patches/webrtc-sys-build_bazel_out_dir_scratch.patch b/patches/webrtc-sys-build_bazel_out_dir_scratch.patch new file mode 100644 index 000000000000..e83b5a47bf0a --- /dev/null +++ b/patches/webrtc-sys-build_bazel_out_dir_scratch.patch @@ -0,0 +1,41 @@ +diff --git a/src/lib.rs b/src/lib.rs +index 2acdd60..4b083e1 100644 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -87,7 +87,7 @@ pub fn use_debug() -> bool { + /// across multiple crates without dependencies constraints + /// This also has the benefit of not re-downloading the binaries for each crate + pub fn prebuilt_dir() -> path::PathBuf { +- let target_dir = scratch::path(SCRATH_PATH); ++ let target_dir = scratch_dir(); + path::Path::new(&target_dir).join(format!( + "livekit/{}-{}/{}", + webrtc_triple(), +@@ -98,6 +98,18 @@ pub fn prebuilt_dir() -> path::PathBuf { + )) + } + ++fn scratch_dir() -> path::PathBuf { ++ if env::var_os("WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH").is_some() { ++ return path::PathBuf::from( ++ env::var_os("OUT_DIR") ++ .expect("OUT_DIR must be set when WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH is set"), ++ ) ++ .join(SCRATH_PATH); ++ } ++ ++ scratch::path(SCRATH_PATH) ++} ++ + pub fn download_url() -> String { + format!( + "https://github.com/livekit/rust-sdks/releases/download/{}/{}.zip", +@@ -197,7 +208,7 @@ pub fn configure_jni_symbols() -> Result<()> { + } + + pub fn download_webrtc() -> Result<()> { +- let dir = scratch::path(SCRATH_PATH); ++ let dir = scratch_dir(); + // temporary fix to avoid github workflow issue + fs::create_dir_all(&dir).context("Failed to create scratch_path")?; + let flock = File::create(dir.join(".lock")) From 802410cfc349accaf632ee73788713d67af20449 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 20:09:35 -0700 Subject: [PATCH 08/14] Trigger CI From 24ee404d07e5bc4a7dfdec24a2364866612bf9a9 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 20:26:26 -0700 Subject: [PATCH 09/14] Keep cxx bridge outputs in Bazel out dir --- MODULE.bazel | 8 ++++++++ patches/BUILD.bazel | 1 + patches/cxx-build_bazel_out_dir_shared.patch | 19 +++++++++++++++++++ 3 files changed, 28 insertions(+) create mode 100644 patches/cxx-build_bazel_out_dir_shared.patch diff --git a/MODULE.bazel b/MODULE.bazel index b553d8179661..17ea840caba7 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -396,12 +396,20 @@ crate.annotation( inject_repo(crate, "llvm", "llvm-project", "macos_sdk") +crate.annotation( + crate = "cxx-build", + patch_args = ["-p1"], + patches = [ + "//patches:cxx-build_bazel_out_dir_shared.patch", + ], +) crate.annotation( # Provide the hermetic SDK path so the build script doesn't try to invoke an unavailable `xcrun --show-sdk-path`. build_script_data = [ "@macos_sdk//sysroot", ], build_script_env = { + "CXX_BUILD_BAZEL_OUT_DIR_SHARED": "1", "WEBRTC_SYS_DARWIN_SDK_PATH": "$(location @macos_sdk//sysroot)", "WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH": "1", "WEBRTC_SYS_LINK_OUT_DIR": "1", diff --git a/patches/BUILD.bazel b/patches/BUILD.bazel index b6fe38618d75..d388e278fed5 100644 --- a/patches/BUILD.bazel +++ b/patches/BUILD.bazel @@ -4,6 +4,7 @@ exports_files([ "aws-lc-sys_windows_msvc_prebuilt_nasm.patch", "aws-lc-sys_windows_msvc_memcmp_probe.patch", "bzip2_windows_stack_args.patch", + "cxx-build_bazel_out_dir_shared.patch", "llvm_rusty_v8_custom_libcxx.patch", "llvm_windows_arm64_powl.patch", "llvm_windows_mingw_compat.patch", diff --git a/patches/cxx-build_bazel_out_dir_shared.patch b/patches/cxx-build_bazel_out_dir_shared.patch new file mode 100644 index 000000000000..acbe5c657a03 --- /dev/null +++ b/patches/cxx-build_bazel_out_dir_shared.patch @@ -0,0 +1,19 @@ +diff --git a/src/lib.rs b/src/lib.rs +index 3928062..7dfb803 100644 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -173,7 +173,13 @@ impl Project { + + let shared_dir = match target::find_target_dir(&out_dir) { + TargetDir::Path(target_dir) => target_dir.join("cxxbridge"), +- TargetDir::Unknown => scratch::path("cxxbridge"), ++ TargetDir::Unknown => { ++ if env::var_os("CXX_BUILD_BAZEL_OUT_DIR_SHARED").is_some() { ++ out_dir.join("cxxbridge-shared") ++ } else { ++ scratch::path("cxxbridge") ++ } ++ } + }; + + Ok(Project { From 6f59b6fa13f0fbd1c3407338a5a9deb0c222fea2 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 20:38:14 -0700 Subject: [PATCH 10/14] Give hosted SDK CI more time --- .github/workflows/sdk.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/sdk.yml b/.github/workflows/sdk.yml index 06acd3bee03c..6748fa872444 100644 --- a/.github/workflows/sdk.yml +++ b/.github/workflows/sdk.yml @@ -43,7 +43,7 @@ jobs: sdks: runs-on: ubuntu-24.04 - timeout-minutes: 10 + timeout-minutes: 30 environment: name: bazel deployment: false From 6328a737903e62b58201865eca6eba0e431e315d Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 21:11:01 -0700 Subject: [PATCH 11/14] Allow hosted SDK CI cold builds --- .github/workflows/sdk.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/sdk.yml b/.github/workflows/sdk.yml index 6748fa872444..1f38f46fe1b1 100644 --- a/.github/workflows/sdk.yml +++ b/.github/workflows/sdk.yml @@ -43,7 +43,7 @@ jobs: sdks: runs-on: ubuntu-24.04 - timeout-minutes: 30 + timeout-minutes: 60 environment: name: bazel deployment: false From fd72fda2dcb87d9199f609ec36d7c7fcdca04fe5 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 21:44:31 -0700 Subject: [PATCH 12/14] Allow hosted Bazel lint CI cold builds --- .github/workflows/bazel.yml | 8 ++++---- .github/workflows/rust-ci.yml | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml index dde6dd224266..e7b23d5a44f9 100644 --- a/.github/workflows/bazel.yml +++ b/.github/workflows/bazel.yml @@ -20,7 +20,7 @@ jobs: # signal without putting PR latency back on the critical path. When # Code-mode unit tests run on every Bazel target. When authenticated RBE # is available, the Windows-cross shards exercise the source-built V8 path. - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -139,7 +139,7 @@ jobs: # Split the Windows Bazel test leg across separate Windows hosts. Jobs with # BuildBuddy credentials use Linux RBE for build actions; test execution # remains on a Windows runner. - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -339,7 +339,7 @@ jobs: uses: ./.github/actions/check-clean-worktree clippy: - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -440,7 +440,7 @@ jobs: uses: ./.github/actions/check-clean-worktree verify-release-build: - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 1a72609b683a..016beda0e6c9 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -184,13 +184,13 @@ jobs: include: - name: Linux runner: ubuntu-24.04 - timeout_minutes: 30 + timeout_minutes: 60 - name: macOS runner: macos-15-xlarge - timeout_minutes: 30 + timeout_minutes: 60 - name: Windows runner: windows-latest - timeout_minutes: 30 + timeout_minutes: 60 steps: - name: Check whether argument comment lint should run id: argument_comment_lint_gate From 610ffe7916fd27663d007ca446659dc96349a36f Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 22:37:39 -0700 Subject: [PATCH 13/14] Limit unauthenticated Windows Bazel tests --- .github/workflows/bazel.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml index e7b23d5a44f9..df751787ebf1 100644 --- a/.github/workflows/bazel.yml +++ b/.github/workflows/bazel.yml @@ -186,6 +186,16 @@ jobs: set -euo pipefail bazel_test_query='tests(//...) except tests(//third_party/v8:all) except attr(tags, "manual", tests(//...))' + if [[ -z "${BUILDBUDDY_API_KEY:-}" ]]; then + # Without authenticated RBE, this job falls back from Windows + # cross-compile to native hosted Windows tests. Keep that fallback + # focused on stable target-level Windows coverage; the + # subprocess/ConPTY-heavy suites below are covered by Linux/macOS + # PR jobs and the native Windows main-branch lane. + bazel_test_query="${bazel_test_query} except //codex-rs/core:core-all-test" + bazel_test_query="${bazel_test_query} except //codex-rs/shell-command:shell-command-unit-tests" + bazel_test_query="${bazel_test_query} except //codex-rs/utils/pty:pty-unit-tests" + fi mapfile -t bazel_targets < <( ./.github/scripts/run-bazel-query-ci.sh --output=label -- "${bazel_test_query}" \ | LC_ALL=C sort From aa7f811a7f53efee25e9ba036b270b5930d2adc7 Mon Sep 17 00:00:00 2001 From: Tim Kersey Date: Sat, 27 Jun 2026 23:18:36 -0700 Subject: [PATCH 14/14] Relax Windows stdio init test timeout --- codex-rs/exec-server/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 82412a58e60a..1a0d76f899d8 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1535,7 +1535,7 @@ mod tests { cwd: None, }, client_name: "stdio-test-client".to_string(), - initialize_timeout: Duration::from_secs(1), + initialize_timeout: Duration::from_secs(5), resume_session_id: None, }) .await