Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ impl Client {
let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
let channels = self.register_session(&session_id);
let idle_waiter = Arc::new(ParkingLotMutex::new(None));
let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
let shutdown = CancellationToken::new();
let (event_tx, _) = tokio::sync::broadcast::channel(512);
let event_loop = spawn_event_loop(
Expand All @@ -856,6 +857,7 @@ impl Client {
channels,
idle_waiter.clone(),
capabilities.clone(),
open_canvases.clone(),
event_tx.clone(),
shutdown.clone(),
);
Expand Down Expand Up @@ -914,7 +916,7 @@ impl Client {
shutdown,
idle_waiter,
capabilities,
open_canvases: Arc::new(parking_lot::RwLock::new(Vec::new())),
open_canvases,
event_tx,
})
}
Expand Down Expand Up @@ -982,6 +984,7 @@ impl Client {
let setup_start = Instant::now();
let channels = self.register_session(&session_id);
let idle_waiter = Arc::new(ParkingLotMutex::new(None));
let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
let shutdown = CancellationToken::new();
let (event_tx, _) = tokio::sync::broadcast::channel(512);
let event_loop = spawn_event_loop(
Expand All @@ -996,6 +999,7 @@ impl Client {
channels,
idle_waiter.clone(),
capabilities.clone(),
open_canvases.clone(),
event_tx.clone(),
shutdown.clone(),
);
Expand Down Expand Up @@ -1067,9 +1071,7 @@ impl Client {
}

*capabilities.write() = resume_result.capabilities.unwrap_or_default();
let open_canvases = Arc::new(parking_lot::RwLock::new(
resume_result.open_canvases.unwrap_or_default(),
));
*open_canvases.write() = resume_result.open_canvases.unwrap_or_default();

Comment on lines +1074 to 1075
tracing::debug!(
elapsed_ms = total_start.elapsed().as_millis(),
Expand Down Expand Up @@ -1107,6 +1109,20 @@ fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<Comm
Arc::new(map)
}

fn upsert_open_canvas_snapshot(
snapshots: &mut Vec<OpenCanvasInstance>,
snapshot: OpenCanvasInstance,
) {
if let Some(existing) = snapshots
.iter_mut()
.find(|open| open.instance_id == snapshot.instance_id)
{
*existing = snapshot;
} else {
snapshots.push(snapshot);
}
}

#[allow(clippy::too_many_arguments)]
fn spawn_event_loop(
session_id: SessionId,
Expand All @@ -1120,6 +1136,7 @@ fn spawn_event_loop(
channels: crate::router::SessionChannels,
idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
shutdown: CancellationToken,
) -> JoinHandle<()> {
Expand All @@ -1146,7 +1163,7 @@ fn spawn_event_loop(
_ = shutdown.cancelled() => break,
Some(notification) = notifications.recv() => {
handle_notification(
&session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx,
&session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
).await;
}
Some(request) = requests.recv() => {
Expand Down Expand Up @@ -1217,6 +1234,7 @@ async fn handle_notification(
notification: SessionEventNotification,
idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
) {
let dispatch_start = Instant::now();
Expand Down Expand Up @@ -1298,6 +1316,14 @@ async fn handle_notification(
Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
}
}
if event_type == SessionEventType::SessionCanvasOpened {
match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
Ok(open_canvas) => {
upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
}
Comment on lines +1319 to +1323
Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
}
}

tracing::debug!(
elapsed_ms = dispatch_start.elapsed().as_millis(),
Expand Down
97 changes: 97 additions & 0 deletions rust/tests/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2583,6 +2583,103 @@ async fn resume_session_sends_canvas_fields_and_captures_open_canvases() {
assert_eq!(caps.ui.unwrap().canvases, Some(true));
}

#[tokio::test]
async fn session_canvas_opened_updates_open_canvas_snapshots() {
let (session, mut server) = create_session_pair().await;
assert!(session.open_canvases().is_empty());

server
.send_event(
"session.canvas.opened",
serde_json::json!({
"instanceId": "missing-required-fields",
}),
)
.await;
server
.send_event(
"session.canvas.opened",
serde_json::json!({
"extensionId": "project:counter",
"extensionName": "Counter Provider",
"canvasId": "counter",
"instanceId": "counter-1",
"title": "Counter",
"status": "ready",
"url": "https://example.test/counter",
"input": { "seed": 1 },
"reopen": false,
"availability": "ready"
}),
)
.await;
server
.send_event(
"session.canvas.opened",
serde_json::json!({
"extensionId": "project:logs",
"canvasId": "logs",
"instanceId": "logs-1",
"title": "Logs",
"reopen": false,
"availability": "stale"
}),
)
.await;

let mut open = Vec::new();
for _ in 0..50 {
open = session.open_canvases();
if open.len() == 2 {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(open.len(), 2);
assert_eq!(open[0].instance_id, "counter-1");
assert_eq!(open[0].title.as_deref(), Some("Counter"));
assert_eq!(open[0].availability, CanvasInstanceAvailability::Ready);
assert_eq!(open[1].instance_id, "logs-1");

server
.send_event(
"session.canvas.opened",
serde_json::json!({
"extensionId": "project:counter",
"extensionName": "Counter Provider",
"canvasId": "counter",
"instanceId": "counter-1",
"title": "Counter Updated",
"status": "reconnected",
"url": "https://example.test/counter-updated",
"input": { "seed": 2 },
"reopen": true,
"availability": "stale"
}),
)
.await;

for _ in 0..50 {
open = session.open_canvases();
if open.len() == 2 && open[0].title.as_deref() == Some("Counter Updated") {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(open.len(), 2);
assert_eq!(open[0].instance_id, "counter-1");
assert_eq!(open[0].title.as_deref(), Some("Counter Updated"));
assert_eq!(open[0].status.as_deref(), Some("reconnected"));
assert_eq!(
open[0].url.as_deref(),
Some("https://example.test/counter-updated")
);
assert_eq!(open[0].input, Some(serde_json::json!({ "seed": 2 })));
assert!(open[0].reopen);
assert_eq!(open[0].availability, CanvasInstanceAvailability::Stale);
assert_eq!(open[1].instance_id, "logs-1");
}

#[tokio::test]
async fn elicitation_methods_fail_without_capability() {
let (session, _server) = create_session_pair().await;
Expand Down
Loading