Skip to content

Commit 941a55a

Browse files
committed
Add session recovery with automatic playback resume
When the TCP connection to Spotify access points drops due to network instability, the keep-alive timeout fires after ~80s and invalidates the session. The old spirc shuts down and a new Session + Spirc is created. However, three things went wrong: 1. Session ID mismatch: The new Session generated a fresh random UUID, so Spotify could not match it to the previous playback session. The automatic transfer in handle_connection_id_update failed because the cluster session_id did not match the new session_id. 2. Volume reset: ConnectConfig.initial_volume was set once at startup and reused on every reconnection. The new SpircTask initialized the mixer to this stale value instead of the users current volume, causing a jarring volume jump on reconnect. 3. Playback not resuming: The transfer state from Spotify always has is_paused=true after a disconnect, since the device went offline. handle_transfer used this to set start_playing=false, so the track loaded paused even though the user was actively listening. Fixes: - Preserve session_id across reconnections for automatic transfer - Preserve mixer volume on reconnect, only override after first connect - Track user play intent via shared Arc<AtomicBool> was_playing flag - Add force_play parameter to handle_transfer for reconnection resume - Add shutdown reason strings to session.shutdown for debugging - Fast-fail audio key requests when session is invalid - Use RwLock<Session> in player for session hot-swap
1 parent 33bf3a7 commit 941a55a

5 files changed

Lines changed: 63 additions & 25 deletions

File tree

connect/src/spirc.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use protobuf::MessageField;
3838
use std::{
3939
future::Future,
4040
sync::Arc,
41-
sync::atomic::{AtomicUsize, Ordering},
41+
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
4242
time::{Duration, SystemTime, UNIX_EPOCH},
4343
};
4444
use thiserror::Error;
@@ -112,6 +112,9 @@ struct SpircTask {
112112
update_state: bool,
113113

114114
spirc_id: usize,
115+
116+
/// shared flag to preserve play state across reconnections
117+
was_playing: Arc<AtomicBool>,
115118
}
116119

117120
static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0);
@@ -163,6 +166,7 @@ impl Spirc {
163166
credentials: Credentials,
164167
player: Arc<Player>,
165168
mixer: Arc<dyn Mixer>,
169+
was_playing: Arc<AtomicBool>,
166170
) -> Result<(Spirc, impl Future<Output = ()>), Error> {
167171
fn extract_connection_id(msg: Message) -> Result<String, Error> {
168172
let connection_id = msg
@@ -262,6 +266,7 @@ impl Spirc {
262266
update_state: false,
263267

264268
spirc_id,
269+
was_playing,
265270
};
266271

267272
let spirc = Spirc { commands: cmd_tx };
@@ -930,7 +935,7 @@ impl SpircTask {
930935
use protobuf::Message;
931936

932937
match TransferState::parse_from_bytes(&cluster.transfer_data) {
933-
Ok(transfer_state) => self.handle_transfer(transfer_state)?,
938+
Ok(transfer_state) => self.handle_transfer(transfer_state, self.was_playing.load(Ordering::Relaxed))?,
934939
Err(why) => error!("failed to take over control: {why}"),
935940
}
936941

@@ -1067,7 +1072,7 @@ impl SpircTask {
10671072
}
10681073
// modification and update of the connect_state
10691074
Transfer(transfer) => {
1070-
self.handle_transfer(transfer.data.expect("by condition checked"))?;
1075+
self.handle_transfer(transfer.data.expect("by condition checked"), false)?;
10711076
return self.notify().await;
10721077
}
10731078
Play(mut play) => {
@@ -1164,7 +1169,7 @@ impl SpircTask {
11641169
Ok(())
11651170
}
11661171

1167-
fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> {
1172+
fn handle_transfer(&mut self, mut transfer: TransferState, force_play: bool) -> Result<(), Error> {
11681173
let mut ctx_uri = match transfer.current_session.context.uri {
11691174
None => Err(SpircError::NoUri("transfer context"))?,
11701175
// can apparently happen when a state is transferred and was started with "uris" via the api
@@ -1250,7 +1255,7 @@ impl SpircTask {
12501255
_ => 0,
12511256
};
12521257

1253-
let is_playing = !transfer.playback.is_paused();
1258+
let is_playing = force_play || !transfer.playback.is_paused();
12541259

12551260
if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay {
12561261
if let Some(ctx_uri) = ctx_uri {
@@ -1533,6 +1538,8 @@ impl SpircTask {
15331538
_ => return,
15341539
}
15351540

1541+
self.was_playing.store(true, Ordering::Relaxed);
1542+
15361543
// Synchronize the volume from the mixer. This is useful on
15371544
// systems that can switch sources from and back to librespot.
15381545
let current_volume = self.mixer.volume();
@@ -1572,6 +1579,7 @@ impl SpircTask {
15721579
}
15731580
_ => (),
15741581
}
1582+
self.was_playing.store(false, Ordering::Relaxed);
15751583
}
15761584

15771585
fn handle_seek(&mut self, position_ms: u32) {
@@ -1885,8 +1893,10 @@ impl SpircTask {
18851893
.update_position(position_ms, self.now_ms());
18861894
if start_playing {
18871895
self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
1896+
self.was_playing.store(true, Ordering::Relaxed);
18881897
} else {
18891898
self.play_status = SpircPlayStatus::LoadingPause { position_ms };
1899+
self.was_playing.store(false, Ordering::Relaxed);
18901900
}
18911901
self.connect_state.set_status(&self.play_status);
18921902

core/src/audio_key.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub enum AudioKeyError {
2020
Packet(u8),
2121
#[error("sequence {0} not pending")]
2222
Sequence(u32),
23+
#[error("session is invalid")]
24+
SessionInvalid,
2325
#[error("audio key response timeout")]
2426
Timeout,
2527
}
@@ -31,6 +33,7 @@ impl From<AudioKeyError> for Error {
3133
AudioKeyError::Channel => Error::aborted(err),
3234
AudioKeyError::Sequence(_) => Error::aborted(err),
3335
AudioKeyError::Packet(_) => Error::unimplemented(err),
36+
AudioKeyError::SessionInvalid => Error::aborted(err),
3437
AudioKeyError::Timeout => Error::aborted(err),
3538
}
3639
}
@@ -79,6 +82,12 @@ impl AudioKeyManager {
7982
}
8083

8184
pub async fn request(&self, track: SpotifyId, file: FileId) -> Result<AudioKey, Error> {
85+
// Fast-fail if session is already invalid to avoid waiting for timeout
86+
if self.session().is_invalid() {
87+
error!("Audio key request rejected: session is invalid");
88+
return Err(AudioKeyError::SessionInvalid.into());
89+
}
90+
8291
let (tx, rx) = oneshot::channel();
8392

8493
let seq = self.lock(move |inner| {

core/src/session.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ impl Session {
276276
error!("{e}");
277277
if let Some(session) = session_weak.try_upgrade() {
278278
if !session.is_invalid() {
279-
session.shutdown();
279+
session.shutdown("packet sender task failed");
280280
}
281281
}
282282
}
@@ -635,8 +635,8 @@ impl Session {
635635
SessionWeak(Arc::downgrade(&self.0))
636636
}
637637

638-
pub fn shutdown(&self) {
639-
debug!("Shutdown: Invalidating session");
638+
pub fn shutdown(&self, reason: &str) {
639+
debug!("Shutdown: Invalidating session: {reason}");
640640
self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
641641
self.mercury().shutdown();
642642
self.channel().shutdown();
@@ -893,12 +893,12 @@ where
893893
}
894894
Poll::Ready(None) => {
895895
warn!("Connection to server closed.");
896-
session.shutdown();
896+
session.shutdown("connection closed by server");
897897
return Poll::Ready(Ok(()));
898898
}
899899
Poll::Ready(Some(Err(e))) => {
900900
error!("Connection to server closed.");
901-
session.shutdown();
901+
session.shutdown("connection error");
902902
return Poll::Ready(Err(e));
903903
}
904904
Poll::Pending => break,
@@ -927,7 +927,7 @@ where
927927
match this.keep_alive_state {
928928
ExpectingPing | ExpectingPongAck => {
929929
if !session.is_invalid() {
930-
session.shutdown();
930+
session.shutdown("keep-alive timeout");
931931
}
932932
// TODO: Optionally reconnect (with cached/last credentials?)
933933
return Poll::Ready(Err(io::Error::new(

playback/src/player.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
mem,
88
pin::Pin,
99
process::exit,
10-
sync::Mutex,
10+
sync::{Mutex, RwLock},
1111
sync::{
1212
Arc,
1313
atomic::{AtomicUsize, Ordering},
@@ -17,6 +17,8 @@ use std::{
1717
time::{Duration, Instant},
1818
};
1919

20+
21+
2022
#[cfg(feature = "passthrough-decoder")]
2123
use crate::decoder::PassthroughDecoder;
2224
use crate::{
@@ -69,7 +71,7 @@ pub enum SinkStatus {
6971
pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
7072

7173
struct PlayerInternal {
72-
session: Session,
74+
session: Arc<RwLock<Session>>,
7375
config: PlayerConfig,
7476
commands: mpsc::UnboundedReceiver<PlayerCommand>,
7577
load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
@@ -504,7 +506,7 @@ impl Player {
504506
create_local_file_lookup(config.local_file_directories.as_slice());
505507

506508
let internal = PlayerInternal {
507-
session,
509+
session: Arc::new(RwLock::new(session)),
508510
config,
509511
commands: cmd_rx,
510512
load_handles: Arc::new(Mutex::new(HashMap::new())),
@@ -932,7 +934,7 @@ impl PlayerState {
932934
}
933935

934936
struct PlayerTrackLoader {
935-
session: Session,
937+
session: Arc<RwLock<Session>>,
936938
config: PlayerConfig,
937939
local_file_lookup: Arc<LocalFileLookup>,
938940
}
@@ -947,9 +949,10 @@ impl PlayerTrackLoader {
947949
} else if let Some(alternatives) = audio_item.alternatives {
948950
let Tracks(alternatives_vec) = alternatives; // required to make `into_iter` able to move
949951

952+
let session = self.session.read().unwrap().clone();
950953
let alternatives: FuturesUnordered<_> = alternatives_vec
951954
.into_iter()
952-
.map(|alt_id| AudioItem::get_file(&self.session, alt_id))
955+
.map(|alt_id| AudioItem::get_file(&session, alt_id))
953956
.collect();
954957

955958
alternatives
@@ -1019,7 +1022,8 @@ impl PlayerTrackLoader {
10191022
}
10201023
};
10211024

1022-
let audio_item = match AudioItem::get_file(&self.session, track_uri).await {
1025+
let session = self.session.read().unwrap().clone();
1026+
let audio_item = match AudioItem::get_file(&session, track_uri).await {
10231027
Ok(audio) => match self.find_available_alternative(audio).await {
10241028
Some(audio) => audio,
10251029
None => {
@@ -1091,7 +1095,8 @@ impl PlayerTrackLoader {
10911095
// This is only a loop to be able to reload the file if an error occurred
10921096
// while opening a cached file.
10931097
loop {
1094-
let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second);
1098+
let session = self.session.read().unwrap().clone();
1099+
let encrypted_file = AudioFile::open(&session, file_id, bytes_per_second);
10951100

10961101
let encrypted_file = match encrypted_file.await {
10971102
Ok(encrypted_file) => encrypted_file,
@@ -1108,7 +1113,8 @@ impl PlayerTrackLoader {
11081113
// Not all audio files are encrypted. If we can't get a key, try loading the track
11091114
// without decryption. If the file was encrypted after all, the decoder will fail
11101115
// parsing and bail out, so we should be safe from outputting ear-piercing noise.
1111-
let key = match self.session.audio_key().request(track_id, file_id).await {
1116+
let session = self.session.read().unwrap().clone();
1117+
let key = match session.audio_key().request(track_id, file_id).await {
11121118
Ok(key) => Some(key),
11131119
Err(e) => {
11141120
warn!("Unable to load key, continuing without decryption: {e}");
@@ -1176,7 +1182,7 @@ impl PlayerTrackLoader {
11761182
Err(e) if is_cached => {
11771183
warn!("Unable to read cached audio file: {e}. Trying to download it.");
11781184

1179-
match self.session.cache() {
1185+
match self.session.read().unwrap().clone().cache() {
11801186
Some(cache) => {
11811187
if cache.remove_file(file_id).is_err() {
11821188
error!("Error removing file from cache");
@@ -2316,7 +2322,7 @@ impl PlayerInternal {
23162322

23172323
PlayerCommand::Stop => self.handle_player_stop(),
23182324

2319-
PlayerCommand::SetSession(session) => self.session = session,
2325+
PlayerCommand::SetSession(session) => *self.session.write().unwrap() = session,
23202326

23212327
PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
23222328

src/main.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use std::{
3131
pin::Pin,
3232
process::exit,
3333
str::FromStr,
34+
sync::{Arc, atomic::{AtomicBool, Ordering}},
3435
time::{Duration, Instant},
3536
};
3637
use sysinfo::{ProcessesToUpdate, System};
@@ -1895,9 +1896,11 @@ async fn main() {
18951896
let mut last_credentials = None;
18961897
let mut spirc: Option<Spirc> = None;
18971898
let mut spirc_task: Option<Pin<_>> = None;
1899+
let was_playing = Arc::new(AtomicBool::new(false));
18981900
let mut auto_connect_times: Vec<Instant> = vec![];
18991901
let mut discovery = None;
19001902
let mut connecting = false;
1903+
let mut has_been_connected = false;
19011904
let mut _event_handler: Option<EventHandler> = None;
19021905

19031906
let mut session = Session::new(setup.session_config.clone(), setup.cache.clone());
@@ -2030,7 +2033,7 @@ async fn main() {
20302033
tokio::spawn(spirc_task);
20312034
}
20322035
if !session.is_invalid() {
2033-
session.shutdown();
2036+
session.shutdown("new credentials received");
20342037
}
20352038

20362039
connecting = true;
@@ -2043,17 +2046,26 @@ async fn main() {
20432046
},
20442047
_ = async {}, if connecting && last_credentials.is_some() => {
20452048
if session.is_invalid() {
2049+
let old_session_id = session.session_id();
20462050
session = Session::new(setup.session_config.clone(), setup.cache.clone());
2051+
if !old_session_id.is_empty() {
2052+
session.set_session_id(&old_session_id);
2053+
info!("Preserved session_id across reconnection: {old_session_id}");
2054+
}
20472055
player.set_session(session.clone());
20482056
}
20492057

2050-
let connect_config = setup.connect_config.clone();
2058+
let mut connect_config = setup.connect_config.clone();
2059+
if has_been_connected {
2060+
connect_config.initial_volume = mixer.volume();
2061+
}
20512062

20522063
let (spirc_, spirc_task_) = match Spirc::new(connect_config,
20532064
session.clone(),
20542065
last_credentials.clone().unwrap_or_default(),
20552066
player.clone(),
2056-
mixer.clone()).await {
2067+
mixer.clone(),
2068+
was_playing.clone()).await {
20572069
Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_),
20582070
Err(e) => {
20592071
error!("could not initialize spirc: {e}");
@@ -2064,6 +2076,7 @@ async fn main() {
20642076
spirc_task = Some(Box::pin(spirc_task_));
20652077

20662078
connecting = false;
2079+
has_been_connected = true;
20672080
},
20682081
_ = async {
20692082
if let Some(task) = spirc_task.as_mut() {
@@ -2082,7 +2095,7 @@ async fn main() {
20822095
if last_credentials.is_some() && !reconnect_exceeds_rate_limit() {
20832096
auto_connect_times.push(Instant::now());
20842097
if !session.is_invalid() {
2085-
session.shutdown();
2098+
session.shutdown("spirc shut down unexpectedly");
20862099
}
20872100
connecting = true;
20882101
} else {

0 commit comments

Comments
 (0)