From eb8dbaff5ce2cea4d52d06469a6522ca096187fb Mon Sep 17 00:00:00 2001 From: jmoseley Date: Tue, 26 May 2026 20:43:48 -0700 Subject: [PATCH] Track live open canvas snapshots Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- rust/src/session.rs | 36 ++++++++++++-- rust/tests/session_test.rs | 97 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/rust/src/session.rs b/rust/src/session.rs index 57181459c..c7d3bcf2d 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -842,6 +842,7 @@ impl Client { let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default())); let channels = self.register_session(&session_id); let idle_waiter = Arc::new(ParkingLotMutex::new(None)); + let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new())); let shutdown = CancellationToken::new(); let (event_tx, _) = tokio::sync::broadcast::channel(512); let event_loop = spawn_event_loop( @@ -856,6 +857,7 @@ impl Client { channels, idle_waiter.clone(), capabilities.clone(), + open_canvases.clone(), event_tx.clone(), shutdown.clone(), ); @@ -914,7 +916,7 @@ impl Client { shutdown, idle_waiter, capabilities, - open_canvases: Arc::new(parking_lot::RwLock::new(Vec::new())), + open_canvases, event_tx, }) } @@ -982,6 +984,7 @@ impl Client { let setup_start = Instant::now(); let channels = self.register_session(&session_id); let idle_waiter = Arc::new(ParkingLotMutex::new(None)); + let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new())); let shutdown = CancellationToken::new(); let (event_tx, _) = tokio::sync::broadcast::channel(512); let event_loop = spawn_event_loop( @@ -996,6 +999,7 @@ impl Client { channels, idle_waiter.clone(), capabilities.clone(), + open_canvases.clone(), event_tx.clone(), shutdown.clone(), ); @@ -1067,9 +1071,7 @@ impl Client { } *capabilities.write() = resume_result.capabilities.unwrap_or_default(); - let open_canvases = Arc::new(parking_lot::RwLock::new( - resume_result.open_canvases.unwrap_or_default(), - )); + *open_canvases.write() = resume_result.open_canvases.unwrap_or_default(); tracing::debug!( elapsed_ms = total_start.elapsed().as_millis(), @@ -1107,6 +1109,20 @@ fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc, + snapshot: OpenCanvasInstance, +) { + if let Some(existing) = snapshots + .iter_mut() + .find(|open| open.instance_id == snapshot.instance_id) + { + *existing = snapshot; + } else { + snapshots.push(snapshot); + } +} + #[allow(clippy::too_many_arguments)] fn spawn_event_loop( session_id: SessionId, @@ -1120,6 +1136,7 @@ fn spawn_event_loop( channels: crate::router::SessionChannels, idle_waiter: Arc>>, capabilities: Arc>, + open_canvases: Arc>>, event_tx: tokio::sync::broadcast::Sender, shutdown: CancellationToken, ) -> JoinHandle<()> { @@ -1146,7 +1163,7 @@ fn spawn_event_loop( _ = shutdown.cancelled() => break, Some(notification) = notifications.recv() => { handle_notification( - &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx, + &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx, ).await; } Some(request) = requests.recv() => { @@ -1217,6 +1234,7 @@ async fn handle_notification( notification: SessionEventNotification, idle_waiter: &Arc>>, capabilities: &Arc>, + open_canvases: &Arc>>, event_tx: &tokio::sync::broadcast::Sender, ) { let dispatch_start = Instant::now(); @@ -1298,6 +1316,14 @@ async fn handle_notification( Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"), } } + if event_type == SessionEventType::SessionCanvasOpened { + match serde_json::from_value::(notification.event.data.clone()) { + Ok(open_canvas) => { + upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas); + } + Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"), + } + } tracing::debug!( elapsed_ms = dispatch_start.elapsed().as_millis(), diff --git a/rust/tests/session_test.rs b/rust/tests/session_test.rs index bb4e602e0..e3e435769 100644 --- a/rust/tests/session_test.rs +++ b/rust/tests/session_test.rs @@ -2583,6 +2583,103 @@ async fn resume_session_sends_canvas_fields_and_captures_open_canvases() { assert_eq!(caps.ui.unwrap().canvases, Some(true)); } +#[tokio::test] +async fn session_canvas_opened_updates_open_canvas_snapshots() { + let (session, mut server) = create_session_pair().await; + assert!(session.open_canvases().is_empty()); + + server + .send_event( + "session.canvas.opened", + serde_json::json!({ + "instanceId": "missing-required-fields", + }), + ) + .await; + server + .send_event( + "session.canvas.opened", + serde_json::json!({ + "extensionId": "project:counter", + "extensionName": "Counter Provider", + "canvasId": "counter", + "instanceId": "counter-1", + "title": "Counter", + "status": "ready", + "url": "https://example.test/counter", + "input": { "seed": 1 }, + "reopen": false, + "availability": "ready" + }), + ) + .await; + server + .send_event( + "session.canvas.opened", + serde_json::json!({ + "extensionId": "project:logs", + "canvasId": "logs", + "instanceId": "logs-1", + "title": "Logs", + "reopen": false, + "availability": "stale" + }), + ) + .await; + + let mut open = Vec::new(); + for _ in 0..50 { + open = session.open_canvases(); + if open.len() == 2 { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert_eq!(open.len(), 2); + assert_eq!(open[0].instance_id, "counter-1"); + assert_eq!(open[0].title.as_deref(), Some("Counter")); + assert_eq!(open[0].availability, CanvasInstanceAvailability::Ready); + assert_eq!(open[1].instance_id, "logs-1"); + + server + .send_event( + "session.canvas.opened", + serde_json::json!({ + "extensionId": "project:counter", + "extensionName": "Counter Provider", + "canvasId": "counter", + "instanceId": "counter-1", + "title": "Counter Updated", + "status": "reconnected", + "url": "https://example.test/counter-updated", + "input": { "seed": 2 }, + "reopen": true, + "availability": "stale" + }), + ) + .await; + + for _ in 0..50 { + open = session.open_canvases(); + if open.len() == 2 && open[0].title.as_deref() == Some("Counter Updated") { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert_eq!(open.len(), 2); + assert_eq!(open[0].instance_id, "counter-1"); + assert_eq!(open[0].title.as_deref(), Some("Counter Updated")); + assert_eq!(open[0].status.as_deref(), Some("reconnected")); + assert_eq!( + open[0].url.as_deref(), + Some("https://example.test/counter-updated") + ); + assert_eq!(open[0].input, Some(serde_json::json!({ "seed": 2 }))); + assert!(open[0].reopen); + assert_eq!(open[0].availability, CanvasInstanceAvailability::Stale); + assert_eq!(open[1].instance_id, "logs-1"); +} + #[tokio::test] async fn elicitation_methods_fail_without_capability() { let (session, _server) = create_session_pair().await;