diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml index 10dd8670de0a..df751787ebf1 100644 --- a/.github/workflows/bazel.yml +++ b/.github/workflows/bazel.yml @@ -20,7 +20,7 @@ jobs: # signal without putting PR latency back on the critical path. When # Code-mode unit tests run on every Bazel target. When authenticated RBE # is available, the Windows-cross shards exercise the source-built V8 path. - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -139,7 +139,7 @@ jobs: # Split the Windows Bazel test leg across separate Windows hosts. Jobs with # BuildBuddy credentials use Linux RBE for build actions; test execution # remains on a Windows runner. - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -148,9 +148,7 @@ jobs: - 2 - 3 - 4 - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 + runs-on: windows-latest name: Bazel test on windows-latest for x86_64-pc-windows-gnullvm shard ${{ matrix.shard }}/4 environment: name: bazel @@ -188,6 +186,16 @@ jobs: set -euo pipefail bazel_test_query='tests(//...) except tests(//third_party/v8:all) except attr(tags, "manual", tests(//...))' + if [[ -z "${BUILDBUDDY_API_KEY:-}" ]]; then + # Without authenticated RBE, this job falls back from Windows + # cross-compile to native hosted Windows tests. Keep that fallback + # focused on stable target-level Windows coverage; the + # subprocess/ConPTY-heavy suites below are covered by Linux/macOS + # PR jobs and the native Windows main-branch lane. + bazel_test_query="${bazel_test_query} except //codex-rs/core:core-all-test" + bazel_test_query="${bazel_test_query} except //codex-rs/shell-command:shell-command-unit-tests" + bazel_test_query="${bazel_test_query} except //codex-rs/utils/pty:pty-unit-tests" + fi mapfile -t bazel_targets < <( ./.github/scripts/run-bazel-query-ci.sh --output=label -- "${bazel_test_query}" \ | LC_ALL=C sort @@ -267,9 +275,7 @@ jobs: # it a larger timeout. if: github.event_name == 'push' && github.ref == 'refs/heads/main' timeout-minutes: 40 - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 + runs-on: windows-latest name: Bazel test on windows-latest for x86_64-pc-windows-gnullvm (native main) environment: name: bazel @@ -343,7 +349,7 @@ jobs: uses: ./.github/actions/check-clean-worktree clippy: - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -358,10 +364,7 @@ jobs: target: aarch64-apple-darwin - os: windows-latest target: x86_64-pc-windows-gnullvm - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 - runs-on: ${{ matrix.runs_on || matrix.os }} + runs-on: ${{ matrix.os }} name: Bazel clippy on ${{ matrix.os }} for ${{ matrix.target }} environment: name: bazel @@ -447,7 +450,7 @@ jobs: uses: ./.github/actions/check-clean-worktree verify-release-build: - timeout-minutes: 30 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -458,10 +461,7 @@ jobs: target: aarch64-apple-darwin - os: windows-latest target: x86_64-pc-windows-gnullvm - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 - runs-on: ${{ matrix.runs_on || matrix.os }} + runs-on: ${{ matrix.os }} name: Verify release build on ${{ matrix.os }} for ${{ matrix.target }} environment: name: bazel diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index cf1afdfe2de1..016beda0e6c9 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -172,7 +172,7 @@ jobs: argument_comment_lint_prebuilt: name: Argument comment lint - ${{ matrix.name }} - runs-on: ${{ matrix.runs_on || matrix.runner }} + runs-on: ${{ matrix.runner }} timeout-minutes: ${{ matrix.timeout_minutes }} needs: changed environment: @@ -184,16 +184,13 @@ jobs: include: - name: Linux runner: ubuntu-24.04 - timeout_minutes: 30 + timeout_minutes: 60 - name: macOS runner: macos-15-xlarge - timeout_minutes: 30 + timeout_minutes: 60 - name: Windows - runner: windows-x64 - timeout_minutes: 30 - runs_on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-windows-x64 + runner: windows-latest + timeout_minutes: 60 steps: - name: Check whether argument comment lint should run id: argument_comment_lint_gate diff --git a/.github/workflows/sdk.yml b/.github/workflows/sdk.yml index 41f887471220..1f38f46fe1b1 100644 --- a/.github/workflows/sdk.yml +++ b/.github/workflows/sdk.yml @@ -5,9 +5,7 @@ on: jobs: python-sdk: - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-linux-x64 + runs-on: ubuntu-24.04 timeout-minutes: 10 steps: - name: Checkout repository @@ -44,10 +42,8 @@ jobs: uses: ./.github/actions/check-clean-worktree sdks: - runs-on: - group: ${{ github.event.repository.name }}-runners - labels: ${{ github.event.repository.name }}-linux-x64 - timeout-minutes: 10 + runs-on: ubuntu-24.04 + timeout-minutes: 60 environment: name: bazel deployment: false @@ -156,7 +152,7 @@ jobs: run: pnpm -r --filter ./sdk/typescript run test - name: Save bazel repository cache - if: always() && !cancelled() && steps.setup_bazel.outputs.cache-hit != 'true' + if: always() && !cancelled() continue-on-error: true uses: actions/cache/save@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4 with: diff --git a/MODULE.bazel b/MODULE.bazel index 20d80f396260..17ea840caba7 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -396,13 +396,22 @@ crate.annotation( inject_repo(crate, "llvm", "llvm-project", "macos_sdk") +crate.annotation( + crate = "cxx-build", + patch_args = ["-p1"], + patches = [ + "//patches:cxx-build_bazel_out_dir_shared.patch", + ], +) crate.annotation( # Provide the hermetic SDK path so the build script doesn't try to invoke an unavailable `xcrun --show-sdk-path`. build_script_data = [ "@macos_sdk//sysroot", ], build_script_env = { + "CXX_BUILD_BAZEL_OUT_DIR_SHARED": "1", "WEBRTC_SYS_DARWIN_SDK_PATH": "$(location @macos_sdk//sysroot)", + "WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH": "1", "WEBRTC_SYS_LINK_OUT_DIR": "1", }, crate = "webrtc-sys", @@ -412,6 +421,13 @@ crate.annotation( "//patches:webrtc-sys_hermetic_darwin_sysroot.patch", ], ) +crate.annotation( + crate = "webrtc-sys-build", + patch_args = ["-p1"], + patches = [ + "//patches:webrtc-sys-build_bazel_out_dir_scratch.patch", + ], +) # Fix readme inclusions crate.annotation( diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index f3983e31f41c..4e2b14bd6a5b 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3585,6 +3585,13 @@ ], "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index b77b512b36e3..48f8c0c0b4e2 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17232,6 +17232,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "$schema": "http://json-schema.org/draft-07/schema#", "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index bdd61d6b9436..6909545a996c 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15011,6 +15011,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadForkParams": { "$schema": "http://json-schema.org/draft-07/schema#", "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index 76278b106ba3..b4560de465e5 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -71,6 +71,13 @@ ], "type": "string" }, + "ThreadForkActiveMode": { + "enum": [ + "interrupt", + "nonInterrupting" + ], + "type": "string" + }, "ThreadSource": { "type": "string" } diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts new file mode 100644 index 000000000000..02c07db67437 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkActiveMode.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadForkActiveMode = "interrupt" | "nonInterrupting"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index c405082c4c8f..d4e07a8e6540 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -400,6 +400,7 @@ export type { ThreadDeleteParams } from "./ThreadDeleteParams"; export type { ThreadDeleteResponse } from "./ThreadDeleteResponse"; export type { ThreadDeletedNotification } from "./ThreadDeletedNotification"; export type { ThreadExtra } from "./ThreadExtra"; +export type { ThreadForkActiveMode } from "./ThreadForkActiveMode"; export type { ThreadForkParams } from "./ThreadForkParams"; export type { ThreadForkResponse } from "./ThreadForkResponse"; export type { ThreadGoal } from "./ThreadGoal"; diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 2d9c6b6a1a48..38a2c6892075 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -389,7 +389,8 @@ impl ThreadHistoryBuilder { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) - | RolloutItem::SessionMeta(_) => {} + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => {} } } diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index 48f92bf785bc..462cd37fd773 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -871,6 +871,11 @@ fn thread_fork_last_turn_id_round_trips() { serde_json::Value::Null, "optional lastTurnId should serialize as null when omitted" ); + assert_eq!( + omitted["activeForkMode"], + serde_json::Value::Null, + "optional activeForkMode should serialize as null when omitted" + ); } #[test] diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 7cd03930d8a8..d1fb3cec6f8e 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -561,6 +561,24 @@ pub struct ThreadForkParams { #[experimental("thread/fork.excludeTurns")] #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub exclude_turns: bool, + /// How to fork a currently active source thread. + /// + /// Omitted preserves the legacy interrupted snapshot behavior. Use + /// `nonInterrupting` to fork from an existing stable source-owned sampling + /// boundary without cancelling or mutating the source thread. + #[experimental("thread/fork.activeForkMode")] + #[serde(default)] + #[ts(optional = nullable)] + pub active_fork_mode: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum ThreadForkActiveMode { + Interrupt, + NonInterrupting, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, ExperimentalApi)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index c9ac2f5fb27c..e9a2e3623f7a 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -139,7 +139,7 @@ Example with notification opt-out: - `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `allowProviderModelFallback` lets providers backed by an authoritative static model catalog replace an unavailable requested `model` with the catalog default; dynamic or cached catalogs preserve the requested model. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Deprecated experimental `multiAgentMode` is ignored; use Ultra reasoning effort for proactive multi-agent behavior. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots using environment-native absolute paths. Skills found below those roots are listed and read through the owning environment. Stdio MCP servers declared by selected plugins are started in that environment, and HTTP MCP connections use that environment's HTTP client. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. -- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the default active fork mode records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. Experimental clients can pass `activeForkMode: "nonInterrupting"` to fork from the latest existing stable sampling boundary without interrupting the source thread; if no boundary is available for a mid-turn source, the request fails. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. `instructionSources` lists loaded instruction files using each source environment's native absolute path syntax, including files loaded from remote environments. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. Their deprecated experimental `multiAgentMode` field, and the corresponding thread setting, always report `explicitRequestOnly`; Ultra reasoning effort is the source of proactive multi-agent behavior. - `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` for direct spawned children or `ancestorThreadId` for spawned descendants at any depth; the two filters are mutually exclusive. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known. - `thread/loaded/list` — list the thread ids currently loaded in memory. @@ -355,7 +355,7 @@ Example: } } ``` -To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. The returned `thread.sessionId` identifies the current live session tree root. Root threads use their own `thread.id` as `thread.sessionId`; stored threads that are not loaded also report their own `thread.id`, because resuming one makes it the root of a new live session tree. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only: +To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. The returned `thread.sessionId` identifies the current live session tree root. Root threads use their own `thread.id` as `thread.sessionId`; stored threads that are not loaded also report their own `thread.id`, because resuming one makes it the root of a new live session tree. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the default fork snapshots it as if the current turn had been interrupted first. Pass `activeForkMode: "nonInterrupting"` to fork from an existing stable boundary without interrupting the active source; a mid-turn source with no stable boundary returns an invalid-request error. Pass `ephemeral: true` when the fork should stay in-memory only: ```json { "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123", "ephemeral": true } } diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index b78c53a383a8..1f47c7f8fb01 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -3404,6 +3404,7 @@ impl ThreadRequestProcessor { ephemeral, thread_source, exclude_turns, + active_fork_mode, } = params; let include_turns = !exclude_turns; if sandbox.is_some() && permissions.is_some() { @@ -3485,15 +3486,19 @@ impl ThreadRequestProcessor { let fallback_model_provider = config.model_provider_id.clone(); - let NewThread { - thread_id, - thread: forked_thread, - session_configured, - .. - } = self + let fork_snapshot = match active_fork_mode + .unwrap_or(codex_app_server_protocol::ThreadForkActiveMode::Interrupt) + { + codex_app_server_protocol::ThreadForkActiveMode::Interrupt => ForkSnapshot::Interrupted, + codex_app_server_protocol::ThreadForkActiveMode::NonInterrupting => { + ForkSnapshot::NonInterrupting + } + }; + + let forked = self .thread_manager - .fork_thread_from_history( - ForkSnapshot::Interrupted, + .fork_thread_from_history_with_selection( + fork_snapshot, config, InitialHistory::Resumed(ResumedHistory { conversation_id: source_thread_id, @@ -3512,6 +3517,13 @@ impl ThreadRequestProcessor { CodexErr::InvalidRequest(message) => invalid_request(message), err => internal_error(format!("error forking thread: {err}")), })?; + let fork_history_items = forked.selected_history; + let NewThread { + thread_id, + thread: forked_thread, + session_configured, + .. + } = forked.new_thread; Self::set_app_server_client_info( forked_thread.as_ref(), @@ -3569,12 +3581,12 @@ impl ThreadRequestProcessor { &config_snapshot, /*path*/ None, ); - thread.preview = preview_from_rollout_items(&history_items); + thread.preview = preview_from_rollout_items(&fork_history_items); thread.forked_from_id = Some(source_thread_id.to_string()); if include_turns { populate_thread_turns_from_history( &mut thread, - &history_items, + &fork_history_items, /*active_turn*/ None, ); } @@ -3635,7 +3647,7 @@ impl ThreadRequestProcessor { // instead of rebuilding history only to attribute a historical update. if let Some(token_usage_thread) = token_usage_thread { let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &history_items, + &fork_history_items, token_usage_thread.turns.as_slice(), ); // Mirror the resume contract for forks: the new thread is usable as soon diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index d7c1707a3297..9f5221bfc3f0 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -12,6 +12,8 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::SessionSource; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadForkActiveMode; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; @@ -31,8 +33,17 @@ use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::ThreadId; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::MultiAgentVersion; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::SamplingBoundaryEvent; +use codex_protocol::protocol::TokenCountEvent; +use codex_protocol::protocol::TokenUsage; +use codex_protocol::protocol::TokenUsageInfo; +use codex_protocol::protocol::TurnStartedEvent; +use codex_protocol::protocol::UserMessageEvent; use codex_rollout::append_rollout_item_to_path; use codex_rollout::append_thread_name; use codex_rollout::read_session_meta_line; @@ -40,6 +51,7 @@ use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::path::Path; +use std::path::PathBuf; use tempfile::TempDir; use tokio::time::timeout; use wiremock::Mock; @@ -83,6 +95,107 @@ async fn list_threads(mcp: &mut TestAppServer) -> Result { to_response::(list_resp) } +fn rollout_path(codex_home: &Path, timestamp: &str, conversation_id: &str) -> PathBuf { + codex_home + .join("sessions") + .join("2025") + .join("01") + .join("05") + .join(format!("rollout-{timestamp}-{conversation_id}.jsonl")) +} + +fn assistant_message(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + phase: None, + internal_chat_message_metadata_passthrough: None, + } +} + +async fn append_sampling_boundary(path: &Path) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: "turn-1".to_string(), + window_id: "window-1".to_string(), + }), + ) + .await?; + Ok(()) +} + +async fn append_partial_assistant_output(path: &Path, text: &str) -> Result<()> { + append_rollout_item_to_path(path, &RolloutItem::ResponseItem(assistant_message(text))).await?; + Ok(()) +} + +async fn append_active_user_turn_start(path: &Path, turn_id: &str, message: &str) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_id.to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + ) + .await?; + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: message.to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + ..Default::default() + })), + ) + .await?; + Ok(()) +} + +async fn append_token_count(path: &Path) -> Result<()> { + append_rollout_item_to_path( + path, + &RolloutItem::EventMsg(EventMsg::TokenCount(TokenCountEvent { + info: Some(TokenUsageInfo { + total_token_usage: TokenUsage { + input_tokens: 200, + cached_input_tokens: 40, + output_tokens: 60, + reasoning_output_tokens: 20, + total_tokens: 260, + }, + last_token_usage: TokenUsage { + input_tokens: 80, + cached_input_tokens: 15, + output_tokens: 40, + reasoning_output_tokens: 10, + total_tokens: 120, + }, + model_context_window: Some(200_000), + }), + rate_limits: None, + })), + ) + .await?; + Ok(()) +} + +fn thread_contains_agent_text(thread: &Thread, needle: &str) -> bool { + thread.turns.iter().any(|turn| { + turn.items.iter().any( + |item| matches!(item, ThreadItem::AgentMessage { text, .. } if text.contains(needle)), + ) + }) +} + #[tokio::test] async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -459,6 +572,154 @@ async fn thread_fork_can_load_source_by_path() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_fork_non_interrupting_excludes_partial_output() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let preview = "Saved user message"; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + preview, + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "partial assistant output").await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let fork_result = fork_resp.result.clone(); + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert_eq!(fork_result.get("appliedSnapshotMode"), None); + assert_eq!(fork_result.get("snapshotFallbackReason"), None); + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, preview); + assert!( + !thread_contains_agent_text(&thread, "partial assistant output"), + "non-interrupting fork must exclude assistant output after the sampling boundary" + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_fork_non_interrupting_without_boundary_fails_closed() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; + append_partial_assistant_output(&source_path, "unmarked partial output").await?; + let source_before = std::fs::read_to_string(&source_path)?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(fork_id)), + ) + .await??; + + assert!( + fork_err + .error + .message + .contains("no stable sampling boundary"), + "unexpected fork error: {}", + fork_err.error.message + ); + assert_eq!(std::fs::read_to_string(&source_path)?, source_before); + + Ok(()) +} + +#[tokio::test] +async fn thread_fork_ephemeral_non_interrupting_uses_selected_history() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let preview = "Saved user message"; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + preview, + Some("mock_provider"), + /*git_info*/ None, + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "ephemeral partial output").await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + ephemeral: true, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert!(thread.ephemeral); + assert_eq!(thread.path, None); + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, preview); + assert!( + !thread_contains_agent_text(&thread, "ephemeral partial output"), + "ephemeral fork response must use selected history, not raw source history" + ); + + Ok(()) +} + #[tokio::test] async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -513,6 +774,61 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_fork_non_interrupting_replays_token_usage_from_selected_history() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + let source_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id); + append_active_user_turn_start(&source_path, "active-turn", "active user message").await?; + append_sampling_boundary(&source_path).await?; + append_partial_assistant_output(&source_path, "source-only partial output").await?; + append_token_count(&source_path).await?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + active_fork_mode: Some(ThreadForkActiveMode::NonInterrupting), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + let note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await??; + let parsed: ServerNotification = note.try_into()?; + let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { + panic!("expected thread/tokenUsage/updated notification"); + }; + + assert_eq!(thread.turns.len(), 2); + assert_eq!(notification.thread_id, thread.id); + assert_eq!(notification.turn_id, thread.turns[0].id); + assert_ne!(notification.turn_id, thread.turns[1].id); + assert_eq!(notification.token_usage.total.total_tokens, 150); + + Ok(()) +} + #[tokio::test] async fn thread_fork_can_exclude_turns_and_skip_restored_token_usage() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/agent/control/spawn.rs b/codex-rs/core/src/agent/control/spawn.rs index c4c40577ca4f..11dbb9bc1d11 100644 --- a/codex-rs/core/src/agent/control/spawn.rs +++ b/codex-rs/core/src/agent/control/spawn.rs @@ -63,7 +63,10 @@ fn keep_forked_rollout_item(item: &RolloutItem, preserve_reference_context_item: // from the parent's durable baseline. Truncated forks drop part of that prompt, // so they must rebuild context on their first child turn. RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) => preserve_reference_context_item, - RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => true, + RolloutItem::Compacted(_) + | RolloutItem::EventMsg(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => true, } } diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 9863bfb7e24c..413e6317a70a 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -172,6 +172,7 @@ async fn persisted_originator(thread: &CodexThread) -> String { | RolloutItem::EventMsg(_) | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::TurnContext(_) => None, }) .expect("session metadata should be persisted") diff --git a/codex-rs/core/src/session/rollout_reconstruction.rs b/codex-rs/core/src/session/rollout_reconstruction.rs index 0b89caaf0575..955adc38f83f 100644 --- a/codex-rs/core/src/session/rollout_reconstruction.rs +++ b/codex-rs/core/src/session/rollout_reconstruction.rs @@ -280,7 +280,8 @@ impl Session { } RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) - | RolloutItem::InterAgentCommunicationMetadata { .. } => {} + | RolloutItem::InterAgentCommunicationMetadata { .. } + | RolloutItem::SamplingBoundary(_) => {} } if base_replacement_history.is_some() @@ -337,7 +338,8 @@ impl Session { turn_context.model_info.truncation_policy.into(), ); } - RolloutItem::InterAgentCommunicationMetadata { .. } => {} + RolloutItem::InterAgentCommunicationMetadata { .. } + | RolloutItem::SamplingBoundary(_) => {} RolloutItem::Compacted(compacted) => { if let Some(replacement_history) = &compacted.replacement_history { // This should actually never happen, because the reverse loop above (to build rollout_suffix) @@ -415,6 +417,7 @@ impl Session { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => { unreachable!("only world-state replay items are collected") } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index ece9d446b57e..81ea464a3e4c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2758,6 +2758,7 @@ async fn start_new_context_window_assigns_and_persists_item_ids() { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }); assert_eq!( @@ -2817,6 +2818,7 @@ async fn record_initial_history_assigns_and_persists_id_for_forked_response_item | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }); assert_eq!(persisted_item_id, Some(live_item_id.as_str())); diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index f16b525002fc..9691e165388b 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -101,7 +101,9 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::PlanDeltaEvent; use codex_protocol::protocol::ReasoningContentDeltaEvent; use codex_protocol::protocol::ReasoningRawContentDeltaEvent; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SafetyBufferingEvent; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::TurnDiffEvent; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; @@ -275,6 +277,11 @@ pub(crate) async fn run_turn( } .instrument(trace_span!("run_turn.prepare_sampling_request_input")) .await; + sess.persist_rollout_items(&[RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_context.sub_id.clone(), + window_id: window_id.clone(), + })]) + .await; let responses_metadata = turn_context.turn_metadata_state.to_responses_metadata( sess.installation_id.clone(), diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 4ea86cb33d00..2f7f8ac20bdf 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -60,6 +60,7 @@ use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; +use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::state_db::StateDbHandle; @@ -125,15 +126,6 @@ pub struct NewThread { pub session_configured: SessionConfiguredEvent, } -// TODO(ccunningham): Add an explicit non-interrupting live-turn snapshot once -// core can represent sampling boundaries directly instead of relying on -// whichever items happened to be persisted mid-turn. -// -// Two likely future variants: -// - `TruncateToLastSamplingBoundary` for callers that want a coherent fork from -// the last stable model boundary without synthesizing an interrupt. -// - `WaitUntilNextSamplingBoundary` (or similar) for callers that prefer to -// fork after the next sampling boundary rather than interrupting immediately. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ForkSnapshot { /// Fork a committed prefix ending strictly before the nth user message. @@ -154,6 +146,18 @@ pub enum ForkSnapshot { /// already at a turn boundary, this returns the current persisted history /// unchanged. Interrupted, + + /// Fork without interrupting the source thread. + /// + /// Mid-turn sources use the latest persisted sampling boundary. If no + /// boundary exists, this fails closed instead of falling back to an + /// interrupted snapshot. + NonInterrupting, +} + +pub struct ForkedThread { + pub new_thread: NewThread, + pub selected_history: Vec, } /// Preserve legacy `fork_thread(usize, ...)` callsites by mapping them to the @@ -738,7 +742,7 @@ impl ThreadManager { &options.config, inherited_multi_agent_version, ), - ); + )?; self.start_thread_with_options_and_fork_source(options, Some(forked_from_thread_id)) .await } @@ -988,6 +992,31 @@ impl ThreadManager { parent_trace: Option, supports_openai_form_elicitation: bool, ) -> CodexResult + where + S: Into, + { + Ok(self + .fork_thread_from_history_with_selection( + snapshot, + config, + history, + thread_source, + parent_trace, + supports_openai_form_elicitation, + ) + .await? + .new_thread) + } + + pub async fn fork_thread_from_history_with_selection( + &self, + snapshot: S, + config: Config, + history: InitialHistory, + thread_source: Option, + parent_trace: Option, + supports_openai_form_elicitation: bool, + ) -> CodexResult where S: Into, { @@ -1010,7 +1039,7 @@ impl ThreadManager { thread_source: Option, parent_trace: Option, supports_openai_form_elicitation: bool, - ) -> CodexResult { + ) -> CodexResult { // `forked_from_id()` describes this history's existing lineage. When // forking a resumed thread, the child copies the resumed thread itself. let source_thread_id = match &history { @@ -1030,13 +1059,14 @@ impl ThreadManager { .await; let interrupted_marker = InterruptedTurnHistoryMarker::from_config_and_version(&config, multi_agent_version); - let history = fork_history_from_snapshot(snapshot, history, interrupted_marker); + let history = fork_history_from_snapshot(snapshot, history, interrupted_marker)?; + let selected_history = history.get_rollout_items().to_vec(); let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), &config.cwd, ); let agent_control = self.agent_control_for_config(&config); - Box::pin(self.state.spawn_thread( + let new_thread = Box::pin(self.state.spawn_thread( config, history, Arc::clone(&self.state.auth_manager), @@ -1052,7 +1082,11 @@ impl ThreadManager { supports_openai_form_elicitation, /*user_shell_override*/ None, )) - .await + .await?; + Ok(ForkedThread { + new_thread, + selected_history, + }) } pub(crate) fn agent_control(&self) -> AgentControl { @@ -1814,15 +1848,21 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState { }; // Synthetic fork/resume histories can contain user/assistant response items - // without explicit turn lifecycle events. If the persisted snapshot has no - // terminating boundary after its last user message, treat it as mid-turn. + // without explicit turn lifecycle events. Legacy rollouts may also persist + // the same user message as an EventMsg after the raw response item, which is + // still user-only history. Any other non-terminal suffix after the last raw + // user message belongs to an unfinished turn. + let suffix_after_last_user = &rollout_items[last_user_position + 1..]; SnapshotTurnState { - ends_mid_turn: !rollout_items[last_user_position + 1..].iter().any(|item| { - matches!( - item, - RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) - ) - }), + ends_mid_turn: suffix_after_last_user + .iter() + .any(|item| !matches!(item, RolloutItem::EventMsg(EventMsg::UserMessage(_)))) + && !suffix_after_last_user.iter().any(|item| { + matches!( + item, + RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) + ) + }), active_turn_id: None, active_turn_start_index: None, } @@ -1832,12 +1872,12 @@ fn fork_history_from_snapshot( snapshot: ForkSnapshot, history: InitialHistory, interrupted_marker: InterruptedTurnHistoryMarker, -) -> InitialHistory { +) -> CodexResult { let snapshot_state = snapshot_turn_state(&history); match snapshot { - ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => { - truncate_before_nth_user_message(history, nth_user_message, &snapshot_state) - } + ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => Ok( + truncate_before_nth_user_message(history, nth_user_message, &snapshot_state), + ), ForkSnapshot::Interrupted => { let history = match history { InitialHistory::New => InitialHistory::New, @@ -1848,16 +1888,66 @@ fn fork_history_from_snapshot( } }; if snapshot_state.ends_mid_turn { - append_interrupted_boundary( + Ok(append_interrupted_boundary( history, snapshot_state.active_turn_id, interrupted_marker, - ) + )) } else { - history + Ok(history) } } + ForkSnapshot::NonInterrupting => { + truncate_to_last_sampling_boundary(history, &snapshot_state) + } + } +} + +fn truncate_to_last_sampling_boundary( + history: InitialHistory, + snapshot_state: &SnapshotTurnState, +) -> CodexResult { + if !snapshot_state.ends_mid_turn { + return Ok(match history { + InitialHistory::New => InitialHistory::New, + InitialHistory::Cleared => InitialHistory::Cleared, + InitialHistory::Forked(history) => InitialHistory::Forked(history), + InitialHistory::Resumed(resumed) => { + InitialHistory::Forked(Arc::unwrap_or_clone(resumed.history)) + } + }); + } + + let items = history.get_rollout_items(); + let Some(active_turn_start_index) = snapshot_state.active_turn_start_index else { + return Err(CodexErr::InvalidRequest( + "cannot fork active thread without interrupting: no stable sampling boundary is available" + .to_string(), + )); + }; + let Some(last_boundary_position) = truncation::sampling_boundary_positions_in_rollout(items) + .into_iter() + .rev() + .find(|position| *position >= active_turn_start_index) + else { + return Err(CodexErr::InvalidRequest( + "cannot fork active thread without interrupting: no stable sampling boundary is available" + .to_string(), + )); + }; + let mut selected = items[..=last_boundary_position].to_vec(); + if let Some(turn_id) = snapshot_state.active_turn_id.clone() { + selected.push(RolloutItem::EventMsg(EventMsg::TurnComplete( + TurnCompleteEvent { + turn_id, + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }, + ))); } + Ok(InitialHistory::Forked(selected)) } /// Append the same persisted interrupt boundary used by the live interrupt path diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 4f0da0c9c7cf..84a5a7e68c42 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -20,10 +20,12 @@ use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; use codex_protocol::protocol::ResumedHistory; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadSource; +use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_utils_path_uri::PathUri; @@ -113,6 +115,13 @@ fn developer_interrupted_marker() -> ResponseItem { .expect("developer interrupted marker should be enabled") } +fn sampling_boundary(turn_id: &str, window_id: &str) -> RolloutItem { + RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_id.to_string(), + window_id: window_id.to_string(), + }) +} + #[test] fn effective_originator_prefers_thread_scoped_sources_before_env_originator() { for (metrics_service_name, persisted_originator, inherited_originator, expected_originator) in [ @@ -258,6 +267,23 @@ fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() { ); } +#[test] +fn user_only_legacy_snapshot_is_not_mid_turn() { + let snapshot_state = + snapshot_turn_state(&InitialHistory::Forked(vec![RolloutItem::ResponseItem( + user_msg("saved user message"), + )])); + + assert_eq!( + snapshot_state, + SnapshotTurnState { + ends_mid_turn: false, + active_turn_id: None, + active_turn_start_index: None, + }, + ); +} + #[test] fn fork_thread_accepts_legacy_usize_snapshot_argument() { fn assert_legacy_snapshot_callsite( @@ -1509,7 +1535,31 @@ fn completed_legacy_event_history_is_not_mid_turn() { } #[test] -fn mixed_response_and_legacy_user_event_history_is_mid_turn() { +fn mixed_response_and_legacy_user_event_history_is_not_mid_turn() { + let mixed_history = InitialHistory::Forked(vec![ + RolloutItem::ResponseItem(user_msg("hello")), + RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: "hello".to_string(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + ..Default::default() + })), + ]); + + assert_eq!( + snapshot_turn_state(&mixed_history), + SnapshotTurnState { + ends_mid_turn: false, + active_turn_id: None, + active_turn_start_index: None, + }, + ); +} + +#[test] +fn mixed_response_and_legacy_user_event_with_agent_output_is_mid_turn() { let mixed_history = InitialHistory::Forked(vec![ RolloutItem::ResponseItem(user_msg("hello")), RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { @@ -1520,6 +1570,11 @@ fn mixed_response_and_legacy_user_event_history_is_mid_turn() { local_images: Vec::new(), ..Default::default() })), + RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent { + message: "partial".to_string(), + phase: None, + memory_citation: None, + })), ]); assert_eq!( @@ -1878,3 +1933,263 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ 1, ); } + +#[tokio::test] +async fn non_interrupting_fork_snapshot_uses_stable_boundaries_and_fails_closed() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let state_db = init_state_db(&config).await; + let manager = ThreadManager::new( + &config, + auth_manager.clone(), + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + empty_extension_registry(), + Arc::new(crate::test_support::EmptyUserInstructionsProvider), + /*analytics_events_client*/ None, + thread_store_from_config(&config, state_db.clone()), + local_agent_graph_store_from_state_db(state_db.as_ref()), + TEST_INSTALLATION_ID.to_string(), + /*attestation_provider*/ None, + /*external_time_provider*/ None, + ); + + let marked_source_items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-marked".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("marked user")), + sampling_boundary("turn-marked", "window-marked"), + RolloutItem::ResponseItem(assistant_msg("partial after boundary")), + ]; + let marked_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(marked_source_items.clone()), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create marked source thread"); + let marked_source_path = marked_source + .thread + .rollout_path() + .expect("marked source rollout path should exist"); + + let marked_fork = manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + marked_source_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + .expect("fork marked source without interrupting"); + let marked_fork_path = marked_fork + .thread + .rollout_path() + .expect("marked fork rollout path should exist"); + let marked_fork_history = RolloutRecorder::get_rollout_history(&marked_fork_path) + .await + .expect("read marked fork rollout"); + let marked_fork_items: Vec<_> = marked_fork_history + .get_rollout_items() + .iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .cloned() + .collect(); + + assert_eq!( + serde_json::to_value(&marked_fork_items).expect("serialize marked fork"), + serde_json::to_value(vec![ + marked_source_items[0].clone(), + marked_source_items[1].clone(), + marked_source_items[2].clone(), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-marked".to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })), + ]) + .expect("serialize expected marked fork") + ); + assert!(!snapshot_turn_state(&InitialHistory::Forked(marked_fork_items.clone())).ends_mid_turn); + assert!(!marked_fork_items.iter().any(|item| { + matches!( + item, + RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { + reason: TurnAbortReason::Interrupted, + .. + })) + ) + })); + + let active_without_boundary_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-completed".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("completed user")), + sampling_boundary("turn-completed", "window-completed"), + RolloutItem::ResponseItem(assistant_msg("completed assistant")), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-completed".to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })), + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-active".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::ResponseItem(user_msg("active user")), + RolloutItem::ResponseItem(assistant_msg("active partial")), + ]), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create active source without boundary"); + let active_without_boundary_path = active_without_boundary_source + .thread + .rollout_path() + .expect("active source rollout path should exist"); + let err = match manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + active_without_boundary_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + { + Ok(_) => panic!("active source without active-turn boundary should fail closed"), + Err(err) => err, + }; + assert!( + matches!(err, CodexErr::InvalidRequest(message) if message.contains("no stable sampling boundary")) + ); + + let unmarked_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(vec![ + RolloutItem::ResponseItem(user_msg("unmarked user")), + RolloutItem::ResponseItem(assistant_msg("unmarked partial")), + ]), + auth_manager.clone(), + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create unmarked source thread"); + let unmarked_source_path = unmarked_source + .thread + .rollout_path() + .expect("unmarked source rollout path should exist"); + let unmarked_before = + std::fs::read_to_string(&unmarked_source_path).expect("read unmarked source before fork"); + let err = match manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config.clone(), + unmarked_source_path.clone(), + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + { + Ok(_) => panic!("unmarked active source should fail closed"), + Err(err) => err, + }; + assert!( + matches!(err, CodexErr::InvalidRequest(message) if message.contains("no stable sampling boundary")) + ); + let unmarked_after = + std::fs::read_to_string(&unmarked_source_path).expect("read unmarked source after fork"); + assert_eq!(unmarked_after, unmarked_before); + + let inactive_source_items = vec![ + RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + client_id: None, + message: "inactive user".to_string(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + ..Default::default() + })), + RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent { + message: "inactive done".to_string(), + phase: None, + memory_citation: None, + })), + ]; + let inactive_source = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Forked(inactive_source_items.clone()), + auth_manager, + /*parent_trace*/ None, + /*supports_openai_form_elicitation*/ false, + ) + .await + .expect("create inactive source thread"); + let inactive_source_path = inactive_source + .thread + .rollout_path() + .expect("inactive source rollout path should exist"); + let inactive_fork = manager + .fork_thread( + ForkSnapshot::NonInterrupting, + config, + inactive_source_path, + /*thread_source*/ None, + /*parent_trace*/ None, + ) + .await + .expect("fork inactive source without boundary"); + let inactive_fork_path = inactive_fork + .thread + .rollout_path() + .expect("inactive fork rollout path should exist"); + let inactive_fork_history = RolloutRecorder::get_rollout_history(&inactive_fork_path) + .await + .expect("read inactive fork rollout"); + let inactive_fork_items: Vec<_> = inactive_fork_history + .get_rollout_items() + .iter() + .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) + .cloned() + .collect(); + + assert_eq!( + serde_json::to_value(&inactive_fork_items).expect("serialize inactive fork"), + serde_json::to_value(&inactive_source_items).expect("serialize expected inactive fork") + ); +} diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index d7d6ad998cee..36d149e9e082 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -76,14 +76,9 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec { - let has_delivery_metadata = matches!(item, ResponseItem::AgentMessage { .. }) - && idx.checked_sub(1).is_some_and(|previous_idx| { - matches!( - items.get(previous_idx), - Some(RolloutItem::InterAgentCommunicationMetadata { .. }) - ) - }); - if is_user_turn_boundary(item) && !has_delivery_metadata { + if is_user_turn_boundary(item) + && !response_item_has_delivery_metadata(items, idx, item) + { rollback_turn_positions.push(idx); } if is_real_user_message_boundary(item) || is_trigger_turn_boundary(item) { @@ -125,6 +120,66 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec Vec { + let mut rollback_turn_positions = Vec::new(); + let mut boundary_positions = Vec::new(); + for (idx, item) in items.iter().enumerate() { + match item { + RolloutItem::ResponseItem(item) + if is_user_turn_boundary(item) + && !response_item_has_delivery_metadata(items, idx, item) => + { + rollback_turn_positions.push(idx); + } + RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } => { + rollback_turn_positions.push(idx); + } + RolloutItem::SamplingBoundary(_) => { + boundary_positions.push(idx); + } + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { + let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX); + if num_turns == 0 { + continue; + } + let Some(rollback_start_idx) = rollback_turn_positions + .len() + .checked_sub(num_turns) + .map(|rollback_start| rollback_turn_positions[rollback_start]) + .or_else(|| rollback_turn_positions.first().copied()) + else { + continue; + }; + let new_rollback_len = rollback_turn_positions.len().saturating_sub(num_turns); + rollback_turn_positions.truncate(new_rollback_len); + boundary_positions.retain(|position| *position < rollback_start_idx); + } + _ => {} + } + } + boundary_positions +} + +fn response_item_has_delivery_metadata( + items: &[RolloutItem], + idx: usize, + item: &ResponseItem, +) -> bool { + matches!(item, ResponseItem::AgentMessage { .. }) + && idx.checked_sub(1).is_some_and(|previous_idx| { + matches!( + items.get(previous_idx), + Some(RolloutItem::InterAgentCommunicationMetadata { .. }) + ) + }) +} + /// Return a prefix of `items` obtained by cutting strictly before the nth user message. /// /// The boundary index is 0-based from the start of `items` (so `n_from_start = 0` returns diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index 16d10a827728..c24c0d4c3a9f 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -5,6 +5,7 @@ use codex_protocol::AgentPath; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::SamplingBoundaryEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; @@ -89,6 +90,13 @@ fn turn_completed(turn_id: &str) -> RolloutItem { })) } +fn sampling_boundary(turn_id: &str, window_id: &str) -> RolloutItem { + RolloutItem::SamplingBoundary(SamplingBoundaryEvent { + turn_id: turn_id.to_string(), + window_id: window_id.to_string(), + }) +} + #[test] fn truncates_rollout_after_terminal_canonical_turn_id() { let rollout = vec![ @@ -443,6 +451,64 @@ fn fork_turn_positions_ignore_zero_turn_rollback_markers() { assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 1, 3]); } +#[test] +fn sampling_boundary_positions_apply_thread_rollback_markers() { + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + RolloutItem::ResponseItem(assistant_msg("a1")), + turn_started("turn-2"), + RolloutItem::ResponseItem(user_msg("u2")), + sampling_boundary("turn-2", "window-2"), + RolloutItem::ResponseItem(assistant_msg("partial")), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 1, + })), + RolloutItem::ResponseItem(user_msg("u3")), + sampling_boundary("turn-3", "window-3"), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![8]); +} + +#[test] +fn sampling_boundary_positions_do_not_double_count_inter_agent_metadata_delivery_pair() { + let triggered = InterAgentCommunication::new( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + Vec::new(), + "triggered task".to_string(), + /*trigger_turn*/ true, + ); + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + sampling_boundary("turn-1", "window-1"), + RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true }, + RolloutItem::ResponseItem(triggered.to_model_input_item()), + sampling_boundary("inter-agent-turn", "window-2"), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 2, + })), + RolloutItem::ResponseItem(user_msg("u3")), + sampling_boundary("turn-3", "window-3"), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![7]); +} + +#[test] +fn sampling_boundary_positions_ignore_zero_turn_rollback_markers() { + let rollout = vec![ + RolloutItem::ResponseItem(user_msg("u1")), + sampling_boundary("turn-1", "window-1"), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 0, + })), + RolloutItem::ResponseItem(assistant_msg("a1")), + ]; + + assert_eq!(sampling_boundary_positions_in_rollout(&rollout), vec![1]); +} + #[test] fn truncates_rollout_to_last_n_fork_turns_discards_trigger_boundaries_in_rolled_back_suffix() { let rollout = vec![ diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 82412a58e60a..1a0d76f899d8 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1535,7 +1535,7 @@ mod tests { cwd: None, }, client_name: "stdio-test-client".to_string(), - initialize_timeout: Duration::from_secs(1), + initialize_timeout: Duration::from_secs(5), resume_session_id: None, }) .await diff --git a/codex-rs/memories/write/src/phase1.rs b/codex-rs/memories/write/src/phase1.rs index 712e11d0ef87..087cf8c416f1 100644 --- a/codex-rs/memories/write/src/phase1.rs +++ b/codex-rs/memories/write/src/phase1.rs @@ -416,6 +416,7 @@ mod job { | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) .collect::>(); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 96a793bcd38f..ddf409c2b084 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2140,6 +2140,12 @@ pub struct TokenCountEvent { pub rate_limits: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema, TS)] +pub struct SamplingBoundaryEvent { + pub turn_id: String, + pub window_id: String, +} + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema, TS)] pub struct RateLimitSnapshot { pub limit_id: Option, @@ -2666,6 +2672,7 @@ impl InitialHistory { | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) .and_then(|turn_context| turn_context.multi_agent_mode) @@ -3002,6 +3009,7 @@ fn multi_agent_version_from_items( | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) }) @@ -3164,6 +3172,7 @@ pub enum RolloutItem { Compacted(CompactedItem), TurnContext(TurnContextItem), WorldState(WorldStateItem), + SamplingBoundary(SamplingBoundaryEvent), EventMsg(EventMsg), } diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index 1154d36eeecc..981d400b5a92 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1173,6 +1173,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result {} + RolloutItem::SamplingBoundary(_) => {} RolloutItem::TurnContext(_) => { // Not included in `head`; skip. } @@ -1239,6 +1240,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result None, }) && let Some(builder) = builder_from_session_meta(session_meta, rollout_path) { @@ -130,6 +131,7 @@ pub async fn extract_metadata_from_rollout( | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }), parse_errors, diff --git a/codex-rs/rollout/src/persistence_metrics.rs b/codex-rs/rollout/src/persistence_metrics.rs index 8c2a7e18d238..a1dc6931957d 100644 --- a/codex-rs/rollout/src/persistence_metrics.rs +++ b/codex-rs/rollout/src/persistence_metrics.rs @@ -233,6 +233,7 @@ fn rollout_item_type(item: &RolloutItem) -> String { RolloutItem::Compacted(_) => "compacted".to_string(), RolloutItem::TurnContext(_) => "turn_context".to_string(), RolloutItem::WorldState(_) => "world_state".to_string(), + RolloutItem::SamplingBoundary(_) => "sampling_boundary".to_string(), RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => { format!("event.item_completed.{}", turn_item_type(&event.item)) } diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 2390a0fc61a1..c365b5a35d1c 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -13,7 +13,8 @@ pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool { RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) - | RolloutItem::SessionMeta(_) => true, + | RolloutItem::SessionMeta(_) + | RolloutItem::SamplingBoundary(_) => true, } } diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 87e97997a245..65c18dd0e444 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -1930,6 +1930,7 @@ async fn resume_candidate_matches_cwd( | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) { diff --git a/codex-rs/rollout/src/search.rs b/codex-rs/rollout/src/search.rs index 58b46887a98f..1ef66709a522 100644 --- a/codex-rs/rollout/src/search.rs +++ b/codex-rs/rollout/src/search.rs @@ -286,7 +286,8 @@ fn conversation_text_from_item(item: &RolloutItem) -> Option { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => None, + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => None, } } diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 48e308f627e1..4922192ef2d4 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -26,6 +26,7 @@ pub fn apply_rollout_item( | RolloutItem::InterAgentCommunicationMetadata { .. } => {} RolloutItem::Compacted(_) => {} RolloutItem::WorldState(_) => {} + RolloutItem::SamplingBoundary(_) => {} } if metadata.model_provider.is_empty() { metadata.model_provider = default_provider.to_string(); @@ -44,7 +45,8 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => false, + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => false, } } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index c80b2b671679..2a50e40de085 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1245,6 +1245,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option { | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) | RolloutItem::EventMsg(_) => None, }) } diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 1057715a4a58..eeb80c18a60f 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -287,7 +287,8 @@ impl ThreadMetadataSync { | RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) - | RolloutItem::WorldState(_) => {} + | RolloutItem::WorldState(_) + | RolloutItem::SamplingBoundary(_) => {} } } Some(update) diff --git a/patches/BUILD.bazel b/patches/BUILD.bazel index ed1413717007..d388e278fed5 100644 --- a/patches/BUILD.bazel +++ b/patches/BUILD.bazel @@ -4,6 +4,7 @@ exports_files([ "aws-lc-sys_windows_msvc_prebuilt_nasm.patch", "aws-lc-sys_windows_msvc_memcmp_probe.patch", "bzip2_windows_stack_args.patch", + "cxx-build_bazel_out_dir_shared.patch", "llvm_rusty_v8_custom_libcxx.patch", "llvm_windows_arm64_powl.patch", "llvm_windows_mingw_compat.patch", @@ -23,6 +24,7 @@ exports_files([ "v8_bazel_rules.patch", "v8_module_deps.patch", "v8_source_portability.patch", + "webrtc-sys-build_bazel_out_dir_scratch.patch", "webrtc-sys_hermetic_darwin_sysroot.patch", "windows-link.patch", "xz_windows_stack_args.patch", diff --git a/patches/cxx-build_bazel_out_dir_shared.patch b/patches/cxx-build_bazel_out_dir_shared.patch new file mode 100644 index 000000000000..acbe5c657a03 --- /dev/null +++ b/patches/cxx-build_bazel_out_dir_shared.patch @@ -0,0 +1,19 @@ +diff --git a/src/lib.rs b/src/lib.rs +index 3928062..7dfb803 100644 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -173,7 +173,13 @@ impl Project { + + let shared_dir = match target::find_target_dir(&out_dir) { + TargetDir::Path(target_dir) => target_dir.join("cxxbridge"), +- TargetDir::Unknown => scratch::path("cxxbridge"), ++ TargetDir::Unknown => { ++ if env::var_os("CXX_BUILD_BAZEL_OUT_DIR_SHARED").is_some() { ++ out_dir.join("cxxbridge-shared") ++ } else { ++ scratch::path("cxxbridge") ++ } ++ } + }; + + Ok(Project { diff --git a/patches/webrtc-sys-build_bazel_out_dir_scratch.patch b/patches/webrtc-sys-build_bazel_out_dir_scratch.patch new file mode 100644 index 000000000000..e83b5a47bf0a --- /dev/null +++ b/patches/webrtc-sys-build_bazel_out_dir_scratch.patch @@ -0,0 +1,41 @@ +diff --git a/src/lib.rs b/src/lib.rs +index 2acdd60..4b083e1 100644 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -87,7 +87,7 @@ pub fn use_debug() -> bool { + /// across multiple crates without dependencies constraints + /// This also has the benefit of not re-downloading the binaries for each crate + pub fn prebuilt_dir() -> path::PathBuf { +- let target_dir = scratch::path(SCRATH_PATH); ++ let target_dir = scratch_dir(); + path::Path::new(&target_dir).join(format!( + "livekit/{}-{}/{}", + webrtc_triple(), +@@ -98,6 +98,18 @@ pub fn prebuilt_dir() -> path::PathBuf { + )) + } + ++fn scratch_dir() -> path::PathBuf { ++ if env::var_os("WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH").is_some() { ++ return path::PathBuf::from( ++ env::var_os("OUT_DIR") ++ .expect("OUT_DIR must be set when WEBRTC_SYS_BAZEL_OUT_DIR_SCRATCH is set"), ++ ) ++ .join(SCRATH_PATH); ++ } ++ ++ scratch::path(SCRATH_PATH) ++} ++ + pub fn download_url() -> String { + format!( + "https://github.com/livekit/rust-sdks/releases/download/{}/{}.zip", +@@ -197,7 +208,7 @@ pub fn configure_jni_symbols() -> Result<()> { + } + + pub fn download_webrtc() -> Result<()> { +- let dir = scratch::path(SCRATH_PATH); ++ let dir = scratch_dir(); + // temporary fix to avoid github workflow issue + fs::create_dir_all(&dir).context("Failed to create scratch_path")?; + let flock = File::create(dir.join(".lock"))