Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions connect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ A basic example in which the `Spirc` and `SpircTask` is used can be found here:
# Example

```rust
use std::{future::Future, thread};
use std::{future::Future, sync::{Arc, atomic::AtomicBool}, thread};

use librespot_connect::{ConnectConfig, Spirc};
use librespot_core::{authentication::Credentials, Error, Session, SessionConfig};
Expand Down Expand Up @@ -50,12 +50,15 @@ async fn create_basic_spirc() -> Result<(), Error> {

let mixer = mixer::find(None).expect("will default to SoftMixer");

let was_playing = Arc::new(AtomicBool::new(false));

let (spirc, spirc_task): (Spirc, _) = Spirc::new(
ConnectConfig::default(),
session,
credentials,
player,
mixer(MixerConfig::default())?
mixer(MixerConfig::default())?,
was_playing,
).await?;

Ok(())
Expand Down
25 changes: 20 additions & 5 deletions connect/src/spirc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use protobuf::MessageField;
use std::{
future::Future,
sync::Arc,
sync::atomic::{AtomicUsize, Ordering},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use thiserror::Error;
Expand Down Expand Up @@ -112,6 +112,9 @@ struct SpircTask {
update_state: bool,

spirc_id: usize,

/// shared flag to preserve play state across reconnections
was_playing: Arc<AtomicBool>,
}

static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -163,6 +166,7 @@ impl Spirc {
credentials: Credentials,
player: Arc<Player>,
mixer: Arc<dyn Mixer>,
was_playing: Arc<AtomicBool>,
) -> Result<(Spirc, impl Future<Output = ()>), Error> {
fn extract_connection_id(msg: Message) -> Result<String, Error> {
let connection_id = msg
Expand Down Expand Up @@ -262,6 +266,7 @@ impl Spirc {
update_state: false,

spirc_id,
was_playing,
};

let spirc = Spirc { commands: cmd_tx };
Expand Down Expand Up @@ -930,7 +935,9 @@ impl SpircTask {
use protobuf::Message;

match TransferState::parse_from_bytes(&cluster.transfer_data) {
Ok(transfer_state) => self.handle_transfer(transfer_state)?,
Ok(transfer_state) => {
self.handle_transfer(transfer_state, self.was_playing.load(Ordering::Relaxed))?
}
Err(why) => error!("failed to take over control: {why}"),
}

Expand Down Expand Up @@ -1067,7 +1074,7 @@ impl SpircTask {
}
// modification and update of the connect_state
Transfer(transfer) => {
self.handle_transfer(transfer.data.expect("by condition checked"))?;
self.handle_transfer(transfer.data.expect("by condition checked"), false)?;
return self.notify().await;
}
Play(mut play) => {
Expand Down Expand Up @@ -1164,7 +1171,11 @@ impl SpircTask {
Ok(())
}

fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> {
fn handle_transfer(
&mut self,
mut transfer: TransferState,
force_play: bool,
) -> Result<(), Error> {
let mut ctx_uri = match transfer.current_session.context.uri {
None => Err(SpircError::NoUri("transfer context"))?,
// can apparently happen when a state is transferred and was started with "uris" via the api
Expand Down Expand Up @@ -1250,7 +1261,7 @@ impl SpircTask {
_ => 0,
};

let is_playing = !transfer.playback.is_paused();
let is_playing = force_play || !transfer.playback.is_paused();

if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay {
if let Some(ctx_uri) = ctx_uri {
Expand Down Expand Up @@ -1533,6 +1544,8 @@ impl SpircTask {
_ => return,
}

self.was_playing.store(true, Ordering::Relaxed);

// Synchronize the volume from the mixer. This is useful on
// systems that can switch sources from and back to librespot.
let current_volume = self.mixer.volume();
Expand Down Expand Up @@ -1572,6 +1585,7 @@ impl SpircTask {
}
_ => (),
}
self.was_playing.store(false, Ordering::Relaxed);
}

fn handle_seek(&mut self, position_ms: u32) {
Expand Down Expand Up @@ -1888,6 +1902,7 @@ impl SpircTask {
} else {
self.play_status = SpircPlayStatus::LoadingPause { position_ms };
}
self.was_playing.store(start_playing, Ordering::Relaxed);
self.connect_state.set_status(&self.play_status);

Ok(())
Expand Down
9 changes: 9 additions & 0 deletions core/src/audio_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum AudioKeyError {
Packet(u8),
#[error("sequence {0} not pending")]
Sequence(u32),
#[error("session is invalid")]
SessionInvalid,
#[error("audio key response timeout")]
Timeout,
}
Expand All @@ -31,6 +33,7 @@ impl From<AudioKeyError> for Error {
AudioKeyError::Channel => Error::aborted(err),
AudioKeyError::Sequence(_) => Error::aborted(err),
AudioKeyError::Packet(_) => Error::unimplemented(err),
AudioKeyError::SessionInvalid => Error::aborted(err),
AudioKeyError::Timeout => Error::aborted(err),
}
}
Expand Down Expand Up @@ -79,6 +82,12 @@ impl AudioKeyManager {
}

pub async fn request(&self, track: SpotifyId, file: FileId) -> Result<AudioKey, Error> {
// Fast-fail if session is already invalid to avoid waiting for timeout
if self.session().is_invalid() {
error!("Audio key request rejected: session is invalid");
return Err(AudioKeyError::SessionInvalid.into());
}

let (tx, rx) = oneshot::channel();

let seq = self.lock(move |inner| {
Expand Down
12 changes: 6 additions & 6 deletions core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl Session {
error!("{e}");
if let Some(session) = session_weak.try_upgrade() {
if !session.is_invalid() {
session.shutdown();
session.shutdown("packet sender task failed");
}
}
}
Expand Down Expand Up @@ -635,8 +635,8 @@ impl Session {
SessionWeak(Arc::downgrade(&self.0))
}

pub fn shutdown(&self) {
debug!("Shutdown: Invalidating session");
pub fn shutdown(&self, reason: &str) {
debug!("Shutdown: Invalidating session: {reason}");
self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
self.mercury().shutdown();
self.channel().shutdown();
Expand Down Expand Up @@ -893,12 +893,12 @@ where
}
Poll::Ready(None) => {
warn!("Connection to server closed.");
session.shutdown();
session.shutdown("connection closed by server");
return Poll::Ready(Ok(()));
}
Poll::Ready(Some(Err(e))) => {
error!("Connection to server closed.");
session.shutdown();
session.shutdown("connection error");
return Poll::Ready(Err(e));
}
Poll::Pending => break,
Expand Down Expand Up @@ -927,7 +927,7 @@ where
match this.keep_alive_state {
ExpectingPing | ExpectingPongAck => {
if !session.is_invalid() {
session.shutdown();
session.shutdown("keep-alive timeout");
}
// TODO: Optionally reconnect (with cached/last credentials?)
return Poll::Ready(Err(io::Error::new(
Expand Down
14 changes: 12 additions & 2 deletions examples/play_connect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::{Arc, atomic::AtomicBool};

use librespot::{
connect::{ConnectConfig, LoadRequest, LoadRequestOptions, Spirc},
core::{
Expand Down Expand Up @@ -59,8 +61,16 @@ async fn main() -> Result<(), Error> {
move || sink_builder(None, audio_format),
);

let (spirc, spirc_task) =
Spirc::new(connect_config, session.clone(), credentials, player, mixer).await?;
let was_playing = Arc::new(AtomicBool::new(false));
let (spirc, spirc_task) = Spirc::new(
connect_config,
session.clone(),
credentials,
player,
mixer,
was_playing,
)
.await?;

// these calls can be seen as "queued"
spirc.activate()?;
Expand Down
24 changes: 14 additions & 10 deletions playback/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::{
mem,
pin::Pin,
process::exit,
sync::Mutex,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
sync::{Mutex, RwLock},
task::{Context, Poll},
thread,
time::{Duration, Instant},
Expand Down Expand Up @@ -69,7 +69,7 @@ pub enum SinkStatus {
pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;

struct PlayerInternal {
session: Session,
session: Arc<RwLock<Session>>,
config: PlayerConfig,
commands: mpsc::UnboundedReceiver<PlayerCommand>,
load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
Expand Down Expand Up @@ -504,7 +504,7 @@ impl Player {
create_local_file_lookup(config.local_file_directories.as_slice());

let internal = PlayerInternal {
session,
session: Arc::new(RwLock::new(session)),
config,
commands: cmd_rx,
load_handles: Arc::new(Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -932,7 +932,7 @@ impl PlayerState {
}

struct PlayerTrackLoader {
session: Session,
session: Arc<RwLock<Session>>,
config: PlayerConfig,
local_file_lookup: Arc<LocalFileLookup>,
}
Expand All @@ -947,9 +947,10 @@ impl PlayerTrackLoader {
} else if let Some(alternatives) = audio_item.alternatives {
let Tracks(alternatives_vec) = alternatives; // required to make `into_iter` able to move

let session = self.session.read().unwrap().clone();
let alternatives: FuturesUnordered<_> = alternatives_vec
.into_iter()
.map(|alt_id| AudioItem::get_file(&self.session, alt_id))
.map(|alt_id| AudioItem::get_file(&session, alt_id))
.collect();

alternatives
Expand Down Expand Up @@ -1019,7 +1020,8 @@ impl PlayerTrackLoader {
}
};

let audio_item = match AudioItem::get_file(&self.session, track_uri).await {
let session = self.session.read().unwrap().clone();
let audio_item = match AudioItem::get_file(&session, track_uri).await {
Ok(audio) => match self.find_available_alternative(audio).await {
Some(audio) => audio,
None => {
Expand Down Expand Up @@ -1091,7 +1093,8 @@ impl PlayerTrackLoader {
// This is only a loop to be able to reload the file if an error occurred
// while opening a cached file.
loop {
let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second);
let session = self.session.read().unwrap().clone();
let encrypted_file = AudioFile::open(&session, file_id, bytes_per_second);

let encrypted_file = match encrypted_file.await {
Ok(encrypted_file) => encrypted_file,
Expand All @@ -1108,7 +1111,8 @@ impl PlayerTrackLoader {
// Not all audio files are encrypted. If we can't get a key, try loading the track
// without decryption. If the file was encrypted after all, the decoder will fail
// parsing and bail out, so we should be safe from outputting ear-piercing noise.
let key = match self.session.audio_key().request(track_id, file_id).await {
let session = self.session.read().unwrap().clone();
let key = match session.audio_key().request(track_id, file_id).await {
Ok(key) => Some(key),
Err(e) => {
warn!("Unable to load key, continuing without decryption: {e}");
Expand Down Expand Up @@ -1176,7 +1180,7 @@ impl PlayerTrackLoader {
Err(e) if is_cached => {
warn!("Unable to read cached audio file: {e}. Trying to download it.");

match self.session.cache() {
match self.session.read().unwrap().clone().cache() {
Some(cache) => {
if cache.remove_file(file_id).is_err() {
error!("Error removing file from cache");
Expand Down Expand Up @@ -2316,7 +2320,7 @@ impl PlayerInternal {

PlayerCommand::Stop => self.handle_player_stop(),

PlayerCommand::SetSession(session) => self.session = session,
PlayerCommand::SetSession(session) => *self.session.write().unwrap() = session,

PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),

Expand Down
Loading