diff --git a/connect/README.md b/connect/README.md index 015cce3db..ccc871c4e 100644 --- a/connect/README.md +++ b/connect/README.md @@ -20,7 +20,7 @@ A basic example in which the `Spirc` and `SpircTask` is used can be found here: # Example ```rust -use std::{future::Future, thread}; +use std::{future::Future, sync::{Arc, atomic::AtomicBool}, thread}; use librespot_connect::{ConnectConfig, Spirc}; use librespot_core::{authentication::Credentials, Error, Session, SessionConfig}; @@ -50,12 +50,15 @@ async fn create_basic_spirc() -> Result<(), Error> { let mixer = mixer::find(None).expect("will default to SoftMixer"); + let was_playing = Arc::new(AtomicBool::new(false)); + let (spirc, spirc_task): (Spirc, _) = Spirc::new( ConnectConfig::default(), session, credentials, player, - mixer(MixerConfig::default())? + mixer(MixerConfig::default())?, + was_playing, ).await?; Ok(()) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index fec6057c4..fc454fd27 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -38,7 +38,7 @@ use protobuf::MessageField; use std::{ future::Future, sync::Arc, - sync::atomic::{AtomicUsize, Ordering}, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, time::{Duration, SystemTime, UNIX_EPOCH}, }; use thiserror::Error; @@ -112,6 +112,9 @@ struct SpircTask { update_state: bool, spirc_id: usize, + + /// shared flag to preserve play state across reconnections + was_playing: Arc, } static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -163,6 +166,7 @@ impl Spirc { credentials: Credentials, player: Arc, mixer: Arc, + was_playing: Arc, ) -> Result<(Spirc, impl Future), Error> { fn extract_connection_id(msg: Message) -> Result { let connection_id = msg @@ -262,6 +266,7 @@ impl Spirc { update_state: false, spirc_id, + was_playing, }; let spirc = Spirc { commands: cmd_tx }; @@ -930,7 +935,9 @@ impl SpircTask { use protobuf::Message; match TransferState::parse_from_bytes(&cluster.transfer_data) { - Ok(transfer_state) => self.handle_transfer(transfer_state)?, + Ok(transfer_state) => { + self.handle_transfer(transfer_state, self.was_playing.load(Ordering::Relaxed))? + } Err(why) => error!("failed to take over control: {why}"), } @@ -1067,7 +1074,7 @@ impl SpircTask { } // modification and update of the connect_state Transfer(transfer) => { - self.handle_transfer(transfer.data.expect("by condition checked"))?; + self.handle_transfer(transfer.data.expect("by condition checked"), false)?; return self.notify().await; } Play(mut play) => { @@ -1164,7 +1171,11 @@ impl SpircTask { Ok(()) } - fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> { + fn handle_transfer( + &mut self, + mut transfer: TransferState, + force_play: bool, + ) -> Result<(), Error> { let mut ctx_uri = match transfer.current_session.context.uri { None => Err(SpircError::NoUri("transfer context"))?, // can apparently happen when a state is transferred and was started with "uris" via the api @@ -1250,7 +1261,7 @@ impl SpircTask { _ => 0, }; - let is_playing = !transfer.playback.is_paused(); + let is_playing = force_play || !transfer.playback.is_paused(); if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay { if let Some(ctx_uri) = ctx_uri { @@ -1533,6 +1544,8 @@ impl SpircTask { _ => return, } + self.was_playing.store(true, Ordering::Relaxed); + // Synchronize the volume from the mixer. This is useful on // systems that can switch sources from and back to librespot. let current_volume = self.mixer.volume(); @@ -1572,6 +1585,7 @@ impl SpircTask { } _ => (), } + self.was_playing.store(false, Ordering::Relaxed); } fn handle_seek(&mut self, position_ms: u32) { @@ -1888,6 +1902,7 @@ impl SpircTask { } else { self.play_status = SpircPlayStatus::LoadingPause { position_ms }; } + self.was_playing.store(start_playing, Ordering::Relaxed); self.connect_state.set_status(&self.play_status); Ok(()) diff --git a/core/src/audio_key.rs b/core/src/audio_key.rs index 42cad43c7..394c70ab6 100644 --- a/core/src/audio_key.rs +++ b/core/src/audio_key.rs @@ -20,6 +20,8 @@ pub enum AudioKeyError { Packet(u8), #[error("sequence {0} not pending")] Sequence(u32), + #[error("session is invalid")] + SessionInvalid, #[error("audio key response timeout")] Timeout, } @@ -31,6 +33,7 @@ impl From for Error { AudioKeyError::Channel => Error::aborted(err), AudioKeyError::Sequence(_) => Error::aborted(err), AudioKeyError::Packet(_) => Error::unimplemented(err), + AudioKeyError::SessionInvalid => Error::aborted(err), AudioKeyError::Timeout => Error::aborted(err), } } @@ -79,6 +82,12 @@ impl AudioKeyManager { } pub async fn request(&self, track: SpotifyId, file: FileId) -> Result { + // Fast-fail if session is already invalid to avoid waiting for timeout + if self.session().is_invalid() { + error!("Audio key request rejected: session is invalid"); + return Err(AudioKeyError::SessionInvalid.into()); + } + let (tx, rx) = oneshot::channel(); let seq = self.lock(move |inner| { diff --git a/core/src/session.rs b/core/src/session.rs index 333678fd2..3705a86bb 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -276,7 +276,7 @@ impl Session { error!("{e}"); if let Some(session) = session_weak.try_upgrade() { if !session.is_invalid() { - session.shutdown(); + session.shutdown("packet sender task failed"); } } } @@ -635,8 +635,8 @@ impl Session { SessionWeak(Arc::downgrade(&self.0)) } - pub fn shutdown(&self) { - debug!("Shutdown: Invalidating session"); + pub fn shutdown(&self, reason: &str) { + debug!("Shutdown: Invalidating session: {reason}"); self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true; self.mercury().shutdown(); self.channel().shutdown(); @@ -893,12 +893,12 @@ where } Poll::Ready(None) => { warn!("Connection to server closed."); - session.shutdown(); + session.shutdown("connection closed by server"); return Poll::Ready(Ok(())); } Poll::Ready(Some(Err(e))) => { error!("Connection to server closed."); - session.shutdown(); + session.shutdown("connection error"); return Poll::Ready(Err(e)); } Poll::Pending => break, @@ -927,7 +927,7 @@ where match this.keep_alive_state { ExpectingPing | ExpectingPongAck => { if !session.is_invalid() { - session.shutdown(); + session.shutdown("keep-alive timeout"); } // TODO: Optionally reconnect (with cached/last credentials?) return Poll::Ready(Err(io::Error::new( diff --git a/examples/play_connect.rs b/examples/play_connect.rs index 1be6345ba..80e50ea97 100644 --- a/examples/play_connect.rs +++ b/examples/play_connect.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, atomic::AtomicBool}; + use librespot::{ connect::{ConnectConfig, LoadRequest, LoadRequestOptions, Spirc}, core::{ @@ -59,8 +61,16 @@ async fn main() -> Result<(), Error> { move || sink_builder(None, audio_format), ); - let (spirc, spirc_task) = - Spirc::new(connect_config, session.clone(), credentials, player, mixer).await?; + let was_playing = Arc::new(AtomicBool::new(false)); + let (spirc, spirc_task) = Spirc::new( + connect_config, + session.clone(), + credentials, + player, + mixer, + was_playing, + ) + .await?; // these calls can be seen as "queued" spirc.activate()?; diff --git a/playback/src/player.rs b/playback/src/player.rs index f3d803d51..97e038291 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -7,11 +7,11 @@ use std::{ mem, pin::Pin, process::exit, - sync::Mutex, sync::{ Arc, atomic::{AtomicUsize, Ordering}, }, + sync::{Mutex, RwLock}, task::{Context, Poll}, thread, time::{Duration, Instant}, @@ -69,7 +69,7 @@ pub enum SinkStatus { pub type SinkEventCallback = Box; struct PlayerInternal { - session: Session, + session: Arc>, config: PlayerConfig, commands: mpsc::UnboundedReceiver, load_handles: Arc>>>, @@ -504,7 +504,7 @@ impl Player { create_local_file_lookup(config.local_file_directories.as_slice()); let internal = PlayerInternal { - session, + session: Arc::new(RwLock::new(session)), config, commands: cmd_rx, load_handles: Arc::new(Mutex::new(HashMap::new())), @@ -932,7 +932,7 @@ impl PlayerState { } struct PlayerTrackLoader { - session: Session, + session: Arc>, config: PlayerConfig, local_file_lookup: Arc, } @@ -947,9 +947,10 @@ impl PlayerTrackLoader { } else if let Some(alternatives) = audio_item.alternatives { let Tracks(alternatives_vec) = alternatives; // required to make `into_iter` able to move + let session = self.session.read().unwrap().clone(); let alternatives: FuturesUnordered<_> = alternatives_vec .into_iter() - .map(|alt_id| AudioItem::get_file(&self.session, alt_id)) + .map(|alt_id| AudioItem::get_file(&session, alt_id)) .collect(); alternatives @@ -1019,7 +1020,8 @@ impl PlayerTrackLoader { } }; - let audio_item = match AudioItem::get_file(&self.session, track_uri).await { + let session = self.session.read().unwrap().clone(); + let audio_item = match AudioItem::get_file(&session, track_uri).await { Ok(audio) => match self.find_available_alternative(audio).await { Some(audio) => audio, None => { @@ -1091,7 +1093,8 @@ impl PlayerTrackLoader { // This is only a loop to be able to reload the file if an error occurred // while opening a cached file. loop { - let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second); + let session = self.session.read().unwrap().clone(); + let encrypted_file = AudioFile::open(&session, file_id, bytes_per_second); let encrypted_file = match encrypted_file.await { Ok(encrypted_file) => encrypted_file, @@ -1108,7 +1111,8 @@ impl PlayerTrackLoader { // Not all audio files are encrypted. If we can't get a key, try loading the track // without decryption. If the file was encrypted after all, the decoder will fail // parsing and bail out, so we should be safe from outputting ear-piercing noise. - let key = match self.session.audio_key().request(track_id, file_id).await { + let session = self.session.read().unwrap().clone(); + let key = match session.audio_key().request(track_id, file_id).await { Ok(key) => Some(key), Err(e) => { warn!("Unable to load key, continuing without decryption: {e}"); @@ -1176,7 +1180,7 @@ impl PlayerTrackLoader { Err(e) if is_cached => { warn!("Unable to read cached audio file: {e}. Trying to download it."); - match self.session.cache() { + match self.session.read().unwrap().clone().cache() { Some(cache) => { if cache.remove_file(file_id).is_err() { error!("Error removing file from cache"); @@ -2316,7 +2320,7 @@ impl PlayerInternal { PlayerCommand::Stop => self.handle_player_stop(), - PlayerCommand::SetSession(session) => self.session = session, + PlayerCommand::SetSession(session) => *self.session.write().unwrap() = session, PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), diff --git a/src/main.rs b/src/main.rs index 16ba1946f..bda23239d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,7 @@ use std::{ pin::Pin, process::exit, str::FromStr, + sync::{Arc, atomic::AtomicBool}, time::{Duration, Instant}, }; use sysinfo::{ProcessesToUpdate, System}; @@ -1895,9 +1896,11 @@ async fn main() { let mut last_credentials = None; let mut spirc: Option = None; let mut spirc_task: Option> = None; + let was_playing = Arc::new(AtomicBool::new(false)); let mut auto_connect_times: Vec = vec![]; let mut discovery = None; let mut connecting = false; + let mut has_been_connected = false; let mut _event_handler: Option = None; let mut session = Session::new(setup.session_config.clone(), setup.cache.clone()); @@ -2030,7 +2033,7 @@ async fn main() { tokio::spawn(spirc_task); } if !session.is_invalid() { - session.shutdown(); + session.shutdown("new credentials received"); } connecting = true; @@ -2043,17 +2046,26 @@ async fn main() { }, _ = async {}, if connecting && last_credentials.is_some() => { if session.is_invalid() { + let old_session_id = session.session_id(); session = Session::new(setup.session_config.clone(), setup.cache.clone()); + if !old_session_id.is_empty() { + session.set_session_id(&old_session_id); + info!("Preserved session_id across reconnection: {old_session_id}"); + } player.set_session(session.clone()); } - let connect_config = setup.connect_config.clone(); + let mut connect_config = setup.connect_config.clone(); + if has_been_connected { + connect_config.initial_volume = mixer.volume(); + } let (spirc_, spirc_task_) = match Spirc::new(connect_config, session.clone(), last_credentials.clone().unwrap_or_default(), player.clone(), - mixer.clone()).await { + mixer.clone(), + was_playing.clone()).await { Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_), Err(e) => { error!("could not initialize spirc: {e}"); @@ -2064,6 +2076,7 @@ async fn main() { spirc_task = Some(Box::pin(spirc_task_)); connecting = false; + has_been_connected = true; }, _ = async { if let Some(task) = spirc_task.as_mut() { @@ -2082,7 +2095,7 @@ async fn main() { if last_credentials.is_some() && !reconnect_exceeds_rate_limit() { auto_connect_times.push(Instant::now()); if !session.is_invalid() { - session.shutdown(); + session.shutdown("spirc shut down unexpectedly"); } connecting = true; } else {