Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/tui/backend/ci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,15 @@ pub async fn run<A: App>(
}
} => {
match maybe_evt {
Some(ev) => view.apply(&ev),
// Greedily drain the rest of the buffered backlog in this
// same wake so an emitted burst folds in one pass instead of
// one event per `select!` iteration.
Some(ev) => {
view.apply(&ev);
if let Some(r) = events.as_mut() {
super::fold_buffered(r, &mut view, |v, e| v.apply(e));
}
}
// Sender dropped — stop polling, keep awaiting the app.
None => events = None,
}
Expand All @@ -50,9 +58,7 @@ pub async fn run<A: App>(
// Drain any events buffered before the sender dropped so the final
// summary is accurate even if the app future completed first.
if let Some(r) = events.as_mut() {
while let Ok(ev) = r.try_recv() {
view.apply(&ev);
}
super::fold_buffered(r, &mut view, |v, ev| v.apply(ev));
}

view.finish();
Expand Down
17 changes: 12 additions & 5 deletions src/tui/backend/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,17 @@ pub async fn run<A: App + 'static>(
}, if !paused => {
match maybe_build_evt {
// No per-event redraw: fold into the view here, the 80ms
// ticker repaints the pinned progress block.
Some(e) => view.apply(&e),
// ticker repaints the pinned progress block. Greedily drain
// the rest of the buffered backlog in this same wake so an
// emitted burst folds in one pass rather than one event per
// `select!` iteration (which left the fold seconds behind a
// warm run's flood of typed events).
Some(e) => {
view.apply(&e);
if let Some(r) = build_events.as_mut() {
super::fold_buffered(r, &mut view, |v, ev| v.apply(ev));
}
}
// Sender dropped (request finished) — stop polling.
None => build_events = None,
}
Expand Down Expand Up @@ -341,9 +350,7 @@ pub async fn run<A: App + 'static>(
// request state drops). Drain them so the final summary reflects the full
// stream rather than the snapshot at the last tick.
if let Some(r) = build_events.as_mut() {
while let Ok(e) = r.try_recv() {
view.apply(&e);
}
super::fold_buffered(r, &mut view, |v, e| v.apply(e));
}

// Persistent final summary, printed straight to stderr below the
Expand Down
71 changes: 71 additions & 0 deletions src/tui/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,73 @@
pub mod ci;
pub mod interactive;

use tokio::sync::mpsc::UnboundedReceiver;

use crate::engine::event::BuildEvent;

/// Fold every event currently buffered in `rx` into `view`, returning how many
/// were folded. Non-blocking: `try_recv` stops at the first empty (or closed)
/// read, so this drains the present backlog without ever waiting for more.
///
/// The backends call this when the build-event select arm wakes, so an emitted
/// burst (a warm run resolves its whole transitive closure near-instantly,
/// emitting tens of thousands of typed events) folds in one pass between two
/// render ticks instead of one event per `select!` iteration.
pub(crate) fn fold_buffered<V: ?Sized>(
rx: &mut UnboundedReceiver<BuildEvent>,
view: &mut V,
apply: impl Fn(&mut V, &BuildEvent),
) -> usize {
let mut folded = 0;
while let Ok(ev) = rx.try_recv() {
apply(view, &ev);
folded += 1;
}
folded
}

#[cfg(test)]
mod tests {
use super::fold_buffered;
use crate::engine::event::{BuildEvent, BuildEventKind};
use tokio::sync::mpsc;

fn result_end(addr: &str) -> BuildEvent {
BuildEvent {
at_unix_ms: 0,
kind: BuildEventKind::ResultEnd {
addr: addr.to_string(),
error: None,
},
}
}

/// One wake folds the entire buffered backlog and leaves the channel empty —
/// the contract that replaces the one-event-per-`select!`-iteration drain.
#[tokio::test]
async fn drains_whole_backlog_in_one_pass() {
let (tx, mut rx) = mpsc::unbounded_channel();
const K: usize = 5_000;
for i in 0..K {
tx.send(result_end(&format!("//pkg:t{i}"))).expect("send");
}

// Stand in for a view: count folds. The drain mechanics are what's under
// test, not a specific aggregator.
let mut count = 0usize;
let folded = fold_buffered(&mut rx, &mut count, |c, _ev| *c += 1);

assert_eq!(folded, K, "should fold every buffered event in one call");
assert_eq!(count, K, "view sees every event");
assert!(rx.try_recv().is_err(), "channel drained empty");
}

/// An empty channel folds nothing (and does not block).
#[tokio::test]
async fn empty_channel_folds_nothing() {
let (_tx, mut rx) = mpsc::unbounded_channel();
let mut count = 0usize;
assert_eq!(fold_buffered(&mut rx, &mut count, |c, _e| *c += 1), 0);
assert_eq!(count, 0);
}
}
Loading