Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [main] Fixed `--volume-ctrl fixed` not disabling volume control
- [core] Fix default permissions on credentials file and warn user if file is world readable
- [core] Try all resolved addresses for the dealer connection instead of failing after the first one.
- [core] Fix dealer websocket reconnect leaving spirc hung on stale subscription channels.

## [0.8.0] - 2025-11-10

Expand Down
9 changes: 9 additions & 0 deletions connect/src/model.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
core::dealer::protocol::SkipTo, protocol::context_player_options::ContextPlayerOptionOverrides,
state::ConnectState,
};

use std::ops::Deref;
Expand Down Expand Up @@ -165,3 +166,11 @@ pub(super) enum SpircPlayStatus {
preloading_of_next_track_triggered: bool,
},
}

/// Playback state saved across session reconnects so the new SpircTask
/// can resume where the old one left off.
pub struct SavedPlaybackState {
pub(super) connect_state: ConnectState,
pub(super) play_status: SpircPlayStatus,
pub(super) play_request_id: Option<u64>,
}
96 changes: 88 additions & 8 deletions connect/src/spirc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
session::UserAttributes,
spclient::TransferRequest,
},
model::{LoadRequest, PlayingTrack, SpircPlayStatus},
model::{LoadRequest, PlayingTrack, SavedPlaybackState, SpircPlayStatus},
playback::{
mixer::Mixer,
player::{Player, PlayerEvent, PlayerEventChannel, QueueTrack},
Expand All @@ -37,6 +37,7 @@ use librespot_protocol::context_page::ContextPage;
use protobuf::MessageField;
use std::{
future::Future,
mem,
sync::Arc,
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -163,7 +164,23 @@ impl Spirc {
credentials: Credentials,
player: Arc<Player>,
mixer: Arc<dyn Mixer>,
) -> Result<(Spirc, impl Future<Output = ()>), Error> {
) -> Result<(Spirc, impl Future<Output = Option<SavedPlaybackState>>), Error> {
Self::with_saved_state(config, session, credentials, player, mixer, None).await
}

/// Like [`Spirc::new`], but restores playback state from a previous session.
///
/// When `saved_state` is provided, the new SpircTask picks up where the
/// old one left off — same track, position, and connect state — so the
/// Player can continue without interruption after a session reconnect.
pub async fn with_saved_state(
config: ConnectConfig,
session: Session,
credentials: Credentials,
player: Arc<Player>,
mixer: Arc<dyn Mixer>,
saved_state: Option<SavedPlaybackState>,
) -> Result<(Spirc, impl Future<Output = Option<SavedPlaybackState>>), Error> {
fn extract_connection_id(msg: Message) -> Result<String, Error> {
let connection_id = msg
.headers
Expand All @@ -176,7 +193,21 @@ impl Spirc {
debug!("new Spirc[{spirc_id}]");

let emit_set_queue_events = config.emit_set_queue_events;
let connect_state = ConnectState::new(config, &session);

let (connect_state, play_status, play_request_id) = match saved_state {
Some(saved) => {
info!("Spirc[{spirc_id}] restoring saved playback state");
let mut cs = saved.connect_state;
// Update to the new session's ID so Spotify sees us as the same device.
cs.set_session_id(session.session_id());
(cs, saved.play_status, saved.play_request_id)
}
None => (
ConnectState::new(config, &session),
SpircPlayStatus::Stopped,
None,
),
};

let connection_id_update = session
.dealer()
Expand Down Expand Up @@ -235,8 +266,8 @@ impl Spirc {
connect_state,
connect_established: false,

play_request_id: None,
play_status: SpircPlayStatus::Stopped,
play_request_id,
play_status,

connection_id_update,
connect_state_update,
Expand Down Expand Up @@ -438,7 +469,7 @@ impl Spirc {
}

impl SpircTask {
async fn run(mut self) {
async fn run(mut self) -> Option<SavedPlaybackState> {
// simplify unwrapping of received item or parsed result
macro_rules! unwrap {
( $next:expr, |$some:ident| $use_some:expr ) => {
Expand All @@ -458,9 +489,12 @@ impl SpircTask {
};
}

// Subscribe before start() so we can't miss a reconnect notification.
let mut reconnect_rx = self.session.dealer().reconnect_receiver();

if let Err(why) = self.session.dealer().start().await {
error!("starting dealer failed: {why}");
return;
return None;
}

while !self.session.is_invalid() && !self.shutdown {
Expand All @@ -477,7 +511,13 @@ impl SpircTask {
connection_id_update,
match |connection_id| if let Err(why) = self.handle_connection_id_update(connection_id).await {
error!("failed handling connection id update: {why}");
break;
if !self.connect_established {
// Initial registration failed — can't process
// commands without it, so restart spirc.
break;
}
// Re-registration after dealer reconnect failed —
// stay alive, next connection_id push will retry.
}
},
// main dealer update of any remote device updates
Expand Down Expand Up @@ -585,10 +625,33 @@ impl SpircTask {
}
}
},
// Dealer reconnected after a connection loss. Our subscription
// streams survive because they're registered on the shared
// DealerShared — the new websocket dispatches through the same
// handlers. A new connection_id will arrive via
// connection_id_update and re-register our device state.
Ok(()) = reconnect_rx.changed() => {
info!("Dealer reconnected; awaiting new connection_id.");
},
else => break
}
}

if self.session.is_invalid() && !self.shutdown {
// Session TCP connection died unexpectedly — skip server
// communication that would fail anyway. The Player continues
// playing from its buffer; main.rs will create a new session.
warn!(
"session lost, saving playback state for recovery: {:?}",
self.play_status
);
return Some(SavedPlaybackState {
connect_state: mem::take(&mut self.connect_state),
play_status: mem::replace(&mut self.play_status, SpircPlayStatus::Stopped),
play_request_id: self.play_request_id.take(),
});
}

if !self.shutdown && self.connect_state.is_active() {
warn!("unexpected shutdown");
if let Err(why) = self.handle_disconnect().await {
Expand All @@ -602,6 +665,7 @@ impl SpircTask {
};

self.session.dealer().close().await;
None
}

fn handle_next_context(&mut self, next_context: Result<Context, Error>) -> bool {
Expand Down Expand Up @@ -889,6 +953,22 @@ impl SpircTask {
trace!("Received connection ID update: {connection_id:?}");
self.session.set_connection_id(&connection_id);

// If we have active playback (e.g. restored from saved state),
// update the position before registering so Spotify sees the
// correct track position.
if !matches!(self.play_status, SpircPlayStatus::Stopped) {
info!(
"re-registering with active playback state: {:?}",
self.play_status
);
self.connect_state.set_status(&self.play_status);
if self.connect_state.is_playing() {
self.connect_state
.update_position_in_relation(self.now_ms());
}
self.connect_state.set_now(self.now_ms() as u64);
}

let cluster = match self
.connect_state
.notify_new_device_appeared(&self.session)
Expand Down
11 changes: 9 additions & 2 deletions core/src/dealer/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures_core::Stream;
use futures_util::StreamExt;
use std::{pin::Pin, str::FromStr, sync::OnceLock};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
use url::Url;

Expand All @@ -16,6 +16,7 @@ component! {
DealerManager: DealerManagerInner {
builder: OnceLock<Builder> = OnceLock::from(Builder::new()),
dealer: OnceLock<Dealer> = OnceLock::new(),
reconnect_tx: watch::Sender<u64> = watch::Sender::new(0),
Comment thread
antoinecellerier marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -153,10 +154,12 @@ impl DealerManager {
// and the token is expired we will just get 401 error
let get_url = move || Self::get_url(session.clone());

let reconnect_tx = self.lock(|inner| inner.reconnect_tx.clone());

let dealer = self
.lock(move |inner| inner.builder.take())
.ok_or(DealerError::BuilderNotAvailable)?
.launch(get_url, None)
.launch(get_url, None, reconnect_tx)
.await
.map_err(DealerError::LaunchFailure)?;

Expand All @@ -171,4 +174,8 @@ impl DealerManager {
dealer.close().await
}
}

pub fn reconnect_receiver(&self) -> watch::Receiver<u64> {
self.lock(|inner| inner.reconnect_tx.subscribe())
}
}
Loading
Loading