Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/builtins/src/pluginfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ mod tests {
stdout: None,
stderr: None,
sandbox_dir: root,
events: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/builtins/src/pluginhostbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ mod tests {
stdout: None,
stderr: None,
sandbox_dir: sandbox_dir.to_path_buf(),
events: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/builtins/src/plugintextfile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ mod tests {
stdout: None,
stderr: None,
sandbox_dir: sandbox_dir.to_path_buf(),
events: None,
}
}

Expand Down
172 changes: 172 additions & 0 deletions crates/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ pub enum BuildEventKind {
addr: String,
error: Option<String>,
},
/// Start of building the target's sandbox: creating the sandbox dir and
/// materializing every declared input into it (unpack / hardlink-stage /
/// FUSE-slot register) before the subprocess spawns. Emitted by the driver
/// bridge, which owns the create step. Surfaced as one op in the per-target
/// timeline and marked slow if it runs long (large inputs, cold stage).
/// Paired with `SandboxCreateEnd` (fires on completion, `?`, or cancel).
SandboxCreateStart {
addr: String,
},
SandboxCreateEnd {
addr: String,
error: Option<String>,
},
LocalCacheHit {
addr: String,
},
Expand Down Expand Up @@ -131,3 +144,162 @@ pub fn now_unix_ms() -> u64 {
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}

/// Stamp + send one event on `tx`. A closed receiver (consumer gone, e.g. TUI
/// shut down) is expected — events are best-effort, so the send result is
/// dropped intentionally.
fn send_stamped(tx: &EventSender, kind: BuildEventKind) {
drop(tx.send(BuildEvent {
at_unix_ms: now_unix_ms(),
kind,
}));
}

/// Drop-guard so the `*End` event fires on early-return (`?`) **and** on
/// cancellation (the awaited future is dropped mid-flight). Once armed, it emits
/// exactly one end event when dropped.
struct EndGuard {
tx: Option<EventSender>,
make_end: Option<Box<dyn FnOnce(Option<String>) -> BuildEventKind + Send>>,
error: Option<String>,
}

impl Drop for EndGuard {
fn drop(&mut self) {
if let (Some(tx), Some(make_end)) = (self.tx.take(), self.make_end.take()) {
send_stamped(&tx, make_end(self.error.take()));
}
}
}

/// Emit `start`, run `fut`, then emit `make_end(error)` on completion,
/// early-return (`?`), or cancellation — given a raw `EventSender` rather than
/// the engine's `RequestState`. Used by layers below the engine (the driver
/// bridge) that hold an `Option<EventSender>` plumbed through the request. A
/// `None` sender makes this a transparent pass-through (no events, no overhead).
///
/// The end event is produced by an internal drop-guard armed before the await,
/// so it fires even if `fut` is cancelled mid-await. `at_unix_ms` is stamped on
/// both the start and end events.
pub async fn emit_scope_tx<T>(
events: Option<EventSender>,
start: BuildEventKind,
make_end: impl FnOnce(Option<String>) -> BuildEventKind + Send + 'static,
fut: impl std::future::Future<Output = anyhow::Result<T>>,
) -> anyhow::Result<T> {
if let Some(tx) = &events {
send_stamped(tx, start);
}
let mut guard = EndGuard {
tx: events,
make_end: Some(Box::new(make_end)),
error: None,
};
let out = fut.await; // guard still armed if this is cancelled mid-await
if let Err(e) = &out {
guard.error = Some(format!("{e:#}"));
}
out // guard drops here → emits *End
}

#[cfg(test)]
mod tests {
use super::*;

fn kinds(rx: &mut EventReceiver) -> Vec<BuildEventKind> {
let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() {
out.push(ev.kind);
}
out
}

#[tokio::test]
async fn emit_scope_tx_emits_start_then_end_on_success() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let out: anyhow::Result<u32> = emit_scope_tx(
Some(tx),
BuildEventKind::SandboxCreateStart {
addr: "//a:b".into(),
},
|error| BuildEventKind::SandboxCreateEnd {
addr: "//a:b".into(),
error,
},
async { Ok(7) },
)
.await;
assert_eq!(out.unwrap(), 7);
match kinds(&mut rx).as_slice() {
[
BuildEventKind::SandboxCreateStart { addr: a },
BuildEventKind::SandboxCreateEnd {
addr: b,
error: None,
},
] => {
assert_eq!(a, "//a:b");
assert_eq!(b, "//a:b");
}
other => panic!("unexpected events: {other:?}"),
}
}

#[tokio::test]
async fn emit_scope_tx_captures_error_in_end_event() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let out: anyhow::Result<()> = emit_scope_tx(
Some(tx),
BuildEventKind::SandboxCreateStart {
addr: "//a:b".into(),
},
|error| BuildEventKind::SandboxCreateEnd {
addr: "//a:b".into(),
error,
},
async { Err(anyhow::anyhow!("boom")) },
)
.await;
assert!(out.is_err());
let end = kinds(&mut rx).pop().expect("end event");
match end {
BuildEventKind::SandboxCreateEnd {
error: Some(msg), ..
} => assert!(msg.contains("boom"), "{msg}"),
other => panic!("expected end-with-error, got {other:?}"),
}
}

#[tokio::test]
async fn emit_scope_tx_emits_end_on_cancellation() {
// Dropping the future mid-await must still fire the end event via the
// armed drop-guard — the slow-sandbox case where the build is cancelled.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let fut = emit_scope_tx(
Some(tx),
BuildEventKind::SandboxCreateStart {
addr: "//a:b".into(),
},
|error| BuildEventKind::SandboxCreateEnd {
addr: "//a:b".into(),
error,
},
std::future::pending::<anyhow::Result<()>>(),
);
// Poll once to emit Start + arm the guard, then drop without completing.
let mut boxed = Box::pin(fut);
let _ = futures::poll!(boxed.as_mut());
drop(boxed);
let observed = kinds(&mut rx);
assert!(
matches!(
observed.as_slice(),
[
BuildEventKind::SandboxCreateStart { .. },
BuildEventKind::SandboxCreateEnd { error: None, .. },
]
),
"expected start+end on cancel, got {observed:?}"
);
}
}
112 changes: 112 additions & 0 deletions crates/driver-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,117 @@ mod shell_fallback_tests {
}
}

struct RunOkDriver;

#[async_trait]
impl ManagedDriver for RunOkDriver {
fn config(&self, _: ConfigRequest) -> anyhow::Result<ConfigResponse> {
Ok(ConfigResponse {
name: "runok".to_string(),
})
}
fn schema(&self) -> hplugin::driver::DriverSchema {
hplugin::driver::DriverSchema::default()
}
async fn parse(
&self,
_: ParseRequest,
_: &(dyn Cancellable + Send + Sync),
) -> anyhow::Result<ParseResponse> {
unimplemented!()
}
async fn apply_transitive(
&self,
req: ApplyTransitiveRequest,
_: &(dyn Cancellable + Send + Sync),
) -> anyhow::Result<ApplyTransitiveResponse> {
Ok(ApplyTransitiveResponse {
target_def: req.target_def,
})
}
async fn run<'a, 'io>(
&self,
_: ManagedRunRequest<'a, 'io>,
_: &(dyn Cancellable + Send + Sync),
) -> anyhow::Result<ManagedRunResponse> {
Ok(ManagedRunResponse { artifacts: vec![] })
}
}

#[tokio::test]
async fn run_emits_sandbox_create_start_and_end() -> anyhow::Result<()> {
// With an EventSender attached, the OS bridge brackets the sandbox build
// with SandboxCreate{Start,End} so the TUI can render it as a step.
let mut config: std::collections::HashMap<String, hcore::htvalue::Value> =
std::collections::HashMap::new();
config.insert("run".to_string(), hcore::htvalue::Value::List(vec![]));
let fallback = Arc::new(ShellFallback {
driver: Arc::new(RunOkDriver),
spec_template: Arc::new(TargetSpec {
addr: Default::default(),
driver: "exec".to_string(),
config,
labels: vec![],
transitive: Default::default(),
}),
});
let bridge = ManagedDriverBridge::new_os_for_test_with_shell_fallback(
Box::new(RunOkDriver),
fallback,
);

let ctoken = StdCancellationToken::new();
let tmp = tempfile::tempdir()?;
let sandbox = tmp.path().join("sandbox");
fs::create_dir_all(&sandbox)?;

let target_def = TargetDef {
addr: Default::default(),
labels: vec![],
raw_def: Arc::new(()),
inputs: vec![],
outputs: vec![],
support_files: vec![],
cache: hplugin::driver::targetdef::CacheConfig::off(),
pty: false,
hash: vec![],
transparent: false,
};
let request_id = "sandbox-create-test".to_string();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let req = RunRequest {
request_id: &request_id,
target: &target_def,
tree_root_path: tmp.path().to_path_buf(),
inputs: vec![],
hashin: "",
stdin: None,
stdout: None,
stderr: None,
sandbox_dir: sandbox.clone(),
events: Some(tx),
};

bridge.run(req, &ctoken).await?;

let mut kinds = Vec::new();
while let Ok(ev) = rx.try_recv() {
kinds.push(ev.kind);
}
let expected = target_def.addr.format();
assert!(
matches!(
kinds.as_slice(),
[
hcore::events::BuildEventKind::SandboxCreateStart { addr: a },
hcore::events::BuildEventKind::SandboxCreateEnd { addr: b, error: None },
] if *a == expected && *b == expected
),
"expected sandbox create start+end for {expected}, got {kinds:?}",
);
Ok(())
}

#[tokio::test]
async fn run_shell_dispatches_to_fallback_when_driver_does_not_support() -> anyhow::Result<()> {
let parse_called = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -341,6 +452,7 @@ mod shell_fallback_tests {
stdout: None,
stderr: None,
sandbox_dir: sandbox.clone(),
events: None,
};

bridge.run_shell(req, &ctoken).await?;
Expand Down
Loading
Loading