diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index fec6057c4..85ad6a072 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -17,7 +17,10 @@ use crate::{ player::{Player, PlayerEvent, PlayerEventChannel, QueueTrack}, }, protocol::{ - connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand}, + connect::{ + Cluster, ClusterUpdate, ClusterUpdateReason as ServerClusterUpdateReason, + LogoutCommand, SetVolumeCommand, + }, context::Context, explicit_content_pubsub::UserAttributesUpdate, player::ProvidedTrack, @@ -25,6 +28,7 @@ use crate::{ social_connect_v2::SessionUpdate, transfer_state::TransferState, user_attributes::UserAttributesMutation, + {context_page::ContextPage, player::PlayerState}, }, state::{ context::{ContextType, ResetContext}, @@ -33,16 +37,19 @@ use crate::{ }, }; use futures_util::StreamExt; -use librespot_protocol::context_page::ContextPage; use protobuf::MessageField; use std::{ + collections::HashMap, future::Future, sync::Arc, sync::atomic::{AtomicUsize, Ordering}, time::{Duration, SystemTime, UNIX_EPOCH}, }; use thiserror::Error; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{ + sync::{broadcast, mpsc, watch}, + time::sleep, +}; #[derive(Debug, Error)] enum SpircError { @@ -69,6 +76,103 @@ impl From for Error { } } +/// Information about a device in the cluster +#[derive(Debug, Clone)] +pub struct DeviceInfo { + /// Unique device identifier + pub device_id: String, + /// Human-readable device name + pub device_alias: String, + /// Device type (e.g., "Speaker", "Phone") + pub device_type: String, + /// Volume level 0-100 + pub volume: u32, + /// Whether this is the currently active device + pub is_active: bool, +} + +/// Current state of the device cluster (all known devices) +#[derive(Debug, Clone)] +pub struct ClusterState { + /// Map of all known devices by device_id + pub devices: HashMap, + /// Currently active device ID (if any) + pub active_device_id: Option, +} + +/// Queue information (previous and next tracks) +#[derive(Debug, Clone)] +pub struct QueueList { + /// Previous tracks in the queue (as URIs) + pub prev_tracks: Vec, + /// Next tracks in the queue (as URIs) + pub next_tracks: Vec, +} + +/// Semantic reason for cluster updates +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ClusterUpdateReason { + /// Device list changed + DeviceListChanged, + /// Active device switched + ActiveDeviceChanged, + /// Device state changed + DeviceStateChanged, + /// Device info changed + DeviceInfoChanged, +} + +/// Event emitted when cluster state changes +#[derive(Debug, Clone)] +pub struct ClusterUpdateEvent { + /// Device ID that changed + pub device_id: String, + /// Reason for the update + pub reason: ClusterUpdateReason, +} + +/// Semantic reasons for queue updates +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum QueueUpdateReason { + /// Previous tracks changed + PrevTracksChanged, + /// Next tracks changed + NextTracksChanged, +} + +/// Event emitted when queue changes +#[derive(Debug, Clone)] +pub struct QueueUpdateEvent { + /// Reason for the queue update + pub reason: QueueUpdateReason, +} + +/// Semantic reasons for player state updates +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PlayerUpdateReason { + /// Track changed + TrackChanged, + /// Play/pause state changed + PlayPauseChanged, + /// Shuffle mode changed + ShuffleChanged, + /// Repeat mode changed + RepeatChanged, + /// Context changed + ContextChanged, + /// Seek detected + SeekChanged, + /// Other state change + Other, +} + +/// Emitted when player state changes +#[derive(Debug, Clone)] +pub struct PlayerUpdateEvent { + /// Reason for the player update + pub reason: PlayerUpdateReason, +} + struct SpircTask { player: Arc, mixer: Arc, @@ -111,6 +215,15 @@ struct SpircTask { /// when no other future resolves, otherwise resets the delay update_state: bool, + player_update_sender: broadcast::Sender, + cluster_update_sender: broadcast::Sender, + queue_update_sender: broadcast::Sender, + player_state_sender: watch::Sender>, + cluster_state_sender: watch::Sender, + queue_list_sender: watch::Sender, + last_active_device_id: Option, + last_player_state: Option, + spirc_id: usize, } @@ -148,6 +261,12 @@ const UPDATE_STATE_DELAY: Duration = Duration::from_millis(200); /// The spotify connect handle pub struct Spirc { commands: mpsc::UnboundedSender, + player_update_sender: broadcast::Sender, + cluster_update_sender: broadcast::Sender, + queue_update_sender: broadcast::Sender, + player_state_sender: watch::Sender>, + cluster_state_sender: watch::Sender, + queue_list_sender: watch::Sender, } impl Spirc { @@ -225,6 +344,18 @@ impl Spirc { let _ = session.login5().auth_token().await?; let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let (player_update_sender_tx, _) = broadcast::channel(1); + let (cluster_update_sender_tx, _) = broadcast::channel(1); + let (queue_update_sender_tx, _) = broadcast::channel(1); + let (player_state_sender_tx, _) = watch::channel(None); + let (cluster_state_sender_tx, _) = watch::channel(ClusterState { + devices: HashMap::new(), + active_device_id: None, + }); + let (queue_list_sender_tx, _) = watch::channel(QueueList { + prev_tracks: Vec::new(), + next_tracks: Vec::new(), + }); let player_events = player.get_player_event_channel(); @@ -261,10 +392,27 @@ impl Spirc { update_volume: false, update_state: false, + player_update_sender: player_update_sender_tx.clone(), + cluster_update_sender: cluster_update_sender_tx.clone(), + queue_update_sender: queue_update_sender_tx.clone(), + player_state_sender: player_state_sender_tx.clone(), + cluster_state_sender: cluster_state_sender_tx.clone(), + queue_list_sender: queue_list_sender_tx.clone(), + last_active_device_id: None, + last_player_state: None, + spirc_id, }; - let spirc = Spirc { commands: cmd_tx }; + let spirc = Spirc { + commands: cmd_tx, + player_update_sender: player_update_sender_tx, + cluster_update_sender: cluster_update_sender_tx, + queue_update_sender: queue_update_sender_tx, + player_state_sender: player_state_sender_tx, + cluster_state_sender: cluster_state_sender_tx, + queue_list_sender: queue_list_sender_tx, + }; let initial_volume = task.connect_state.device_info().volume; task.connect_state.set_volume(0); @@ -435,6 +583,36 @@ impl Spirc { .commands .send(SpircCommand::Transfer(transfer_request))?) } + + /// Get a channel which sends lightweight playback state updates. + pub fn get_player_update_channel(&self) -> broadcast::Receiver { + self.player_update_sender.subscribe() + } + + /// Get a channel which sends device topology changes (devices appearing/disappearing, active device changes). + pub fn get_cluster_update_channel(&self) -> broadcast::Receiver { + self.cluster_update_sender.subscribe() + } + + /// Get a channel which sends queue change events when prev/next tracks differ. + pub fn get_queue_update_channel(&self) -> broadcast::Receiver { + self.queue_update_sender.subscribe() + } + + /// Watch the current player state (full PlayerState) + pub fn watch_player_state(&self) -> watch::Receiver> { + self.player_state_sender.subscribe() + } + + /// Watch the current cluster state (all devices and active device) + pub fn watch_cluster_state(&self) -> watch::Receiver { + self.cluster_state_sender.subscribe() + } + + /// Watch the current queue list (previous and next tracks) + pub fn watch_queue_list(&self) -> watch::Receiver { + self.queue_list_sender.subscribe() + } } impl SpircTask { @@ -882,6 +1060,28 @@ impl SpircTask { } self.update_state = true; + + // Emit local player state to watch channels if this device is active + if self.connect_state.is_active() { + let player_state = self.connect_state.player().clone(); + let _ = self.player_state_sender.send(Some(player_state.clone())); + + // Also update queue list from local state + let queue_list = QueueList { + prev_tracks: player_state + .prev_tracks + .iter() + .map(|t| t.uri.clone()) + .collect(), + next_tracks: player_state + .next_tracks + .iter() + .map(|t| t.uri.clone()) + .collect(), + }; + let _ = self.queue_list_sender.send(queue_list); + } + Ok(()) } @@ -983,6 +1183,137 @@ impl SpircTask { } } + fn emit_player_update(&self, state: Option, last_state: Option<&PlayerState>) { + if self.player_update_sender.receiver_count() == 0 + && self.player_state_sender.receiver_count() == 0 + { + return; + } + + let state = state.unwrap_or_else(|| self.connect_state.player().clone()); + + // Determine the reason for the update by diffing with last state + let reason = if let Some(last) = last_state { + // Check in priority order: track > play/pause > shuffle > repeat > seek + let new_track_uri = state + .track + .as_ref() + .map(|t| t.uri.clone()) + .unwrap_or_default(); + let old_track_uri = last + .track + .as_ref() + .map(|t| t.uri.clone()) + .unwrap_or_default(); + if new_track_uri != old_track_uri { + let _ = self.player_state_sender.send(Some(state)); + let _ = self.player_update_sender.send(PlayerUpdateEvent { + reason: PlayerUpdateReason::TrackChanged, + }); + return; + } + + let new_is_playing = state.is_playing && !state.is_paused; + let old_is_playing = last.is_playing && !last.is_paused; + if new_is_playing != old_is_playing { + let _ = self.player_state_sender.send(Some(state)); + let _ = self.player_update_sender.send(PlayerUpdateEvent { + reason: PlayerUpdateReason::PlayPauseChanged, + }); + return; + } + + let new_shuffle = state + .options + .as_ref() + .map(|o| o.shuffling_context) + .unwrap_or(false); + let old_shuffle = last + .options + .as_ref() + .map(|o| o.shuffling_context) + .unwrap_or(false); + if new_shuffle != old_shuffle { + let _ = self.player_state_sender.send(Some(state)); + let _ = self.player_update_sender.send(PlayerUpdateEvent { + reason: PlayerUpdateReason::ShuffleChanged, + }); + return; + } + + let new_repeat = state + .options + .as_ref() + .map(|o| (o.repeating_context, o.repeating_track)); + let old_repeat = last + .options + .as_ref() + .map(|o| (o.repeating_context, o.repeating_track)); + if new_repeat != old_repeat { + let _ = self.player_state_sender.send(Some(state)); + let _ = self.player_update_sender.send(PlayerUpdateEvent { + reason: PlayerUpdateReason::RepeatChanged, + }); + return; + } + + if state.context_uri != last.context_uri { + let _ = self.player_state_sender.send(Some(state)); + let _ = self.player_update_sender.send(PlayerUpdateEvent { + reason: PlayerUpdateReason::ContextChanged, + }); + return; + } + + // Detect seek: position jump (not just natural progress) + let time_diff = state.timestamp.saturating_sub(last.timestamp); + let expected_position = if state.is_playing { + // Account for natural progression if playing + last.position_as_of_timestamp + time_diff + } else { + // If paused, position shouldn't change + last.position_as_of_timestamp + }; + + let position_delta = + (state.position_as_of_timestamp - expected_position).abs(); + if position_delta > 5000 { + PlayerUpdateReason::SeekChanged + } else { + // No significant change detected + PlayerUpdateReason::Other + } + } else { + // No previous state (initial state) + PlayerUpdateReason::Other + }; + + let _ = self.player_state_sender.send(Some(state)); + if let Err(why) = self.player_update_sender.send(PlayerUpdateEvent { reason }) { + warn!("couldn't emit player update because: {why}") + } + } + + fn emit_cluster_update(&self, event: ClusterUpdateEvent) { + if self.cluster_update_sender.receiver_count() == 0 { + return; + } + + if let Err(why) = self.cluster_update_sender.send(event) { + warn!("couldn't emit cluster transition because: {why}") + } + } + + fn emit_queue_update(&self, event: QueueUpdateEvent) { + if self.queue_update_sender.receiver_count() == 0 { + return; + } + + if let Err(why) = self.queue_update_sender.send(event) { + warn!("couldn't emit queue update because: {why}") + } + } + async fn handle_cluster_update( &mut self, mut cluster_update: ClusterUpdate, @@ -995,7 +1326,79 @@ impl SpircTask { cluster_update.cluster.active_device_id ); - if let Some(cluster) = cluster_update.cluster.take() { + if let Some(mut cluster) = cluster_update.cluster.take() { + if let Ok(reason_enum) = reason { + match reason_enum { + ServerClusterUpdateReason::DEVICE_NEW_CONNECTION + | ServerClusterUpdateReason::NEW_DEVICE_APPEARED + | ServerClusterUpdateReason::DEVICES_DISAPPEARED => { + for device_id in &cluster_update.devices_that_changed { + self.emit_cluster_update(ClusterUpdateEvent { + reason: crate::spirc::ClusterUpdateReason::DeviceListChanged, + device_id: device_id.clone(), + }); + } + } + ServerClusterUpdateReason::DEVICE_ALIAS_CHANGED + | ServerClusterUpdateReason::DEVICE_VOLUME_CHANGED => { + for device_id in &cluster_update.devices_that_changed { + self.emit_cluster_update(ClusterUpdateEvent { + reason: crate::spirc::ClusterUpdateReason::DeviceInfoChanged, + device_id: device_id.clone(), + }); + } + } + ServerClusterUpdateReason::DEVICE_STATE_CHANGED => { + for device_id in &cluster_update.devices_that_changed { + self.emit_cluster_update(ClusterUpdateEvent { + reason: crate::spirc::ClusterUpdateReason::DeviceStateChanged, + device_id: device_id.clone(), + }); + } + } + _ => {} + } + } + + // Check for active device changes + let new_active_device_id = if cluster.active_device_id.is_empty() { + None + } else { + Some(cluster.active_device_id.clone()) + }; + if new_active_device_id != self.last_active_device_id { + self.emit_cluster_update(ClusterUpdateEvent { + reason: crate::spirc::ClusterUpdateReason::ActiveDeviceChanged, + device_id: new_active_device_id.clone().unwrap_or_default(), + }); + self.last_active_device_id = new_active_device_id; + } + + // Sync device state to cluster_state_sender (after emitting events) + let devices: HashMap = cluster + .device + .values() + .map(|device| { + let info = DeviceInfo { + device_id: device.device_id.clone(), + device_alias: device.name.clone(), + device_type: format!("{:?}", device.device_type), + volume: device.volume, + is_active: device.device_id == cluster.active_device_id, + }; + (info.device_id.clone(), info) + }) + .collect(); + + let _ = self.cluster_state_sender.send(ClusterState { + devices, + active_device_id: if cluster.active_device_id.is_empty() { + None + } else { + Some(cluster.active_device_id.clone()) + }, + }); + let became_inactive = self.connect_state.is_active() && cluster.active_device_id != self.session.device_id(); if became_inactive { @@ -1007,6 +1410,37 @@ impl SpircTask { // background: when another device sends a connect-state update, some player's position de-syncs // tried: providing session_id, playback_id, track-metadata "track_player" self.update_state = true; + } else if let Some(state) = cluster.player_state.take() { + if let Some(last_state) = &self.last_player_state { + let prev_changed = state.prev_tracks != last_state.prev_tracks; + let next_changed = state.next_tracks != last_state.next_tracks; + if prev_changed || next_changed { + let queue_list = QueueList { + prev_tracks: state.prev_tracks.iter().map(|t| t.uri.clone()).collect(), + next_tracks: state.next_tracks.iter().map(|t| t.uri.clone()).collect(), + }; + let _ = self.queue_list_sender.send(queue_list); + + if prev_changed { + self.emit_queue_update(QueueUpdateEvent { + reason: QueueUpdateReason::PrevTracksChanged, + }); + } + if next_changed { + self.emit_queue_update(QueueUpdateEvent { + reason: QueueUpdateReason::NextTracksChanged, + }); + } + } + } else { + let queue_list = QueueList { + prev_tracks: state.prev_tracks.iter().map(|t| t.uri.clone()).collect(), + next_tracks: state.next_tracks.iter().map(|t| t.uri.clone()).collect(), + }; + let _ = self.queue_list_sender.send(queue_list); + } + self.emit_player_update(Some(state.clone()), self.last_player_state.as_ref()); + self.last_player_state = Some(state); } } else if self.connect_state.is_active() { self.connect_state.became_inactive(&self.session).await?;