Skip to content

Commit 9170a1b

Browse files
photovoltexralphptorres
authored andcommitted
feat: add forwarding channel of the remote and our PlayerState
1 parent 33bf3a7 commit 9170a1b

1 file changed

Lines changed: 39 additions & 4 deletions

File tree

connect/src/spirc.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::{
2525
social_connect_v2::SessionUpdate,
2626
transfer_state::TransferState,
2727
user_attributes::UserAttributesMutation,
28+
{context_page::ContextPage, player::PlayerState},
2829
},
2930
state::{
3031
context::{ContextType, ResetContext},
@@ -33,7 +34,6 @@ use crate::{
3334
},
3435
};
3536
use futures_util::StreamExt;
36-
use librespot_protocol::context_page::ContextPage;
3737
use protobuf::MessageField;
3838
use std::{
3939
future::Future,
@@ -42,7 +42,10 @@ use std::{
4242
time::{Duration, SystemTime, UNIX_EPOCH},
4343
};
4444
use thiserror::Error;
45-
use tokio::{sync::mpsc, time::sleep};
45+
use tokio::{
46+
sync::{broadcast, mpsc},
47+
time::sleep,
48+
};
4649

4750
#[derive(Debug, Error)]
4851
enum SpircError {
@@ -111,6 +114,8 @@ struct SpircTask {
111114
/// when no other future resolves, otherwise resets the delay
112115
update_state: bool,
113116

117+
state_sender: broadcast::Sender<PlayerState>,
118+
114119
spirc_id: usize,
115120
}
116121

@@ -148,6 +153,7 @@ const UPDATE_STATE_DELAY: Duration = Duration::from_millis(200);
148153
/// The spotify connect handle
149154
pub struct Spirc {
150155
commands: mpsc::UnboundedSender<SpircCommand>,
156+
state_sender: broadcast::Sender<PlayerState>,
151157
}
152158

153159
impl Spirc {
@@ -225,6 +231,7 @@ impl Spirc {
225231
let _ = session.login5().auth_token().await?;
226232

227233
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
234+
let (state_tx, _) = broadcast::channel(1);
228235

229236
let player_events = player.get_player_event_channel();
230237

@@ -261,10 +268,15 @@ impl Spirc {
261268
update_volume: false,
262269
update_state: false,
263270

271+
state_sender: state_tx.clone(),
272+
264273
spirc_id,
265274
};
266275

267-
let spirc = Spirc { commands: cmd_tx };
276+
let spirc = Spirc {
277+
commands: cmd_tx,
278+
state_sender: state_tx,
279+
};
268280

269281
let initial_volume = task.connect_state.device_info().volume;
270282
task.connect_state.set_volume(0);
@@ -435,6 +447,14 @@ impl Spirc {
435447
.commands
436448
.send(SpircCommand::Transfer(transfer_request))?)
437449
}
450+
451+
/// Get a channel which sends the [PlayerState] whenever it changes.
452+
///
453+
/// Forwards the internal [PlayerState] when we are the active device. When we are only
454+
/// a spectator, forwards any [PlayerState] update from the active player.
455+
pub fn get_state_update_channel(&self) -> broadcast::Receiver<PlayerState> {
456+
self.state_sender.subscribe()
457+
}
438458
}
439459

440460
impl SpircTask {
@@ -983,6 +1003,17 @@ impl SpircTask {
9831003
}
9841004
}
9851005

1006+
fn emit_state_update(&self, state: Option<PlayerState>) {
1007+
if self.state_sender.receiver_count() == 0 {
1008+
return;
1009+
}
1010+
1011+
let state = state.unwrap_or_else(|| self.connect_state.player().clone());
1012+
if let Err(why) = self.state_sender.send(state) {
1013+
warn!("couldn't emit state because: {why}")
1014+
}
1015+
}
1016+
9861017
async fn handle_cluster_update(
9871018
&mut self,
9881019
mut cluster_update: ClusterUpdate,
@@ -995,7 +1026,7 @@ impl SpircTask {
9951026
cluster_update.cluster.active_device_id
9961027
);
9971028

998-
if let Some(cluster) = cluster_update.cluster.take() {
1029+
if let Some(mut cluster) = cluster_update.cluster.take() {
9991030
let became_inactive = self.connect_state.is_active()
10001031
&& cluster.active_device_id != self.session.device_id();
10011032
if became_inactive {
@@ -1007,6 +1038,8 @@ impl SpircTask {
10071038
// background: when another device sends a connect-state update, some player's position de-syncs
10081039
// tried: providing session_id, playback_id, track-metadata "track_player"
10091040
self.update_state = true;
1041+
} else if let Some(state) = cluster.player_state.take() {
1042+
self.emit_state_update(Some(state))
10101043
}
10111044
} else if self.connect_state.is_active() {
10121045
self.connect_state.became_inactive(&self.session).await?;
@@ -1903,6 +1936,8 @@ impl SpircTask {
19031936

19041937
self.connect_state.set_now(self.now_ms() as u64);
19051938

1939+
self.emit_state_update(None);
1940+
19061941
self.connect_state
19071942
.send_state(&self.session)
19081943
.await

0 commit comments

Comments
 (0)