From da729a0b85e1024ea725e6de1781a307ded6d9c5 Mon Sep 17 00:00:00 2001 From: Raphael Vigee Date: Sat, 20 Jun 2026 14:17:21 +0200 Subject: [PATCH] feat(tui): track sandbox creation as a per-target step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bracket the sandbox build (create dirs + materialize every input: unpack / hardlink-stage / FUSE-slot register) with SandboxCreate{Start,End} events so it renders as its own per-target op in the TUI and is flagged slow when it runs long — distinct from the Execute op that covers the subprocess run. The driver bridge owns the create step, so it emits the events. It sits below the engine, so plumb an Option through RunRequest (populated from RequestState) and add a sender-based emit_scope_tx to hcore::events (the engine's emit_scope needs RequestState). Both bridge paths (OS + FUSE) wrap their materialization in the scope; the guard fires End on success, `?`, or cancellation. TUI gets an Op::SandboxCreate variant ordered between remote-read and execute. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/builtins/src/pluginfs/mod.rs | 1 + crates/builtins/src/pluginhostbin/mod.rs | 1 + crates/builtins/src/plugintextfile/mod.rs | 1 + crates/core/src/events.rs | 172 ++++++++++++++++++ crates/driver-bridge/src/bridge.rs | 112 ++++++++++++ .../driver-bridge/src/driver_managed_fuse.rs | 158 +++++++++------- crates/driver-support/src/driver_managed.rs | 2 + .../driver-support/src/driver_managed_os.rs | 167 ++++++++++------- crates/engine/src/engine/execute.rs | 1 + crates/plugin-exec/src/pluginexec/mod.rs | 15 ++ crates/plugin-sdk/src/serve.rs | 1 + crates/plugin/src/driver.rs | 5 + crates/tui/src/tui/progress.rs | 73 +++++++- 13 files changed, 572 insertions(+), 137 deletions(-) diff --git a/crates/builtins/src/pluginfs/mod.rs b/crates/builtins/src/pluginfs/mod.rs index 979d4ccf..8c2987a7 100644 --- a/crates/builtins/src/pluginfs/mod.rs +++ b/crates/builtins/src/pluginfs/mod.rs @@ -1283,6 +1283,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: root, + events: None, } } diff --git a/crates/builtins/src/pluginhostbin/mod.rs b/crates/builtins/src/pluginhostbin/mod.rs index 04a5e5ed..df65debe 100644 --- a/crates/builtins/src/pluginhostbin/mod.rs +++ b/crates/builtins/src/pluginhostbin/mod.rs @@ -328,6 +328,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox_dir.to_path_buf(), + events: None, } } diff --git a/crates/builtins/src/plugintextfile/mod.rs b/crates/builtins/src/plugintextfile/mod.rs index 1addb26a..3ef2e648 100644 --- a/crates/builtins/src/plugintextfile/mod.rs +++ b/crates/builtins/src/plugintextfile/mod.rs @@ -202,6 +202,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox_dir.to_path_buf(), + events: None, } } diff --git a/crates/core/src/events.rs b/crates/core/src/events.rs index b0235f59..e6343999 100644 --- a/crates/core/src/events.rs +++ b/crates/core/src/events.rs @@ -52,6 +52,19 @@ pub enum BuildEventKind { addr: String, error: Option, }, + /// Start of building the target's sandbox: creating the sandbox dir and + /// materializing every declared input into it (unpack / hardlink-stage / + /// FUSE-slot register) before the subprocess spawns. Emitted by the driver + /// bridge, which owns the create step. Surfaced as one op in the per-target + /// timeline and marked slow if it runs long (large inputs, cold stage). + /// Paired with `SandboxCreateEnd` (fires on completion, `?`, or cancel). + SandboxCreateStart { + addr: String, + }, + SandboxCreateEnd { + addr: String, + error: Option, + }, LocalCacheHit { addr: String, }, @@ -131,3 +144,162 @@ pub fn now_unix_ms() -> u64 { .map(|d| d.as_millis() as u64) .unwrap_or(0) } + +/// Stamp + send one event on `tx`. A closed receiver (consumer gone, e.g. TUI +/// shut down) is expected — events are best-effort, so the send result is +/// dropped intentionally. +fn send_stamped(tx: &EventSender, kind: BuildEventKind) { + drop(tx.send(BuildEvent { + at_unix_ms: now_unix_ms(), + kind, + })); +} + +/// Drop-guard so the `*End` event fires on early-return (`?`) **and** on +/// cancellation (the awaited future is dropped mid-flight). Once armed, it emits +/// exactly one end event when dropped. +struct EndGuard { + tx: Option, + make_end: Option) -> BuildEventKind + Send>>, + error: Option, +} + +impl Drop for EndGuard { + fn drop(&mut self) { + if let (Some(tx), Some(make_end)) = (self.tx.take(), self.make_end.take()) { + send_stamped(&tx, make_end(self.error.take())); + } + } +} + +/// Emit `start`, run `fut`, then emit `make_end(error)` on completion, +/// early-return (`?`), or cancellation — given a raw `EventSender` rather than +/// the engine's `RequestState`. Used by layers below the engine (the driver +/// bridge) that hold an `Option` plumbed through the request. A +/// `None` sender makes this a transparent pass-through (no events, no overhead). +/// +/// The end event is produced by an internal drop-guard armed before the await, +/// so it fires even if `fut` is cancelled mid-await. `at_unix_ms` is stamped on +/// both the start and end events. +pub async fn emit_scope_tx( + events: Option, + start: BuildEventKind, + make_end: impl FnOnce(Option) -> BuildEventKind + Send + 'static, + fut: impl std::future::Future>, +) -> anyhow::Result { + if let Some(tx) = &events { + send_stamped(tx, start); + } + let mut guard = EndGuard { + tx: events, + make_end: Some(Box::new(make_end)), + error: None, + }; + let out = fut.await; // guard still armed if this is cancelled mid-await + if let Err(e) = &out { + guard.error = Some(format!("{e:#}")); + } + out // guard drops here → emits *End +} + +#[cfg(test)] +mod tests { + use super::*; + + fn kinds(rx: &mut EventReceiver) -> Vec { + let mut out = Vec::new(); + while let Ok(ev) = rx.try_recv() { + out.push(ev.kind); + } + out + } + + #[tokio::test] + async fn emit_scope_tx_emits_start_then_end_on_success() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let out: anyhow::Result = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + async { Ok(7) }, + ) + .await; + assert_eq!(out.unwrap(), 7); + match kinds(&mut rx).as_slice() { + [ + BuildEventKind::SandboxCreateStart { addr: a }, + BuildEventKind::SandboxCreateEnd { + addr: b, + error: None, + }, + ] => { + assert_eq!(a, "//a:b"); + assert_eq!(b, "//a:b"); + } + other => panic!("unexpected events: {other:?}"), + } + } + + #[tokio::test] + async fn emit_scope_tx_captures_error_in_end_event() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let out: anyhow::Result<()> = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + async { Err(anyhow::anyhow!("boom")) }, + ) + .await; + assert!(out.is_err()); + let end = kinds(&mut rx).pop().expect("end event"); + match end { + BuildEventKind::SandboxCreateEnd { + error: Some(msg), .. + } => assert!(msg.contains("boom"), "{msg}"), + other => panic!("expected end-with-error, got {other:?}"), + } + } + + #[tokio::test] + async fn emit_scope_tx_emits_end_on_cancellation() { + // Dropping the future mid-await must still fire the end event via the + // armed drop-guard — the slow-sandbox case where the build is cancelled. + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let fut = emit_scope_tx( + Some(tx), + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + |error| BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error, + }, + std::future::pending::>(), + ); + // Poll once to emit Start + arm the guard, then drop without completing. + let mut boxed = Box::pin(fut); + let _ = futures::poll!(boxed.as_mut()); + drop(boxed); + let observed = kinds(&mut rx); + assert!( + matches!( + observed.as_slice(), + [ + BuildEventKind::SandboxCreateStart { .. }, + BuildEventKind::SandboxCreateEnd { error: None, .. }, + ] + ), + "expected start+end on cancel, got {observed:?}" + ); + } +} diff --git a/crates/driver-bridge/src/bridge.rs b/crates/driver-bridge/src/bridge.rs index 9d2bb174..47ff4f85 100644 --- a/crates/driver-bridge/src/bridge.rs +++ b/crates/driver-bridge/src/bridge.rs @@ -288,6 +288,117 @@ mod shell_fallback_tests { } } + struct RunOkDriver; + + #[async_trait] + impl ManagedDriver for RunOkDriver { + fn config(&self, _: ConfigRequest) -> anyhow::Result { + Ok(ConfigResponse { + name: "runok".to_string(), + }) + } + fn schema(&self) -> hplugin::driver::DriverSchema { + hplugin::driver::DriverSchema::default() + } + async fn parse( + &self, + _: ParseRequest, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + unimplemented!() + } + async fn apply_transitive( + &self, + req: ApplyTransitiveRequest, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + Ok(ApplyTransitiveResponse { + target_def: req.target_def, + }) + } + async fn run<'a, 'io>( + &self, + _: ManagedRunRequest<'a, 'io>, + _: &(dyn Cancellable + Send + Sync), + ) -> anyhow::Result { + Ok(ManagedRunResponse { artifacts: vec![] }) + } + } + + #[tokio::test] + async fn run_emits_sandbox_create_start_and_end() -> anyhow::Result<()> { + // With an EventSender attached, the OS bridge brackets the sandbox build + // with SandboxCreate{Start,End} so the TUI can render it as a step. + let mut config: std::collections::HashMap = + std::collections::HashMap::new(); + config.insert("run".to_string(), hcore::htvalue::Value::List(vec![])); + let fallback = Arc::new(ShellFallback { + driver: Arc::new(RunOkDriver), + spec_template: Arc::new(TargetSpec { + addr: Default::default(), + driver: "exec".to_string(), + config, + labels: vec![], + transitive: Default::default(), + }), + }); + let bridge = ManagedDriverBridge::new_os_for_test_with_shell_fallback( + Box::new(RunOkDriver), + fallback, + ); + + let ctoken = StdCancellationToken::new(); + let tmp = tempfile::tempdir()?; + let sandbox = tmp.path().join("sandbox"); + fs::create_dir_all(&sandbox)?; + + let target_def = TargetDef { + addr: Default::default(), + labels: vec![], + raw_def: Arc::new(()), + inputs: vec![], + outputs: vec![], + support_files: vec![], + cache: hplugin::driver::targetdef::CacheConfig::off(), + pty: false, + hash: vec![], + transparent: false, + }; + let request_id = "sandbox-create-test".to_string(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let req = RunRequest { + request_id: &request_id, + target: &target_def, + tree_root_path: tmp.path().to_path_buf(), + inputs: vec![], + hashin: "", + stdin: None, + stdout: None, + stderr: None, + sandbox_dir: sandbox.clone(), + events: Some(tx), + }; + + bridge.run(req, &ctoken).await?; + + let mut kinds = Vec::new(); + while let Ok(ev) = rx.try_recv() { + kinds.push(ev.kind); + } + let expected = target_def.addr.format(); + assert!( + matches!( + kinds.as_slice(), + [ + hcore::events::BuildEventKind::SandboxCreateStart { addr: a }, + hcore::events::BuildEventKind::SandboxCreateEnd { addr: b, error: None }, + ] if *a == expected && *b == expected + ), + "expected sandbox create start+end for {expected}, got {kinds:?}", + ); + Ok(()) + } + #[tokio::test] async fn run_shell_dispatches_to_fallback_when_driver_does_not_support() -> anyhow::Result<()> { let parse_called = Arc::new(AtomicBool::new(false)); @@ -341,6 +452,7 @@ mod shell_fallback_tests { stdout: None, stderr: None, sandbox_dir: sandbox.clone(), + events: None, }; bridge.run_shell(req, &ctoken).await?; diff --git a/crates/driver-bridge/src/driver_managed_fuse.rs b/crates/driver-bridge/src/driver_managed_fuse.rs index 0c0fb243..212fd121 100644 --- a/crates/driver-bridge/src/driver_managed_fuse.rs +++ b/crates/driver-bridge/src/driver_managed_fuse.rs @@ -54,77 +54,109 @@ impl ManagedDriverFuse { hcore::fsutil::remove_dir_all(&upper_cleanup_dir) })); - let ws_dir = sandbox_dir.join("ws"); - fs::create_dir_all(&ws_dir).with_context(|| format!("create ws dir {:?}", ws_dir))?; + // Build the sandbox (create dirs + register FUSE slots / unpack inputs) + // under a SandboxCreate scope so it renders as a per-target op in the TUI + // and is flagged slow when it runs long. The scope covers only the build + // — not the subprocess run that follows (that is the Execute op). + let events = req.events.clone(); + let addr = req.target.addr.format(); + let pkg = req.target.addr.package.as_str().to_owned(); + let inputs_taken = std::mem::take(&mut req.inputs); + let fs_shared = self.fs.clone(); + let fuse_lower = self.fuse_lower.clone(); + let sandbox_dir_scope = sandbox_dir.clone(); + let (inputs, ws_dir, sandbox_pkg_dir, mut slot_guards) = hcore::events::emit_scope_tx( + events, + hcore::events::BuildEventKind::SandboxCreateStart { addr: addr.clone() }, + move |error| hcore::events::BuildEventKind::SandboxCreateEnd { addr, error }, + async move { + let sandbox_dir = sandbox_dir_scope; + let ws_dir = sandbox_dir.join("ws"); + fs::create_dir_all(&ws_dir) + .with_context(|| format!("create ws dir {:?}", ws_dir))?; - let list_dir = sandbox_dir.join("list"); - fs::create_dir_all(&list_dir).with_context(|| format!("create list dir {:?}", list_dir))?; + let list_dir = sandbox_dir.join("list"); + fs::create_dir_all(&list_dir) + .with_context(|| format!("create list dir {:?}", list_dir))?; - let mut groups: BTreeMap> = BTreeMap::new(); - for input in std::mem::take(&mut req.inputs) { - let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); - groups.entry(unpack_root).or_default().push(input); - } + let mut groups: BTreeMap> = BTreeMap::new(); + for input in inputs_taken { + let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); + groups.entry(unpack_root).or_default().push(input); + } - let mut slot_guards: Vec = Vec::new(); - let mut inputs: Vec = Vec::new(); + let mut slot_guards: Vec = Vec::new(); + let mut inputs: Vec = Vec::new(); - for (unpack_root, group) in groups { - fs::create_dir_all(&unpack_root) - .with_context(|| format!("create unpack root {:?}", unpack_root))?; - match try_register_slot(&self.fs, &self.fuse_lower, &unpack_root, &list_dir, &group)? { - Some(guard) => { - slot_guards.push(guard); - // Slot registered; inputs served via layers — no per-input unpack. - for input in group { - let list_path = list_path_for(&input, &list_dir); - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); + for (unpack_root, group) in groups { + fs::create_dir_all(&unpack_root) + .with_context(|| format!("create unpack root {:?}", unpack_root))?; + match try_register_slot( + &fs_shared, + &fuse_lower, + &unpack_root, + &list_dir, + &group, + )? { + Some(guard) => { + slot_guards.push(guard); + // Slot registered; inputs served via layers — no per-input unpack. + for input in group { + let list_path = list_path_for(&input, &list_dir); + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); + } + } + None => { + // Fallback (e.g. input lacks seekable reader): unpack + // into the FUSE upper layer just like OS mode. + for input in group { + let list_path = list_path_for(&input, &list_dir); + let filters = input.filters.clone(); + let predicate: Option<&dyn Fn(&Path) -> bool> = if filters + .is_empty() + { + None + } else { + Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) + }; + hartifactcontent::unpack::unpack( + input.artifact.content.as_ref(), + unpack_root.as_path(), + list_path.as_deref(), + predicate, + ) + .with_context(|| { + format!( + "unpack input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); + } + } } } - None => { - // Fallback (e.g. input lacks seekable reader): unpack - // into the FUSE upper layer just like OS mode. - for input in group { - let list_path = list_path_for(&input, &list_dir); - let filters = input.filters.clone(); - let predicate: Option<&dyn Fn(&Path) -> bool> = if filters.is_empty() { - None - } else { - Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) - }; - hartifactcontent::unpack::unpack( - input.artifact.content.as_ref(), - unpack_root.as_path(), - list_path.as_deref(), - predicate, - ) - .with_context(|| { - format!( - "unpack input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); - } - } - } - } - let sandbox_pkg_dir = ws_dir.join(req.target.addr.package.as_str()); - fs::create_dir_all(&sandbox_pkg_dir) - .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + let sandbox_pkg_dir = ws_dir.join(&pkg); + fs::create_dir_all(&sandbox_pkg_dir) + .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + + write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; - write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; + Ok((inputs, ws_dir, sandbox_pkg_dir, slot_guards)) + }, + ) + .await?; let target = req.target; let hashin = req.hashin; diff --git a/crates/driver-support/src/driver_managed.rs b/crates/driver-support/src/driver_managed.rs index ce052951..d46f90c3 100644 --- a/crates/driver-support/src/driver_managed.rs +++ b/crates/driver-support/src/driver_managed.rs @@ -187,6 +187,7 @@ async fn run_shell_fallback<'a, 'io>( stdout, stderr, sandbox_dir: req_sandbox_dir, + events, } = request; let mut synthetic = (*shell_fallback.spec_template).clone(); @@ -221,6 +222,7 @@ async fn run_shell_fallback<'a, 'io>( stdout, stderr, sandbox_dir: req_sandbox_dir, + events, }; let new_mreq = ManagedRunRequest { request: new_req, diff --git a/crates/driver-support/src/driver_managed_os.rs b/crates/driver-support/src/driver_managed_os.rs index 4935e664..0eb76d8d 100644 --- a/crates/driver-support/src/driver_managed_os.rs +++ b/crates/driver-support/src/driver_managed_os.rs @@ -51,82 +51,111 @@ impl ManagedDriverOs { Some(Box::new(move || { hcore::fsutil::remove_dir_all(&cleanup_dir) })); - let ws_dir = sandbox_dir.join("ws"); - fs::create_dir_all(&ws_dir).with_context(|| format!("create ws dir {:?}", ws_dir))?; - let list_dir = sandbox_dir.join("list"); - fs::create_dir_all(&list_dir).with_context(|| format!("create list dir {:?}", list_dir))?; + // Build the sandbox (create dirs + materialize every input) under a + // SandboxCreate scope so it renders as a per-target op in the TUI and is + // flagged slow when it runs long. The scope covers only the build — not + // the subprocess run that follows (that is the Execute op). + let events = req.events.clone(); + let addr = req.target.addr.format(); + let pkg = req.target.addr.package.as_str().to_owned(); + let inputs_taken = std::mem::take(&mut req.inputs); + let stage_dir = self.stage_dir.clone(); + let sandbox_dir_scope = sandbox_dir.clone(); + let (inputs, ws_dir, sandbox_pkg_dir) = hcore::events::emit_scope_tx( + events, + hcore::events::BuildEventKind::SandboxCreateStart { addr: addr.clone() }, + move |error| hcore::events::BuildEventKind::SandboxCreateEnd { addr, error }, + async move { + let sandbox_dir = sandbox_dir_scope; + let ws_dir = sandbox_dir.join("ws"); + fs::create_dir_all(&ws_dir) + .with_context(|| format!("create ws dir {:?}", ws_dir))?; - let mut groups: BTreeMap> = BTreeMap::new(); - for input in std::mem::take(&mut req.inputs) { - let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); - groups.entry(unpack_root).or_default().push(input); - } + let list_dir = sandbox_dir.join("list"); + fs::create_dir_all(&list_dir) + .with_context(|| format!("create list dir {:?}", list_dir))?; - let mut inputs: Vec = Vec::new(); - for (unpack_root, group) in groups { - fs::create_dir_all(&unpack_root) - .with_context(|| format!("create unpack root {:?}", unpack_root))?; - for input in group { - let list_path = list_path_for(&input, &list_dir); - match self.stage_dir.as_deref() { - Some(stage_dir) if crate::stage::is_read_only(&input.annotations) => { - crate::stage::stage_and_link( - input.artifact.content.as_ref(), - stage_dir, - &input.source_addr.format(), - unpack_root.as_path(), - list_path.as_deref(), - &input.filters, - crate::stage::is_per_file(&input.annotations), - ctoken, - ) - .await - .with_context(|| { - format!( - "stage read-only input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; - } - _ => { - let filters = input.filters.clone(); - let predicate: Option<&dyn Fn(&Path) -> bool> = if filters.is_empty() { - None - } else { - Some(&|rel: &Path| filters.iter().any(|f| Path::new(f) == rel)) - }; - hartifactcontent::unpack::unpack( - input.artifact.content.as_ref(), - unpack_root.as_path(), - list_path.as_deref(), - predicate, - ) - .with_context(|| { - format!( - "unpack input origin_id={} source_addr={} into {:?}", - input.origin_id, - input.source_addr.format(), - unpack_root, - ) - })?; + let mut groups: BTreeMap> = BTreeMap::new(); + for input in inputs_taken { + let unpack_root = resolve_unpack_root(&input, &sandbox_dir, &ws_dir); + groups.entry(unpack_root).or_default().push(input); + } + + let mut inputs: Vec = Vec::new(); + for (unpack_root, group) in groups { + fs::create_dir_all(&unpack_root) + .with_context(|| format!("create unpack root {:?}", unpack_root))?; + for input in group { + let list_path = list_path_for(&input, &list_dir); + match stage_dir.as_deref() { + Some(stage_dir) + if crate::stage::is_read_only(&input.annotations) => + { + crate::stage::stage_and_link( + input.artifact.content.as_ref(), + stage_dir, + &input.source_addr.format(), + unpack_root.as_path(), + list_path.as_deref(), + &input.filters, + crate::stage::is_per_file(&input.annotations), + ctoken, + ) + .await + .with_context(|| { + format!( + "stage read-only input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + } + _ => { + let filters = input.filters.clone(); + let predicate: Option<&dyn Fn(&Path) -> bool> = + if filters.is_empty() { + None + } else { + Some(&|rel: &Path| { + filters.iter().any(|f| Path::new(f) == rel) + }) + }; + hartifactcontent::unpack::unpack( + input.artifact.content.as_ref(), + unpack_root.as_path(), + list_path.as_deref(), + predicate, + ) + .with_context(|| { + format!( + "unpack input origin_id={} source_addr={} into {:?}", + input.origin_id, + input.source_addr.format(), + unpack_root, + ) + })?; + } + } + inputs.push(ManagedRunInput { + input, + list_path, + unpack_root: unpack_root.clone(), + }); } } - inputs.push(ManagedRunInput { - input, - list_path, - unpack_root: unpack_root.clone(), - }); - } - } - let sandbox_pkg_dir = ws_dir.join(req.target.addr.package.as_str()); - fs::create_dir_all(&sandbox_pkg_dir) - .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + let sandbox_pkg_dir = ws_dir.join(&pkg); + fs::create_dir_all(&sandbox_pkg_dir) + .with_context(|| format!("create pkg dir: {:?}", sandbox_pkg_dir))?; + + write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; - write_source_map(&inputs, &ws_dir, &sandbox_pkg_dir)?; + Ok((inputs, ws_dir, sandbox_pkg_dir)) + }, + ) + .await?; let target = req.target; let hashin = req.hashin; diff --git a/crates/engine/src/engine/execute.rs b/crates/engine/src/engine/execute.rs index a0b39cce..a8635e59 100644 --- a/crates/engine/src/engine/execute.rs +++ b/crates/engine/src/engine/execute.rs @@ -121,6 +121,7 @@ impl Engine { stdout, stderr, sandbox_dir, + events: rs.events_sender(), }; let res = if shell { driver.driver.run_shell(req, rs.ctoken()).await? diff --git a/crates/plugin-exec/src/pluginexec/mod.rs b/crates/plugin-exec/src/pluginexec/mod.rs index 810dbd1b..2a26db82 100644 --- a/crates/plugin-exec/src/pluginexec/mod.rs +++ b/crates/plugin-exec/src/pluginexec/mod.rs @@ -1640,6 +1640,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let _res = driver.run(make_req(req), &ctoken).await?; @@ -1692,6 +1693,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; // Use a timeout to detect the hang @@ -1750,6 +1752,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let run_fut = driver.run(make_req(req), &ctoken); @@ -1815,6 +1818,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let run_fut = driver.run(make_req(req), &ctoken); @@ -1879,6 +1883,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; let res = tokio::time::timeout( @@ -1939,6 +1944,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; Ok(String::from_utf8(stdout)?.trim().to_string()) @@ -2481,6 +2487,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2527,6 +2534,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2603,6 +2611,7 @@ mod tests { stdout: Some(&mut stdout), stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2754,6 +2763,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: sandbox.clone(), + events: None, }; os.run_inner(req, &ctoken, false).await?; @@ -2850,6 +2860,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -2921,6 +2932,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; @@ -3004,6 +3016,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -3103,6 +3116,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver .run( @@ -3189,6 +3203,7 @@ mod tests { stdout: None, stderr: None, sandbox_dir: tmp.path().to_path_buf(), + events: None, }; driver.run(make_req(req), &ctoken).await?; diff --git a/crates/plugin-sdk/src/serve.rs b/crates/plugin-sdk/src/serve.rs index fab07710..167d2ccf 100644 --- a/crates/plugin-sdk/src/serve.rs +++ b/crates/plugin-sdk/src/serve.rs @@ -870,6 +870,7 @@ async fn run_once( stdout: None, stderr: None, sandbox_dir: sandbox_dir.clone(), + events: None, }; let mrr = ManagedRunRequest { request: rr, diff --git a/crates/plugin/src/driver.rs b/crates/plugin/src/driver.rs index b6d6573d..e7754f65 100644 --- a/crates/plugin/src/driver.rs +++ b/crates/plugin/src/driver.rs @@ -870,6 +870,11 @@ pub struct RunRequest<'a, 'io> { pub stdout: Option<&'io mut (dyn tokio::io::AsyncWrite + Send + Sync + Unpin)>, pub stderr: Option<&'io mut (dyn tokio::io::AsyncWrite + Send + Sync + Unpin)>, pub sandbox_dir: std::path::PathBuf, + /// Build-progress event sink, plumbed from the engine's per-request state. + /// The driver bridge emits `SandboxCreate{Start,End}` around the sandbox + /// build step so it renders as a per-target op in the TUI. `None` when no + /// consumer is attached (e.g. tests), making the emit a no-op. + pub events: Option, } /// Cleanup closure a driver returns for the engine to run after `cache_locally`. /// The FUSE/OS sandbox layers each supply their own teardown; the engine's diff --git a/crates/tui/src/tui/progress.rs b/crates/tui/src/tui/progress.rs index c188fec3..2c286cf2 100644 --- a/crates/tui/src/tui/progress.rs +++ b/crates/tui/src/tui/progress.rs @@ -321,6 +321,7 @@ impl HeaderItem { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum Op { RemoteCacheRead, + SandboxCreate, Execute, LocalCacheWrite, RemoteCacheWrite, @@ -333,6 +334,7 @@ impl Op { fn icon(self) -> char { match self { Op::RemoteCacheRead => '↓', + Op::SandboxCreate => '⊞', Op::Execute => '▶', Op::LocalCacheWrite => '⊕', Op::RemoteCacheWrite => '↑', @@ -340,13 +342,14 @@ impl Op { } /// Pipeline ordinal for stable left-to-right ordering of the breakdown: - /// remote download → execute → local-cache write → remote upload. + /// remote download → sandbox build → execute → local-cache write → remote upload. fn order(self) -> u8 { match self { Op::RemoteCacheRead => 0, - Op::Execute => 1, - Op::LocalCacheWrite => 2, - Op::RemoteCacheWrite => 3, + Op::SandboxCreate => 1, + Op::Execute => 2, + Op::LocalCacheWrite => 3, + Op::RemoteCacheWrite => 4, } } } @@ -380,6 +383,12 @@ struct OpTimeline { /// into the shared per-target timeline; a new op needs one arm here. fn event_op_boundary(kind: &BuildEventKind) -> Option<(&str, Op, Boundary)> { match kind { + BuildEventKind::SandboxCreateStart { addr } => { + Some((addr, Op::SandboxCreate, Boundary::Start)) + } + BuildEventKind::SandboxCreateEnd { addr, .. } => { + Some((addr, Op::SandboxCreate, Boundary::End)) + } BuildEventKind::ExecuteStart { addr, .. } => Some((addr, Op::Execute, Boundary::Start)), BuildEventKind::ExecuteEnd { addr, .. } => Some((addr, Op::Execute, Boundary::End)), BuildEventKind::LocalCacheWriteStart { addr } => { @@ -561,7 +570,9 @@ impl BuildState { | BuildEventKind::RemoteCacheWriteStart { .. } | BuildEventKind::RemoteCacheWriteEnd { .. } | BuildEventKind::LocalCacheWriteStart { .. } - | BuildEventKind::LocalCacheWriteEnd { .. } => {} + | BuildEventKind::LocalCacheWriteEnd { .. } + | BuildEventKind::SandboxCreateStart { .. } + | BuildEventKind::SandboxCreateEnd { .. } => {} // GC progress is tracked by GcHeader, not the build counters. The // elapsed-clock anchor at the top of `apply` still runs, so the // clock works during a gc sweep. @@ -1656,6 +1667,58 @@ mod tests { ); } + #[test] + fn op_timeline_records_sandbox_create_before_execute() { + // SandboxCreate runs 0→4s (completed), then Execute opens at 4s and is + // still live at now=11s. The breakdown carries both, ordered with the + // sandbox build ahead of execute. + let mut s = BuildState::new(); + s.apply(&ev( + 0, + BuildEventKind::SandboxCreateStart { + addr: "//a:b".into(), + }, + )); + s.apply(&ev( + 4_000, + BuildEventKind::SandboxCreateEnd { + addr: "//a:b".into(), + error: None, + }, + )); + s.apply(&ev(4_000, execute_start("//a:b"))); + + let long = s.long_running(11_000, 5_000); + assert_eq!(long.len(), 1); + assert_eq!(long[0].0, "//a:b"); + assert_eq!(long[0].1, 7_000); // active Execute elapsed + assert_eq!( + long[0].2, + vec![(Op::SandboxCreate, 4_000), (Op::Execute, 7_000)] + ); + } + + #[test] + fn long_slow_sandbox_create_surfaces_on_its_own() { + // A sandbox build that stays open past the threshold shows as a slow row + // with the SandboxCreate icon, before any execute starts. + let mut s = BuildState::new(); + s.apply(&ev( + 0, + BuildEventKind::SandboxCreateStart { + addr: "//slow:sbx".into(), + }, + )); + let long = s.long_running(7_000, 5_000); + assert_eq!(long.len(), 1); + assert_eq!(long[0].0, "//slow:sbx"); + let line = format!("{}", s.slow_rows(7_000)[0]); + assert!( + line.contains(&format!("({} ", Op::SandboxCreate.icon())), + "{line}" + ); + } + #[test] fn op_timeline_omits_sub_second_ops() { // A 500ms Execute is below the 1s breakdown floor and is dropped; only the