diff --git a/crates/builtins/src/pluginfs/mod.rs b/crates/builtins/src/pluginfs/mod.rs index 979d4ccf..8c2987a7 100644 --- a/crates/builtins/src/pluginfs/mod.rs +++ b/crates/builtins/src/pluginfs/mod.rs @@ -1283,6 +1283,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: root, + events: None, } } diff --git a/crates/builtins/src/pluginhostbin/mod.rs b/crates/builtins/src/pluginhostbin/mod.rs index 04a5e5ed..df65debe 100644 --- a/crates/builtins/src/pluginhostbin/mod.rs +++ b/crates/builtins/src/pluginhostbin/mod.rs @@ -328,6 +328,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox_dir.to_path_buf(), + events: None, } } diff --git a/crates/builtins/src/plugintextfile/mod.rs b/crates/builtins/src/plugintextfile/mod.rs index 1addb26a..3ef2e648 100644 --- a/crates/builtins/src/plugintextfile/mod.rs +++ b/crates/builtins/src/plugintextfile/mod.rs @@ -202,6 +202,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox_dir.to_path_buf(), + events: None, } } diff --git a/crates/core/src/events.rs b/crates/core/src/events.rs index b0235f59..e6343999 100644 --- a/crates/core/src/events.rs +++ b/crates/core/src/events.rs @@ -52,6 +52,19 @@ pub enum BuildEventKind { addr: String, error: Option, }, + /// Start of building the target's sandbox: creating the sandbox dir and + /// materializing every declared input into it (unpack / hardlink-stage / + /// FUSE-slot register) before the subprocess spawns. Emitted by the driver + /// bridge, which owns the create step. Surfaced as one op in the per-target + /// timeline and marked slow if it runs long (large inputs, cold stage). + /// Paired with `SandboxCreateEnd` (fires on completion, `?`, or cancel). + SandboxCreateStart { + addr: String, + }, + SandboxCreateEnd { + addr: String, + error: Option, + }, LocalCacheHit { addr: String, }, @@ -131,3 +144,162 @@ pub fn now_unix_ms() -> u64 { .map(|d| d.as_millis() as u64) .unwrap_or(0) } + +/// Stamp + send one event on `tx`. A closed receiver (consumer gone, e.g. TUI +/// shut down) is expected — events are best-effort, so the send result is +/// dropped intentionally. +fn send_stamped(tx: &EventSender, kind: BuildEventKind) { + drop(tx.send(BuildEvent { + at_unix_ms: now_unix_ms(), + kind, + })); +} + +/// Drop-guard so the `*End` event fires on early-return (`?`) **and** on +/// cancellation (the awaited future is dropped mid-flight). Once armed, it emits +/// exactly one end event when dropped. +struct EndGuard { + tx: Option, + make_end: Option) -> BuildEventKind + Send>>, + error: Option, +} + +impl Drop for EndGuard { + fn drop(&mut self) { + if let (Some(tx), Some(make_end)) = (self.tx.take(), self.make_end.take()) { + send_stamped(&tx, make_end(self.error.take())); + } + } +} + +/// Emit `start`, run `fut`, then emit `make_end(error)` on completion, +/// early-return (`?`), or cancellation — given a raw `EventSender` rather than +/// the engine's `RequestState`. Used by layers below the engine (the driver +/// bridge) that hold an `Option` plumbed through the request. A +/// `None` sender makes this a transparent pass-through (no events, no overhead). +/// +/// The end event is produced by an internal drop-guard armed before the await, +/// so it fires even if `fut` is cancelled mid-await. `at_unix_ms` is stamped on +/// both the start and end events. +pub async fn emit_scope_tx( + events: Option, + start: BuildEventKind, + make_end: impl FnOnce(Option) -> BuildEventKind + Send + 'static, + fut: impl std::future::Future>, +) -> anyhow::Result { + if let Some(tx) = &events { + send_stamped(tx, start); + } + let mut guard = EndGuard { + tx: events, + make_end: Some(Box::new(make_end)), + error: None, + }; + let out = fut.await; // guard still armed if this is cancelled mid-await + if let Err(e) = &out { + guard.error = Some(format!("{e:#}")); + } + out // guard drops here → emits *End +} + +#[cfg(test)] +mod tests { + use super::*; + + fn kinds(rx: &mut EventReceiver) -> Vec { + let mut out = Vec::new(); + while let Ok(ev) = rx.try_recv() { + out.push(ev.kind); + } + out + } + + #[tokio::test] + async fn emit_scope_tx_emits_start_then_end_on_success() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let out: anyhow::Result = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + async { Ok(7) }, + ) + .await; + assert_eq!(out.unwrap(), 7); + match kinds(&mut rx).as_slice() { + [ + BuildEventKind::SandboxCreateStart { addr: a }, + BuildEventKind::SandboxCreateEnd { + addr: b, + error: None, + }, + ] => { + assert_eq!(a, "//a:b"); + assert_eq!(b, "//a:b"); + } + other => panic!("unexpected events: {other:?}"), + } + } + + #[tokio::test] + async fn emit_scope_tx_captures_error_in_end_event() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let out: anyhow::Result<()> = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + async { Err(anyhow::anyhow!("boom")) }, + ) + .await; + assert!(out.is_err()); + let end = kinds(&mut rx).pop().expect("end event"); + match end { + BuildEventKind::SandboxCreateEnd { + error: Some(msg), .. + } => assert!(msg.contains("boom"), "{msg}"), + other => panic!("expected end-with-error, got {other:?}"), + } + } + + #[tokio::test] + async fn emit_scope_tx_emits_end_on_cancellation() { + // Dropping the future mid-await must still fire the end event via the + // armed drop-guard — the slow-sandbox case where the build is cancelled. + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let fut = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + std::future::pending::>(), + ); + // Poll once to emit Start + arm the guard, then drop without completing. + let mut boxed = Box::pin(fut); + let _ = futures::poll!(boxed.as_mut()); + drop(boxed); + let observed = kinds(&mut rx); + assert!( + matches!( + observed.as_slice(), + [ + BuildEventKind::SandboxCreateStart { .. }, + BuildEventKind::SandboxCreateEnd { error: None, .. }, + ] + ), + "expected start+end on cancel, got {observed:?}" + ); + } +} diff --git a/crates/driver-bridge/src/bridge.rs b/crates/driver-bridge/src/bridge.rs index 9d2bb174..47ff4f85 100644 --- a/crates/driver-bridge/src/bridge.rs +++ b/crates/driver-bridge/src/bridge.rs @@ -288,6 +288,117 @@ mod shell_fallback_tests { } } + struct RunOkDriver; + + #[async_trait] + impl ManagedDriver for RunOkDriver { + fn config(&self, _: ConfigRequest) -> anyhow::Result { + Ok(ConfigResponse { + name: "runok".to_string(), + }) + } + fn schema(&self) -> hplugin::driver::DriverSchema { + hplugin::driver::DriverSchema::default() + } + async fn parse( + &self, + _: ParseRequest, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + unimplemented!() + } + async fn apply_transitive( + &self, + req: ApplyTransitiveRequest, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + Ok(ApplyTransitiveResponse { + target_def: req.target_def, + }) + } + async fn run<'a, 'io>( + &self, + _: ManagedRunRequest<'a, 'io>, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + Ok(ManagedRunResponse { artifacts: vec![] }) + } + } + + #[tokio::test] + async fn run_emits_sandbox_create_start_and_end() -> anyhow::Result<()> { + // With an EventSender attached, the OS bridge brackets the sandbox build + // with SandboxCreate{Start,End} so the TUI can render it as a step. + let mut config: std::collections::HashMap = + std::collections::HashMap::new(); + config.insert("run".to_string(), hcore::htvalue::Value::List(vec![])); + let fallback = Arc::new(ShellFallback { + driver: Arc::new(RunOkDriver), + spec_template: Arc::new(TargetSpec { + addr: Default::default(), + driver: "exec".to_string(), + config, + labels: vec![], + transitive: Default::default(), + }), + }); + let bridge = ManagedDriverBridge::new_os_for_test_with_shell_fallback( + Box::new(RunOkDriver), + fallback, + ); + + let ctoken = StdCancellationToken::new(); + let tmp = tempfile::tempdir()?; + let sandbox = tmp.path().join("sandbox"); + fs::create_dir_all(&sandbox)?; + + let target_def = TargetDef { + addr: Default::default(), + labels: vec![], + raw_def: Arc::new(()), + inputs: vec![], + outputs: vec![], + support_files: vec![], + cache: hplugin::driver::targetdef::CacheConfig::off(), + pty: false, + hash: vec![], + transparent: false, + }; + let request_id = "sandbox-create-test".to_string(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let req = RunRequest { + request_id: &request_id, + target: &target_def, + tree_root_path: tmp.path().to_path_buf(), + inputs: vec![], + hashin: "", + stdin: None, + stdout: None, + stderr: None, + sandbox_dir: sandbox.clone(), + events: Some(tx), + }; + + bridge.run(req, &ctoken).await?; + + let mut kinds = Vec::new(); + while let Ok(ev) = rx.try_recv() { + kinds.push(ev.kind); + } + let expected = target_def.addr.format(); + assert!( + matches!( + kinds.as_slice(), + [ + hcore::events::BuildEventKind::SandboxCreateStart { addr: a }, + hcore::events::BuildEventKind::SandboxCreateEnd { addr: b, error: None }, + ] if *a == expected && *b == expected + ), + "expected sandbox create start+end for {expected}, got {kinds:?}", + ); + Ok(()) + } + #[tokio::test] async fn run_shell_dispatches_to_fallback_when_driver_does_not_support() -> anyhow::Result<()> { let parse_called = Arc::new(AtomicBool::new(false)); @@ -341,6 +452,7 @@ mod shell_fallback_tests { stdout: None, stderr: None, sandbox_dir: sandbox.clone(), + events: None, }; bridge.run_shell(req, &ctoken).await?; diff --git a/crates/driver-bridge/src/driver_managed_fuse.rs b/crates/driver-bridge/src/driver_managed_fuse.rs index 0c0fb243..212fd121 100644 --- a/crates/driver-bridge/src/driver_managed_fuse.rs +++ b/crates/driver-bridge/src/driver_managed_fuse.rs @@ -54,77 +54,109 @@ impl ManagedDriverFuse { hcore::fsutil::remove_dir_all(&upper_cleanup_dir) })); - let ws_dir = sandbox_dir.join("ws"); - fs::create_dir_all(&ws_dir).with_context(|| format!("create ws dir {:?}", ws_dir))?; + // Build the sandbox (create dirs + register FUSE slots / unpack inputs) + // under a SandboxCreate scope so it renders as a per-target op in the TUI + // and is flagged slow when it runs long. The scope covers only the build + // — not the subprocess run that follows (that is the Execute op). + let events = req.events.clone(); + let addr = req.target.addr.format(); + let pkg = req.target.addr.package.as_str().to_owned(); + let inputs_taken = std::mem::take(&mut req.inputs); + let fs_shared = self.fs.clone(); + let fuse_lower = self.fuse_lower.clone(); + let sandbox_dir_scope = sandbox_dir.clone(); + let (inputs, ws_dir, sandbox_pkg_dir, mut slot_guards) = hcore::events::emit_scope_tx( + events, + hcore::events::BuildEventKind::SandboxCreateStart { addr: addr.clone() }, + move |error| hcore::events::BuildEventKind::SandboxCreateEnd { addr, error }, + async move { + let sandbox_dir = sandbox_dir_scope; + let ws_dir = sandbox_dir.join("ws"); + fs::create_dir_all(&ws_dir) + .with_context(|| format!("create ws dir {:?}", ws_dir))?; - let list_dir = sandbox_dir.join("list"); - fs::create_dir_all(&list_dir).with_context(|| format!("create list dir {:?}", list_dir))?; + let list_dir = sandbox_dir.join("list"); + fs::create_dir_all(&list_dir) + .with_context(|| format!("create list dir {:?}", list_dir))?; - let mut groups: BTreeMap> = BTreeMap::new(); - for input in std::mem::take(&mut req.inputs) { - let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); - groups.entry(unpack_root).or_default().push(input); - } + let mut groups: BTreeMap> = BTreeMap::new(); + for input in inputs_taken { + let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); + groups.entry(unpack_root).or_default().push(input); + } - let mut slot_guards: Vec = Vec::new(); - let mut inputs: Vec = Vec::new(); + let mut slot_guards: Vec = Vec::new(); + let mut inputs: Vec = Vec::new(); - for (unpack_root, group) in groups { - fs::create_dir_all(&unpack_root) - .with_context(|| format!("create unpack root {:?}", unpack_root))?; - match try_register_slot(&self.fs, &self.fuse_lower, &unpack_root, &list_dir, &group)? { - Some(guard) => { - slot_guards.push(guard); - // Slot registered; inputs served via layers — no per-input unpack. - for input in group { - let list_path = list_path_for(&input, &list_dir); - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); + for (unpack_root, group) in groups { + fs::create_dir_all(&unpack_root) + .with_context(|| format!("create unpack root {:?}", unpack_root))?; + match try_register_slot( + &fs_shared, + &fuse_lower, + &unpack_root, + &list_dir, + &group, + )? { + Some(guard) => { + slot_guards.push(guard); + // Slot registered; inputs served via layers — no per-input unpack. + for input in group { + let list_path = list_path_for(&input, &list_dir); + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); + } + } + None => { + // Fallback (e.g. input lacks seekable reader): unpack + // into the FUSE upper layer just like OS mode. + for input in group { + let list_path = list_path_for(&input, &list_dir); + let filters = input.filters.clone(); + let predicate: Option<&dyn Fn(&Path) -> bool> = if filters + .is_empty() + { + None + } else { + Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) + }; + hartifactcontent::unpack::unpack( + input.artifact.content.as_ref(), + unpack_root.as_path(), + list_path.as_deref(), + predicate, + ) + .with_context(|| { + format!( + "unpack input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); + } + } } } - None => { - // Fallback (e.g. input lacks seekable reader): unpack - // into the FUSE upper layer just like OS mode. - for input in group { - let list_path = list_path_for(&input, &list_dir); - let filters = input.filters.clone(); - let predicate: Option<&dyn Fn(&Path) -> bool> = if filters.is_empty() { - None - } else { - Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) - }; - hartifactcontent::unpack::unpack( - input.artifact.content.as_ref(), - unpack_root.as_path(), - list_path.as_deref(), - predicate, - ) - .with_context(|| { - format!( - "unpack input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); - } - } - } - } - let sandbox_pkg_dir = ws_dir.join(req.target.addr.package.as_str()); - fs::create_dir_all(&sandbox_pkg_dir) - .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + let sandbox_pkg_dir = ws_dir.join(&pkg); + fs::create_dir_all(&sandbox_pkg_dir) + .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + + write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; - write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; + Ok((inputs, ws_dir, sandbox_pkg_dir, slot_guards)) + }, + ) + .await?; let target = req.target; let hashin = req.hashin; diff --git a/crates/driver-support/src/driver_managed.rs b/crates/driver-support/src/driver_managed.rs index ce052951..d46f90c3 100644 --- a/crates/driver-support/src/driver_managed.rs +++ b/crates/driver-support/src/driver_managed.rs @@ -187,6 +187,7 @@ async fn run_shell_fallback<'a, 'io>( stdout, stderr, sandbox_dir: req_sandbox_dir, + events, } = request; let mut synthetic = (*shell_fallback.spec_template).clone(); @@ -221,6 +222,7 @@ async fn run_shell_fallback<'a, 'io>( stdout, stderr, sandbox_dir: req_sandbox_dir, + events, }; let new_mreq = ManagedRunRequest { request: new_req, diff --git a/crates/driver-support/src/driver_managed_os.rs b/crates/driver-support/src/driver_managed_os.rs index 4935e664..0eb76d8d 100644 --- a/crates/driver-support/src/driver_managed_os.rs +++ b/crates/driver-support/src/driver_managed_os.rs @@ -51,82 +51,111 @@ impl ManagedDriverOs { Some(Box::new(move || { hcore::fsutil::remove_dir_all(&cleanup_dir) })); - let ws_dir = sandbox_dir.join("ws"); - fs::create_dir_all(&ws_dir).with_context(|| format!("create ws dir {:?}", ws_dir))?; - let list_dir = sandbox_dir.join("list"); - fs::create_dir_all(&list_dir).with_context(|| format!("create list dir {:?}", list_dir))?; + // Build the sandbox (create dirs + materialize every input) under a + // SandboxCreate scope so it renders as a per-target op in the TUI and is + // flagged slow when it runs long. The scope covers only the build — not + // the subprocess run that follows (that is the Execute op). + let events = req.events.clone(); + let addr = req.target.addr.format(); + let pkg = req.target.addr.package.as_str().to_owned(); + let inputs_taken = std::mem::take(&mut req.inputs); + let stage_dir = self.stage_dir.clone(); + let sandbox_dir_scope = sandbox_dir.clone(); + let (inputs, ws_dir, sandbox_pkg_dir) = hcore::events::emit_scope_tx( + events, + hcore::events::BuildEventKind::SandboxCreateStart { addr: addr.clone() }, + move |error| hcore::events::BuildEventKind::SandboxCreateEnd { addr, error }, + async move { + let sandbox_dir = sandbox_dir_scope; + let ws_dir = sandbox_dir.join("ws"); + fs::create_dir_all(&ws_dir) + .with_context(|| format!("create ws dir {:?}", ws_dir))?; - let mut groups: BTreeMap> = BTreeMap::new(); - for input in std::mem::take(&mut req.inputs) { - let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); - groups.entry(unpack_root).or_default().push(input); - } + let list_dir = sandbox_dir.join("list"); + fs::create_dir_all(&list_dir) + .with_context(|| format!("create list dir {:?}", list_dir))?; - let mut inputs: Vec = Vec::new(); - for (unpack_root, group) in groups { - fs::create_dir_all(&unpack_root) - .with_context(|| format!("create unpack root {:?}", unpack_root))?; - for input in group { - let list_path = list_path_for(&input, &list_dir); - match self.stage_dir.as_deref() { - Some(stage_dir) if crate::stage::is_read_only(&input.annotations) => { - crate::stage::stage_and_link( - input.artifact.content.as_ref(), - stage_dir, - &input.source_addr.format(), - unpack_root.as_path(), - list_path.as_deref(), - &input.filters, - crate::stage::is_per_file(&input.annotations), - ctoken, - ) - .await - .with_context(|| { - format!( - "stage read-only input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; - } - _ => { - let filters = input.filters.clone(); - let predicate: Option<&dyn Fn(&Path) -> bool> = if filters.is_empty() { - None - } else { - Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) - }; - hartifactcontent::unpack::unpack( - input.artifact.content.as_ref(), - unpack_root.as_path(), - list_path.as_deref(), - predicate, - ) - .with_context(|| { - format!( - "unpack input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; + let mut groups: BTreeMap> = BTreeMap::new(); + for input in inputs_taken { + let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); + groups.entry(unpack_root).or_default().push(input); + } + + let mut inputs: Vec = Vec::new(); + for (unpack_root, group) in groups { + fs::create_dir_all(&unpack_root) + .with_context(|| format!("create unpack root {:?}", unpack_root))?; + for input in group { + let list_path = list_path_for(&input, &list_dir); + match stage_dir.as_deref() { + Some(stage_dir) + if crate::stage::is_read_only(&input.annotations) => + { + crate::stage::stage_and_link( + input.artifact.content.as_ref(), + stage_dir, + &input.source_addr.format(), + unpack_root.as_path(), + list_path.as_deref(), + &input.filters, + crate::stage::is_per_file(&input.annotations), + ctoken, + ) + .await + .with_context(|| { + format!( + "stage read-only input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + } + _ => { + let filters = input.filters.clone(); + let predicate: Option<&dyn Fn(&Path) -> bool> = + if filters.is_empty() { + None + } else { + Some(&|rel: &Path| { + filters.iter().any(|f| Path::new(f) == rel) + }) + }; + hartifactcontent::unpack::unpack( + input.artifact.content.as_ref(), + unpack_root.as_path(), + list_path.as_deref(), + predicate, + ) + .with_context(|| { + format!( + "unpack input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + } + } + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); } } - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); - } - } - let sandbox_pkg_dir = ws_dir.join(req.target.addr.package.as_str()); - fs::create_dir_all(&sandbox_pkg_dir) - .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + let sandbox_pkg_dir = ws_dir.join(&pkg); + fs::create_dir_all(&sandbox_pkg_dir) + .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + + write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; - write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; + Ok((inputs, ws_dir, sandbox_pkg_dir)) + }, + ) + .await?; let target = req.target; let hashin = req.hashin; diff --git a/crates/engine/src/engine/execute.rs b/crates/engine/src/engine/execute.rs index a0b39cce..a8635e59 100644 --- a/crates/engine/src/engine/execute.rs +++ b/crates/engine/src/engine/execute.rs @@ -121,6 +121,7 @@ impl Engine { stdout, stderr, sandbox_dir, + events: rs.events_sender(), }; let res = if shell { driver.driver.run_shell(req, rs.ctoken()).await? diff --git a/crates/plugin-exec/src/pluginexec/mod.rs b/crates/plugin-exec/src/pluginexec/mod.rs index 810dbd1b..2a26db82 100644 --- a/crates/plugin-exec/src/pluginexec/mod.rs +++ b/crates/plugin-exec/src/pluginexec/mod.rs @@ -1640,6 +1640,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let _res = driver.run(make_req(req), &ctoken).await?; @@ -1692,6 +1693,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; // Use a timeout to detect the hang @@ -1750,6 +1752,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let run_fut = driver.run(make_req(req), &ctoken); @@ -1815,6 +1818,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let run_fut = driver.run(make_req(req), &ctoken); @@ -1879,6 +1883,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let res = tokio::time::timeout( @@ -1939,6 +1944,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; Ok(String::from_utf8(stdout)?.trim().to_string()) @@ -2481,6 +2487,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2527,6 +2534,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2603,6 +2611,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2754,6 +2763,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox.clone(), + events: None, }; os.run_inner(req, &ctoken, false).await?; @@ -2850,6 +2860,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2921,6 +2932,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; @@ -3004,6 +3016,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -3103,6 +3116,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -3189,6 +3203,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; diff --git a/crates/plugin-sdk/src/serve.rs b/crates/plugin-sdk/src/serve.rs index fab07710..167d2ccf 100644 --- a/crates/plugin-sdk/src/serve.rs +++ b/crates/plugin-sdk/src/serve.rs @@ -870,6 +870,7 @@ async fn run_once( stdout: None, stderr: None, sandbox_dir: sandbox_dir.clone(), + events: None, }; let mrr = ManagedRunRequest { request: rr, diff --git a/crates/plugin/src/driver.rs b/crates/plugin/src/driver.rs index b6d6573d..e7754f65 100644 --- a/crates/plugin/src/driver.rs +++ b/crates/plugin/src/driver.rs @@ -870,6 +870,11 @@ pub struct RunRequest<'a, 'io> { pub stdout: Option<&'io mut (dyn tokio::io::AsyncWrite + Send + Sync + Unpin)>, pub stderr: Option<&'io mut (dyn tokio::io::AsyncWrite + Send + Sync + Unpin)>, pub sandbox_dir: std::path::PathBuf, + /// Build-progress event sink, plumbed from the engine's per-request state. + /// The driver bridge emits `SandboxCreate{Start,End}` around the sandbox + /// build step so it renders as a per-target op in the TUI. `None` when no + /// consumer is attached (e.g. tests), making the emit a no-op. + pub events: Option, } /// Cleanup closure a driver returns for the engine to run after `cache_locally`. /// The FUSE/OS sandbox layers each supply their own teardown; the engine's diff --git a/crates/tui/src/tui/progress.rs b/crates/tui/src/tui/progress.rs index c188fec3..2c286cf2 100644 --- a/crates/tui/src/tui/progress.rs +++ b/crates/tui/src/tui/progress.rs @@ -321,6 +321,7 @@ impl HeaderItem { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum Op { RemoteCacheRead, + SandboxCreate, Execute, LocalCacheWrite, RemoteCacheWrite, @@ -333,6 +334,7 @@ impl Op { fn icon(self) -> char { match self { Op::RemoteCacheRead => '↓', + Op::SandboxCreate => '⊞', Op::Execute => '▶', Op::LocalCacheWrite => '⊕', Op::RemoteCacheWrite => '↑', @@ -340,13 +342,14 @@ impl Op { } /// Pipeline ordinal for stable left-to-right ordering of the breakdown: - /// remote download → execute → local-cache write → remote upload. + /// remote download → sandbox build → execute → local-cache write → remote upload. fn order(self) -> u8 { match self { Op::RemoteCacheRead => 0, - Op::Execute => 1, - Op::LocalCacheWrite => 2, - Op::RemoteCacheWrite => 3, + Op::SandboxCreate => 1, + Op::Execute => 2, + Op::LocalCacheWrite => 3, + Op::RemoteCacheWrite => 4, } } } @@ -380,6 +383,12 @@ struct OpTimeline { /// into the shared per-target timeline; a new op needs one arm here. fn event_op_boundary(kind: &BuildEventKind) -> Option<(&str, Op, Boundary)> { match kind { + BuildEventKind::SandboxCreateStart { addr } => { + Some((addr, Op::SandboxCreate, Boundary::Start)) + } + BuildEventKind::SandboxCreateEnd { addr, .. } => { + Some((addr, Op::SandboxCreate, Boundary::End)) + } BuildEventKind::ExecuteStart { addr, .. } => Some((addr, Op::Execute, Boundary::Start)), BuildEventKind::ExecuteEnd { addr, .. } => Some((addr, Op::Execute, Boundary::End)), BuildEventKind::LocalCacheWriteStart { addr } => { @@ -561,7 +570,9 @@ impl BuildState { | BuildEventKind::RemoteCacheWriteStart { .. } | BuildEventKind::RemoteCacheWriteEnd { .. } | BuildEventKind::LocalCacheWriteStart { .. } - | BuildEventKind::LocalCacheWriteEnd { .. } => {} + | BuildEventKind::LocalCacheWriteEnd { .. } + | BuildEventKind::SandboxCreateStart { .. } + | BuildEventKind::SandboxCreateEnd { .. } => {} // GC progress is tracked by GcHeader, not the build counters. The // elapsed-clock anchor at the top of `apply` still runs, so the // clock works during a gc sweep. @@ -1656,6 +1667,58 @@ mod tests { ); } + #[test] + fn op_timeline_records_sandbox_create_before_execute() { + // SandboxCreate runs 0→4s (completed), then Execute opens at 4s and is + // still live at now=11s. The breakdown carries both, ordered with the + // sandbox build ahead of execute. + let mut s = BuildState::new(); + s.apply(&ev( + 0, + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + )); + s.apply(&ev( + 4_000, + BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error: None, + }, + )); + s.apply(&ev(4_000, execute_start("//a:b"))); + + let long = s.long_running(11_000, 5_000); + assert_eq!(long.len(), 1); + assert_eq!(long[0].0, "//a:b"); + assert_eq!(long[0].1, 7_000); // active Execute elapsed + assert_eq!( + long[0].2, + vec![(Op::SandboxCreate, 4_000), (Op::Execute, 7_000)] + ); + } + + #[test] + fn long_slow_sandbox_create_surfaces_on_its_own() { + // A sandbox build that stays open past the threshold shows as a slow row + // with the SandboxCreate icon, before any execute starts. + let mut s = BuildState::new(); + s.apply(&ev( + 0, + BuildEventKind::SandboxCreateStart { + addr: "//slow:sbx".into(), + }, + )); + let long = s.long_running(7_000, 5_000); + assert_eq!(long.len(), 1); + assert_eq!(long[0].0, "//slow:sbx"); + let line = format!("{}", s.slow_rows(7_000)[0]); + assert!( + line.contains(&format!("({} ", Op::SandboxCreate.icon())), + "{line}" + ); + } + #[test] fn op_timeline_omits_sub_second_ops() { // A 500ms Execute is below the 1s breakdown floor and is dropped; only the