Skip to content

Commit 772e90f

Browse files
committed
Add session recovery with automatic playback resume
When the TCP connection to Spotify access points drops due to network instability, the keep-alive timeout fires after ~80s and invalidates the session. The old spirc shuts down and a new Session + Spirc is created. However, three things went wrong: 1. Session ID mismatch: The new Session generated a fresh random UUID, so Spotify could not match it to the previous playback session. The automatic transfer in handle_connection_id_update failed because the cluster session_id did not match the new session_id. 2. Volume reset: ConnectConfig.initial_volume was set once at startup and reused on every reconnection. The new SpircTask initialized the mixer to this stale value instead of the users current volume, causing a jarring volume jump on reconnect. 3. Playback not resuming: The transfer state from Spotify always has is_paused=true after a disconnect, since the device went offline. handle_transfer used this to set start_playing=false, so the track loaded paused even though the user was actively listening. Fixes: - Preserve session_id across reconnections for automatic transfer - Preserve mixer volume on reconnect, only override after first connect - Track user play intent via shared Arc<AtomicBool> was_playing flag - Add force_play parameter to handle_transfer for reconnection resume - Add shutdown reason strings to session.shutdown for debugging - Fast-fail audio key requests when session is invalid - Use RwLock<Session> in player for session hot-swap
1 parent 33bf3a7 commit 772e90f

6 files changed

Lines changed: 81 additions & 27 deletions

File tree

connect/src/spirc.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use protobuf::MessageField;
3838
use std::{
3939
future::Future,
4040
sync::Arc,
41-
sync::atomic::{AtomicUsize, Ordering},
41+
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
4242
time::{Duration, SystemTime, UNIX_EPOCH},
4343
};
4444
use thiserror::Error;
@@ -112,6 +112,9 @@ struct SpircTask {
112112
update_state: bool,
113113

114114
spirc_id: usize,
115+
116+
/// shared flag to preserve play state across reconnections
117+
was_playing: Arc<AtomicBool>,
115118
}
116119

117120
static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0);
@@ -163,6 +166,7 @@ impl Spirc {
163166
credentials: Credentials,
164167
player: Arc<Player>,
165168
mixer: Arc<dyn Mixer>,
169+
was_playing: Arc<AtomicBool>,
166170
) -> Result<(Spirc, impl Future<Output = ()>), Error> {
167171
fn extract_connection_id(msg: Message) -> Result<String, Error> {
168172
let connection_id = msg
@@ -262,6 +266,7 @@ impl Spirc {
262266
update_state: false,
263267

264268
spirc_id,
269+
was_playing,
265270
};
266271

267272
let spirc = Spirc { commands: cmd_tx };
@@ -930,7 +935,9 @@ impl SpircTask {
930935
use protobuf::Message;
931936

932937
match TransferState::parse_from_bytes(&cluster.transfer_data) {
933-
Ok(transfer_state) => self.handle_transfer(transfer_state)?,
938+
Ok(transfer_state) => {
939+
self.handle_transfer(transfer_state, self.was_playing.load(Ordering::Relaxed))?
940+
}
934941
Err(why) => error!("failed to take over control: {why}"),
935942
}
936943

@@ -1067,7 +1074,7 @@ impl SpircTask {
10671074
}
10681075
// modification and update of the connect_state
10691076
Transfer(transfer) => {
1070-
self.handle_transfer(transfer.data.expect("by condition checked"))?;
1077+
self.handle_transfer(transfer.data.expect("by condition checked"), false)?;
10711078
return self.notify().await;
10721079
}
10731080
Play(mut play) => {
@@ -1164,7 +1171,11 @@ impl SpircTask {
11641171
Ok(())
11651172
}
11661173

1167-
fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> {
1174+
fn handle_transfer(
1175+
&mut self,
1176+
mut transfer: TransferState,
1177+
force_play: bool,
1178+
) -> Result<(), Error> {
11681179
let mut ctx_uri = match transfer.current_session.context.uri {
11691180
None => Err(SpircError::NoUri("transfer context"))?,
11701181
// can apparently happen when a state is transferred and was started with "uris" via the api
@@ -1250,7 +1261,7 @@ impl SpircTask {
12501261
_ => 0,
12511262
};
12521263

1253-
let is_playing = !transfer.playback.is_paused();
1264+
let is_playing = force_play || !transfer.playback.is_paused();
12541265

12551266
if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay {
12561267
if let Some(ctx_uri) = ctx_uri {
@@ -1533,6 +1544,8 @@ impl SpircTask {
15331544
_ => return,
15341545
}
15351546

1547+
self.was_playing.store(true, Ordering::Relaxed);
1548+
15361549
// Synchronize the volume from the mixer. This is useful on
15371550
// systems that can switch sources from and back to librespot.
15381551
let current_volume = self.mixer.volume();
@@ -1572,6 +1585,7 @@ impl SpircTask {
15721585
}
15731586
_ => (),
15741587
}
1588+
self.was_playing.store(false, Ordering::Relaxed);
15751589
}
15761590

15771591
fn handle_seek(&mut self, position_ms: u32) {
@@ -1888,6 +1902,7 @@ impl SpircTask {
18881902
} else {
18891903
self.play_status = SpircPlayStatus::LoadingPause { position_ms };
18901904
}
1905+
self.was_playing.store(start_playing, Ordering::Relaxed);
18911906
self.connect_state.set_status(&self.play_status);
18921907

18931908
Ok(())

core/src/audio_key.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub enum AudioKeyError {
2020
Packet(u8),
2121
#[error("sequence {0} not pending")]
2222
Sequence(u32),
23+
#[error("session is invalid")]
24+
SessionInvalid,
2325
#[error("audio key response timeout")]
2426
Timeout,
2527
}
@@ -31,6 +33,7 @@ impl From<AudioKeyError> for Error {
3133
AudioKeyError::Channel => Error::aborted(err),
3234
AudioKeyError::Sequence(_) => Error::aborted(err),
3335
AudioKeyError::Packet(_) => Error::unimplemented(err),
36+
AudioKeyError::SessionInvalid => Error::aborted(err),
3437
AudioKeyError::Timeout => Error::aborted(err),
3538
}
3639
}
@@ -79,6 +82,12 @@ impl AudioKeyManager {
7982
}
8083

8184
pub async fn request(&self, track: SpotifyId, file: FileId) -> Result<AudioKey, Error> {
85+
// Fast-fail if session is already invalid to avoid waiting for timeout
86+
if self.session().is_invalid() {
87+
error!("Audio key request rejected: session is invalid");
88+
return Err(AudioKeyError::SessionInvalid.into());
89+
}
90+
8291
let (tx, rx) = oneshot::channel();
8392

8493
let seq = self.lock(move |inner| {

core/src/session.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ impl Session {
276276
error!("{e}");
277277
if let Some(session) = session_weak.try_upgrade() {
278278
if !session.is_invalid() {
279-
session.shutdown();
279+
session.shutdown("packet sender task failed");
280280
}
281281
}
282282
}
@@ -635,8 +635,8 @@ impl Session {
635635
SessionWeak(Arc::downgrade(&self.0))
636636
}
637637

638-
pub fn shutdown(&self) {
639-
debug!("Shutdown: Invalidating session");
638+
pub fn shutdown(&self, reason: &str) {
639+
debug!("Shutdown: Invalidating session: {reason}");
640640
self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
641641
self.mercury().shutdown();
642642
self.channel().shutdown();
@@ -893,12 +893,12 @@ where
893893
}
894894
Poll::Ready(None) => {
895895
warn!("Connection to server closed.");
896-
session.shutdown();
896+
session.shutdown("connection closed by server");
897897
return Poll::Ready(Ok(()));
898898
}
899899
Poll::Ready(Some(Err(e))) => {
900900
error!("Connection to server closed.");
901-
session.shutdown();
901+
session.shutdown("connection error");
902902
return Poll::Ready(Err(e));
903903
}
904904
Poll::Pending => break,
@@ -927,7 +927,7 @@ where
927927
match this.keep_alive_state {
928928
ExpectingPing | ExpectingPongAck => {
929929
if !session.is_invalid() {
930-
session.shutdown();
930+
session.shutdown("keep-alive timeout");
931931
}
932932
// TODO: Optionally reconnect (with cached/last credentials?)
933933
return Poll::Ready(Err(io::Error::new(

examples/play_connect.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::{Arc, atomic::AtomicBool};
2+
13
use librespot::{
24
connect::{ConnectConfig, LoadRequest, LoadRequestOptions, Spirc},
35
core::{
@@ -59,8 +61,16 @@ async fn main() -> Result<(), Error> {
5961
move || sink_builder(None, audio_format),
6062
);
6163

62-
let (spirc, spirc_task) =
63-
Spirc::new(connect_config, session.clone(), credentials, player, mixer).await?;
64+
let was_playing = Arc::new(AtomicBool::new(false));
65+
let (spirc, spirc_task) = Spirc::new(
66+
connect_config,
67+
session.clone(),
68+
credentials,
69+
player,
70+
mixer,
71+
was_playing,
72+
)
73+
.await?;
6474

6575
// these calls can be seen as "queued"
6676
spirc.activate()?;

playback/src/player.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use std::{
77
mem,
88
pin::Pin,
99
process::exit,
10-
sync::Mutex,
1110
sync::{
1211
Arc,
1312
atomic::{AtomicUsize, Ordering},
1413
},
14+
sync::{Mutex, RwLock},
1515
task::{Context, Poll},
1616
thread,
1717
time::{Duration, Instant},
@@ -69,7 +69,7 @@ pub enum SinkStatus {
6969
pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
7070

7171
struct PlayerInternal {
72-
session: Session,
72+
session: Arc<RwLock<Session>>,
7373
config: PlayerConfig,
7474
commands: mpsc::UnboundedReceiver<PlayerCommand>,
7575
load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
@@ -504,7 +504,7 @@ impl Player {
504504
create_local_file_lookup(config.local_file_directories.as_slice());
505505

506506
let internal = PlayerInternal {
507-
session,
507+
session: Arc::new(RwLock::new(session)),
508508
config,
509509
commands: cmd_rx,
510510
load_handles: Arc::new(Mutex::new(HashMap::new())),
@@ -932,7 +932,7 @@ impl PlayerState {
932932
}
933933

934934
struct PlayerTrackLoader {
935-
session: Session,
935+
session: Arc<RwLock<Session>>,
936936
config: PlayerConfig,
937937
local_file_lookup: Arc<LocalFileLookup>,
938938
}
@@ -947,9 +947,10 @@ impl PlayerTrackLoader {
947947
} else if let Some(alternatives) = audio_item.alternatives {
948948
let Tracks(alternatives_vec) = alternatives; // required to make `into_iter` able to move
949949

950+
let session = self.session.read().unwrap().clone();
950951
let alternatives: FuturesUnordered<_> = alternatives_vec
951952
.into_iter()
952-
.map(|alt_id| AudioItem::get_file(&self.session, alt_id))
953+
.map(|alt_id| AudioItem::get_file(&session, alt_id))
953954
.collect();
954955

955956
alternatives
@@ -1019,7 +1020,8 @@ impl PlayerTrackLoader {
10191020
}
10201021
};
10211022

1022-
let audio_item = match AudioItem::get_file(&self.session, track_uri).await {
1023+
let session = self.session.read().unwrap().clone();
1024+
let audio_item = match AudioItem::get_file(&session, track_uri).await {
10231025
Ok(audio) => match self.find_available_alternative(audio).await {
10241026
Some(audio) => audio,
10251027
None => {
@@ -1091,7 +1093,8 @@ impl PlayerTrackLoader {
10911093
// This is only a loop to be able to reload the file if an error occurred
10921094
// while opening a cached file.
10931095
loop {
1094-
let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second);
1096+
let session = self.session.read().unwrap().clone();
1097+
let encrypted_file = AudioFile::open(&session, file_id, bytes_per_second);
10951098

10961099
let encrypted_file = match encrypted_file.await {
10971100
Ok(encrypted_file) => encrypted_file,
@@ -1108,7 +1111,8 @@ impl PlayerTrackLoader {
11081111
// Not all audio files are encrypted. If we can't get a key, try loading the track
11091112
// without decryption. If the file was encrypted after all, the decoder will fail
11101113
// parsing and bail out, so we should be safe from outputting ear-piercing noise.
1111-
let key = match self.session.audio_key().request(track_id, file_id).await {
1114+
let session = self.session.read().unwrap().clone();
1115+
let key = match session.audio_key().request(track_id, file_id).await {
11121116
Ok(key) => Some(key),
11131117
Err(e) => {
11141118
warn!("Unable to load key, continuing without decryption: {e}");
@@ -1176,7 +1180,7 @@ impl PlayerTrackLoader {
11761180
Err(e) if is_cached => {
11771181
warn!("Unable to read cached audio file: {e}. Trying to download it.");
11781182

1179-
match self.session.cache() {
1183+
match self.session.read().unwrap().clone().cache() {
11801184
Some(cache) => {
11811185
if cache.remove_file(file_id).is_err() {
11821186
error!("Error removing file from cache");
@@ -2316,7 +2320,7 @@ impl PlayerInternal {
23162320

23172321
PlayerCommand::Stop => self.handle_player_stop(),
23182322

2319-
PlayerCommand::SetSession(session) => self.session = session,
2323+
PlayerCommand::SetSession(session) => *self.session.write().unwrap() = session,
23202324

23212325
PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
23222326

src/main.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ use std::{
3131
pin::Pin,
3232
process::exit,
3333
str::FromStr,
34+
sync::{
35+
Arc,
36+
atomic::{AtomicBool, Ordering},
37+
},
3438
time::{Duration, Instant},
3539
};
3640
use sysinfo::{ProcessesToUpdate, System};
@@ -1895,9 +1899,11 @@ async fn main() {
18951899
let mut last_credentials = None;
18961900
let mut spirc: Option<Spirc> = None;
18971901
let mut spirc_task: Option<Pin<_>> = None;
1902+
let was_playing = Arc::new(AtomicBool::new(false));
18981903
let mut auto_connect_times: Vec<Instant> = vec![];
18991904
let mut discovery = None;
19001905
let mut connecting = false;
1906+
let mut has_been_connected = false;
19011907
let mut _event_handler: Option<EventHandler> = None;
19021908

19031909
let mut session = Session::new(setup.session_config.clone(), setup.cache.clone());
@@ -2030,7 +2036,7 @@ async fn main() {
20302036
tokio::spawn(spirc_task);
20312037
}
20322038
if !session.is_invalid() {
2033-
session.shutdown();
2039+
session.shutdown("new credentials received");
20342040
}
20352041

20362042
connecting = true;
@@ -2043,17 +2049,26 @@ async fn main() {
20432049
},
20442050
_ = async {}, if connecting && last_credentials.is_some() => {
20452051
if session.is_invalid() {
2052+
let old_session_id = session.session_id();
20462053
session = Session::new(setup.session_config.clone(), setup.cache.clone());
2054+
if !old_session_id.is_empty() {
2055+
session.set_session_id(&old_session_id);
2056+
info!("Preserved session_id across reconnection: {old_session_id}");
2057+
}
20472058
player.set_session(session.clone());
20482059
}
20492060

2050-
let connect_config = setup.connect_config.clone();
2061+
let mut connect_config = setup.connect_config.clone();
2062+
if has_been_connected {
2063+
connect_config.initial_volume = mixer.volume();
2064+
}
20512065

20522066
let (spirc_, spirc_task_) = match Spirc::new(connect_config,
20532067
session.clone(),
20542068
last_credentials.clone().unwrap_or_default(),
20552069
player.clone(),
2056-
mixer.clone()).await {
2070+
mixer.clone(),
2071+
was_playing.clone()).await {
20572072
Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_),
20582073
Err(e) => {
20592074
error!("could not initialize spirc: {e}");
@@ -2064,6 +2079,7 @@ async fn main() {
20642079
spirc_task = Some(Box::pin(spirc_task_));
20652080

20662081
connecting = false;
2082+
has_been_connected = true;
20672083
},
20682084
_ = async {
20692085
if let Some(task) = spirc_task.as_mut() {
@@ -2082,7 +2098,7 @@ async fn main() {
20822098
if last_credentials.is_some() && !reconnect_exceeds_rate_limit() {
20832099
auto_connect_times.push(Instant::now());
20842100
if !session.is_invalid() {
2085-
session.shutdown();
2101+
session.shutdown("spirc shut down unexpectedly");
20862102
}
20872103
connecting = true;
20882104
} else {

0 commit comments

Comments
 (0)