diff --git a/CHANGELOG.md b/CHANGELOG.md index e0c418b6c..8798cead3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [main] Fixed `--volume-ctrl fixed` not disabling volume control - [core] Fix default permissions on credentials file and warn user if file is world readable - [core] Try all resolved addresses for the dealer connection instead of failing after the first one. +- [core] Fix dealer websocket reconnect leaving spirc hung on stale subscription channels. ## [0.8.0] - 2025-11-10 diff --git a/connect/src/model.rs b/connect/src/model.rs index 10f25f1bf..a6d892c4d 100644 --- a/connect/src/model.rs +++ b/connect/src/model.rs @@ -1,5 +1,6 @@ use crate::{ core::dealer::protocol::SkipTo, protocol::context_player_options::ContextPlayerOptionOverrides, + state::ConnectState, }; use std::ops::Deref; @@ -165,3 +166,11 @@ pub(super) enum SpircPlayStatus { preloading_of_next_track_triggered: bool, }, } + +/// Playback state saved across session reconnects so the new SpircTask +/// can resume where the old one left off. +pub struct SavedPlaybackState { + pub(super) connect_state: ConnectState, + pub(super) play_status: SpircPlayStatus, + pub(super) play_request_id: Option, +} diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index fec6057c4..497ab2eec 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -11,7 +11,7 @@ use crate::{ session::UserAttributes, spclient::TransferRequest, }, - model::{LoadRequest, PlayingTrack, SpircPlayStatus}, + model::{LoadRequest, PlayingTrack, SavedPlaybackState, SpircPlayStatus}, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel, QueueTrack}, @@ -37,6 +37,7 @@ use librespot_protocol::context_page::ContextPage; use protobuf::MessageField; use std::{ future::Future, + mem, sync::Arc, sync::atomic::{AtomicUsize, Ordering}, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -163,7 +164,23 @@ impl Spirc { credentials: Credentials, player: Arc, mixer: Arc, - ) -> Result<(Spirc, impl Future), Error> { + ) -> Result<(Spirc, impl Future>), Error> { + Self::with_saved_state(config, session, credentials, player, mixer, None).await + } + + /// Like [`Spirc::new`], but restores playback state from a previous session. + /// + /// When `saved_state` is provided, the new SpircTask picks up where the + /// old one left off — same track, position, and connect state — so the + /// Player can continue without interruption after a session reconnect. + pub async fn with_saved_state( + config: ConnectConfig, + session: Session, + credentials: Credentials, + player: Arc, + mixer: Arc, + saved_state: Option, + ) -> Result<(Spirc, impl Future>), Error> { fn extract_connection_id(msg: Message) -> Result { let connection_id = msg .headers @@ -176,7 +193,21 @@ impl Spirc { debug!("new Spirc[{spirc_id}]"); let emit_set_queue_events = config.emit_set_queue_events; - let connect_state = ConnectState::new(config, &session); + + let (connect_state, play_status, play_request_id) = match saved_state { + Some(saved) => { + info!("Spirc[{spirc_id}] restoring saved playback state"); + let mut cs = saved.connect_state; + // Update to the new session's ID so Spotify sees us as the same device. + cs.set_session_id(session.session_id()); + (cs, saved.play_status, saved.play_request_id) + } + None => ( + ConnectState::new(config, &session), + SpircPlayStatus::Stopped, + None, + ), + }; let connection_id_update = session .dealer() @@ -235,8 +266,8 @@ impl Spirc { connect_state, connect_established: false, - play_request_id: None, - play_status: SpircPlayStatus::Stopped, + play_request_id, + play_status, connection_id_update, connect_state_update, @@ -438,7 +469,7 @@ impl Spirc { } impl SpircTask { - async fn run(mut self) { + async fn run(mut self) -> Option { // simplify unwrapping of received item or parsed result macro_rules! unwrap { ( $next:expr, |$some:ident| $use_some:expr ) => { @@ -458,9 +489,12 @@ impl SpircTask { }; } + // Subscribe before start() so we can't miss a reconnect notification. + let mut reconnect_rx = self.session.dealer().reconnect_receiver(); + if let Err(why) = self.session.dealer().start().await { error!("starting dealer failed: {why}"); - return; + return None; } while !self.session.is_invalid() && !self.shutdown { @@ -477,7 +511,13 @@ impl SpircTask { connection_id_update, match |connection_id| if let Err(why) = self.handle_connection_id_update(connection_id).await { error!("failed handling connection id update: {why}"); - break; + if !self.connect_established { + // Initial registration failed — can't process + // commands without it, so restart spirc. + break; + } + // Re-registration after dealer reconnect failed — + // stay alive, next connection_id push will retry. } }, // main dealer update of any remote device updates @@ -585,10 +625,33 @@ impl SpircTask { } } }, + // Dealer reconnected after a connection loss. Our subscription + // streams survive because they're registered on the shared + // DealerShared — the new websocket dispatches through the same + // handlers. A new connection_id will arrive via + // connection_id_update and re-register our device state. + Ok(()) = reconnect_rx.changed() => { + info!("Dealer reconnected; awaiting new connection_id."); + }, else => break } } + if self.session.is_invalid() && !self.shutdown { + // Session TCP connection died unexpectedly — skip server + // communication that would fail anyway. The Player continues + // playing from its buffer; main.rs will create a new session. + warn!( + "session lost, saving playback state for recovery: {:?}", + self.play_status + ); + return Some(SavedPlaybackState { + connect_state: mem::take(&mut self.connect_state), + play_status: mem::replace(&mut self.play_status, SpircPlayStatus::Stopped), + play_request_id: self.play_request_id.take(), + }); + } + if !self.shutdown && self.connect_state.is_active() { warn!("unexpected shutdown"); if let Err(why) = self.handle_disconnect().await { @@ -602,6 +665,7 @@ impl SpircTask { }; self.session.dealer().close().await; + None } fn handle_next_context(&mut self, next_context: Result) -> bool { @@ -889,6 +953,22 @@ impl SpircTask { trace!("Received connection ID update: {connection_id:?}"); self.session.set_connection_id(&connection_id); + // If we have active playback (e.g. restored from saved state), + // update the position before registering so Spotify sees the + // correct track position. + if !matches!(self.play_status, SpircPlayStatus::Stopped) { + info!( + "re-registering with active playback state: {:?}", + self.play_status + ); + self.connect_state.set_status(&self.play_status); + if self.connect_state.is_playing() { + self.connect_state + .update_position_in_relation(self.now_ms()); + } + self.connect_state.set_now(self.now_ms() as u64); + } + let cluster = match self .connect_state .notify_new_device_appeared(&self.session) diff --git a/core/src/dealer/manager.rs b/core/src/dealer/manager.rs index 98ea0265f..06b5ba54b 100644 --- a/core/src/dealer/manager.rs +++ b/core/src/dealer/manager.rs @@ -2,7 +2,7 @@ use futures_core::Stream; use futures_util::StreamExt; use std::{pin::Pin, str::FromStr, sync::OnceLock}; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; use url::Url; @@ -16,6 +16,7 @@ component! { DealerManager: DealerManagerInner { builder: OnceLock = OnceLock::from(Builder::new()), dealer: OnceLock = OnceLock::new(), + reconnect_tx: watch::Sender = watch::Sender::new(0), } } @@ -153,10 +154,12 @@ impl DealerManager { // and the token is expired we will just get 401 error let get_url = move || Self::get_url(session.clone()); + let reconnect_tx = self.lock(|inner| inner.reconnect_tx.clone()); + let dealer = self .lock(move |inner| inner.builder.take()) .ok_or(DealerError::BuilderNotAvailable)? - .launch(get_url, None) + .launch(get_url, None, reconnect_tx) .await .map_err(DealerError::LaunchFailure)?; @@ -171,4 +174,8 @@ impl DealerManager { dealer.close().await } } + + pub fn reconnect_receiver(&self) -> watch::Receiver { + self.lock(|inner| inner.reconnect_tx.subscribe()) + } } diff --git a/core/src/dealer/mod.rs b/core/src/dealer/mod.rs index 63ee6e72c..5f28bebb2 100644 --- a/core/src/dealer/mod.rs +++ b/core/src/dealer/mod.rs @@ -21,6 +21,7 @@ use tokio::{ sync::{ Semaphore, mpsc::{self, UnboundedReceiver}, + watch, }, task::JoinHandle, }; @@ -55,6 +56,7 @@ const PING_INTERVAL: Duration = Duration::from_secs(30); const PING_TIMEOUT: Duration = Duration::from_secs(3); const RECONNECT_INTERVAL: Duration = Duration::from_secs(10); +const RECONNECT_URL_TIMEOUT: Duration = Duration::from_secs(30); const DEALER_REQUEST_HANDLERS_POISON_MSG: &str = "dealer request handlers mutex should not be poisoned"; @@ -261,7 +263,7 @@ struct Builder { } macro_rules! create_dealer { - ($builder:expr, $shared:ident -> $body:expr) => { + ($builder:expr, $reconnect_tx:expr, $shared:ident -> $body:expr) => { match $builder { builder => { let shared = Arc::new(DealerShared { @@ -270,6 +272,8 @@ macro_rules! create_dealer { notify_drop: Semaphore::new(0), }); + let reconnect_tx: watch::Sender = $reconnect_tx; + let handle = { let $shared = Arc::clone(&shared); tokio::spawn($body) @@ -278,6 +282,7 @@ macro_rules! create_dealer { Dealer { shared, handle: TimeoutOnDrop::new(handle, WEBSOCKET_CLOSE_TIMEOUT), + reconnect_tx, } } } @@ -301,26 +306,38 @@ impl Builder { handles(&self.request_handlers, &self.message_handlers, uri) } - pub fn launch_in_background(self, get_url: F, proxy: Option) -> Dealer + pub fn launch_in_background( + self, + get_url: F, + proxy: Option, + reconnect_tx: watch::Sender, + ) -> Dealer where Fut: Future + Send + 'static, F: (Fn() -> Fut) + Send + 'static, { - create_dealer!(self, shared -> run(shared, None, get_url, proxy)) + let tx = reconnect_tx.clone(); + create_dealer!(self, reconnect_tx, shared -> run(shared, None, get_url, proxy, tx)) } - pub async fn launch(self, get_url: F, proxy: Option) -> WsResult + pub async fn launch( + self, + get_url: F, + proxy: Option, + reconnect_tx: watch::Sender, + ) -> WsResult where Fut: Future + Send + 'static, F: (Fn() -> Fut) + Send + 'static, { - let dealer = create_dealer!(self, shared -> { + let tx = reconnect_tx.clone(); + let dealer = create_dealer!(self, reconnect_tx, shared -> { // Try to connect. let url = get_url().await?; let tasks = connect(&url, proxy.as_ref(), &shared).await?; // If a connection is established, continue in a background task. - run(shared, Some(tasks), get_url, proxy) + run(shared, Some(tasks), get_url, proxy, tx) }); Ok(dealer) @@ -426,6 +443,7 @@ impl DealerShared { struct Dealer { shared: Arc, handle: TimeoutOnDrop>, + reconnect_tx: watch::Sender, } impl Dealer { @@ -482,6 +500,10 @@ impl Dealer { ) } + pub fn reconnect_receiver(&self) -> watch::Receiver { + self.reconnect_tx.subscribe() + } + pub async fn close(mut self) { debug!("closing dealer"); @@ -665,6 +687,7 @@ async fn run( initial_tasks: Option<(JoinHandle<()>, JoinHandle<()>)>, mut get_url: F, proxy: Option, + reconnect_tx: watch::Sender, ) -> Result<(), Error> where Fut: Future + Send + 'static, @@ -672,12 +695,16 @@ where { let init_task = |t| Some(TimeoutOnDrop::new(t, WEBSOCKET_CLOSE_TIMEOUT)); + let has_had_initial_connection = initial_tasks.is_some(); + let mut tasks = if let Some((s, r)) = initial_tasks { (init_task(s), init_task(r)) } else { (None, None) }; + let mut has_connected = has_had_initial_connection; + while !shared.is_closed() { match &mut tasks { (Some(t0), Some(t1)) => { @@ -702,11 +729,38 @@ where () = shared.closed() => { break }, - e = get_url() => e - }?; + result = tokio::time::timeout(RECONNECT_URL_TIMEOUT, get_url()) => { + match result { + Ok(Ok(url)) => url, + Ok(Err(e)) => { + error!("Failed to resolve dealer URL: {e}"); + if has_connected { + reconnect_tx.send_modify(|n| *n += 1); + } + tokio::time::sleep(RECONNECT_INTERVAL).await; + continue; + } + Err(_) => { + error!("Timed out resolving dealer URL."); + if has_connected { + reconnect_tx.send_modify(|n| *n += 1); + } + tokio::time::sleep(RECONNECT_INTERVAL).await; + continue; + } + } + } + }; match connect(&url, proxy.as_ref(), &shared).await { - Ok((s, r)) => tasks = (init_task(s), init_task(r)), + Ok((s, r)) => { + tasks = (init_task(s), init_task(r)); + if has_connected { + warn!("Dealer reconnected; notifying consumers."); + reconnect_tx.send_modify(|n| *n += 1); + } + has_connected = true; + } Err(e) => { error!("Error while connecting: {e}"); tokio::time::sleep(RECONNECT_INTERVAL).await; diff --git a/examples/play_connect.rs b/examples/play_connect.rs index 1be6345ba..044865cdb 100644 --- a/examples/play_connect.rs +++ b/examples/play_connect.rs @@ -71,7 +71,7 @@ async fn main() -> Result<(), Error> { spirc.play()?; // starting the connect device and processing the previously "queued" calls - spirc_task.await; + let _ = spirc_task.await; Ok(()) } diff --git a/src/main.rs b/src/main.rs index 16ba1946f..c3869ae9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1899,6 +1899,7 @@ async fn main() { let mut discovery = None; let mut connecting = false; let mut _event_handler: Option = None; + let mut saved_playback_state = None; let mut session = Session::new(setup.session_config.clone(), setup.cache.clone()); @@ -2020,6 +2021,10 @@ async fn main() { last_credentials = Some(credentials.clone()); auto_connect_times.clear(); + // New account via Discovery — discard any saved + // playback state from the previous account. + saved_playback_state = None; + if let Some(spirc) = spirc.take() { if let Err(e) = spirc.shutdown() { error!("error sending spirc shutdown message: {e}"); @@ -2049,11 +2054,14 @@ async fn main() { let connect_config = setup.connect_config.clone(); - let (spirc_, spirc_task_) = match Spirc::new(connect_config, - session.clone(), - last_credentials.clone().unwrap_or_default(), - player.clone(), - mixer.clone()).await { + let (spirc_, spirc_task_) = match Spirc::with_saved_state( + connect_config, + session.clone(), + last_credentials.clone().unwrap_or_default(), + player.clone(), + mixer.clone(), + saved_playback_state.take(), + ).await { Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_), Err(e) => { error!("could not initialize spirc: {e}"); @@ -2065,14 +2073,20 @@ async fn main() { connecting = false; }, - _ = async { - if let Some(task) = spirc_task.as_mut() { - task.await; + saved = async { + match spirc_task.as_mut() { + Some(task) => task.await, + None => None, } }, if spirc_task.is_some() && !connecting => { spirc_task = None; + saved_playback_state = saved; - warn!("Spirc shut down unexpectedly"); + if saved_playback_state.is_some() { + info!("Spirc shut down with saved playback state, reconnecting"); + } else { + warn!("Spirc shut down unexpectedly"); + } let mut reconnect_exceeds_rate_limit = || { auto_connect_times.retain(|&t| t.elapsed() < RECONNECT_RATE_LIMIT_WINDOW); @@ -2112,7 +2126,9 @@ async fn main() { } if let Some(spirc_task) = spirc_task { - shutdown_tasks.spawn(spirc_task); + shutdown_tasks.spawn(async { + spirc_task.await; + }); } }