Commit d40e7ec

fix: dealer websocket reconnect leaving spirc hung on stale channels
When the dealer websocket connection drops and reconnects internally, spirc's tokio::select! loop remains blocked on subscription streams (connection_id_update, connect_state_update, etc.) that will never receive new messages. The mpsc senders in the SubscriberMap are not cleaned up on reconnect, so spirc hangs indefinitely, requiring a manual process restart.

A second failure mode occurs when the dealer cannot reconnect because get_url() (which resolves the dealer endpoint and fetches an auth token via the session) hangs forever on a dead session TCP connection, with no timeout.

Root cause analysis
-------------------

The dealer's run() loop (core/src/dealer/mod.rs) coordinates reconnecting: when the websocket drops, it calls get_url() to resolve a new dealer endpoint, then connect(). However:

1. The subscription channels (mpsc::UnboundedSender<Message>) stored in DealerShared::message_handlers survive reconnects. Spirc's .next() calls on the receiver side never return None because the senders are still alive in the map; they just never send again.

2. get_url() calls session.apresolver().resolve("dealer") and session.login5().auth_token(), both of which need the session's TCP connection. When that connection is dead ("Connection to server closed"), these calls hang forever with no timeout.

Before fix: log evidence of hangs requiring manual restart
----------------------------------------------------------

Feb 17 01:12 - "Websocket peer does not respond."
  [63.5 hour gap - process completely unresponsive]
Feb 19 16:44 - Manual restart: "librespot 0.8.0 ..."

Feb 23 08:41 - "Websocket peer does not respond."
  [32.2 hour gap - process completely unresponsive]
Feb 24 16:51 - Manual restart: "librespot 0.8.0 ..."

Dec 15 20:53-21:07 - Rapid reconnect storm: 12 "peer does not respond" in 50 minutes, with "starting dealer failed: Websocket couldn't be started because: Handshake not finished" errors.

Feb 22 - Session TCP died at 05:55, spirc didn't notice for 7+ hours (no dealer reconnect signal), finally shut down at 22:11.

Fix
---

Add a watch::Sender<u64> generation counter shared between the dealer and its consumers. The dealer increments it when:

- It successfully reconnects after a connection loss
- get_url() times out (30s RECONNECT_URL_TIMEOUT)
- get_url() returns an error

Spirc subscribes to a watch::Receiver before dealer.start() to avoid a lost-wakeup race (watch retains state, unlike Notify which loses notifications if no one is awaiting). In its select! loop, spirc watches for changes and breaks out, triggering the existing "Spirc shut down unexpectedly" -> auto-reconnect path in main.rs.

The get_url() error handling also fixes a pre-existing issue where get_url() failures would propagate via ? and terminate the dealer background task entirely, rather than retrying.

Changes:

- core/src/dealer/mod.rs: Add watch channel plumbing to Dealer, Builder, create_dealer! macro, and run(). Add 30s timeout on get_url(). Handle get_url() errors with retry+signal instead of fatal ? propagation. Signal consumers on reconnect.
- core/src/dealer/manager.rs: Store watch::Sender in DealerManagerInner, pass to Builder::launch(), expose reconnect_receiver() for consumers.
- connect/src/spirc.rs: Subscribe to reconnect watch before dealer.start(). Add select! branch to break on dealer reconnect.

After fix: 9 days of logs showing automatic recovery
----------------------------------------------------

Websocket failures now recover in 2-7 seconds automatically:

Mar 01 15:45 - "Websocket connection failed: Connection reset"
Mar 01 15:45 - "Dealer reconnected; notifying consumers."
Mar 01 15:45 - "Dealer reconnected; restarting spirc to refresh subscriptions."
Mar 01 15:46 - "Spirc shut down unexpectedly"
Mar 01 15:46 - "active device is <> with session <...>"
  [7s recovery]

Mar 03 10:21 - "Websocket peer does not respond."
Mar 03 10:21 - "Dealer reconnected; notifying consumers."
Mar 03 10:21 - "restarting spirc to refresh subscriptions."
Mar 03 10:21 - "active device is <> with session <...>"
  [7s recovery]

Mar 06 09:42 - "Websocket peer does not respond."
Mar 06 09:42 - "Error while connecting: Network is unreachable"
Mar 06 09:43 - [retries for ~1 min while network recovers]
Mar 06 09:43 - "Dealer reconnected; notifying consumers."
Mar 06 09:43 - "active device is <> with session <...>"
  [91s recovery]

Summary over 9 days post-fix (Feb 28 - Mar 8):

- 0 manual restarts needed (vs 2 in 7 days before fix)
- 9 dealer reconnect events, all recovered in 2-91 seconds
- 14 session TCP closures also recovered (via existing path)
- 0 get_url() timeouts fired (websocket errors caught first)
- Process running continuously for 9+ days

Co-authored-by: Copilot <[email protected]>
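The first root cause (a receiver that never sees the end of its stream because a stale sender is still held in a map) can be reproduced without any networking. The following is a minimal sketch using only the standard library's blocking mpsc channel rather than tokio's; `SubscriberMap`, `observe`, and `stale_then_dropped` here are hypothetical stand-ins for illustration, not librespot's types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::time::Duration;

/// Hypothetical stand-in for the dealer's subscriber map: it owns the
/// sending halves of the subscription channels.
struct SubscriberMap {
    senders: HashMap<String, Sender<String>>,
}

/// What a consumer observes when it waits on a subscription for `wait`.
fn observe(rx: &mpsc::Receiver<String>, wait: Duration) -> &'static str {
    match rx.recv_timeout(wait) {
        Ok(_) => "message",
        // Sender still alive but silent: the consumer cannot tell this
        // apart from an idle connection, so it would keep waiting.
        Err(RecvTimeoutError::Timeout) => "still waiting",
        // All senders dropped: the consumer learns the stream has ended.
        Err(RecvTimeoutError::Disconnected) => "closed",
    }
}

/// Returns what the consumer sees (a) while the stale sender is still in
/// the map and (b) after the map entry has been removed.
fn stale_then_dropped() -> (&'static str, &'static str) {
    let (tx, rx) = mpsc::channel();
    let mut map = SubscriberMap { senders: HashMap::new() };
    map.senders.insert("connect_state_update".to_string(), tx);

    // The "websocket" is gone, but nothing removed the sender.
    let while_stale = observe(&rx, Duration::from_millis(50));

    // Dropping the sender is what lets the receiver see the end.
    map.senders.remove("connect_state_update");
    let after_drop = observe(&rx, Duration::from_millis(50));

    (while_stale, after_drop)
}
```

Calling `stale_then_dropped()` yields `("still waiting", "closed")`. The commit does not drop the senders; it adds a separate signal so that consumers can stop waiting on streams that have gone quiet.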
1 parent 33bf3a7 commit d40e7ec

3 files changed

Lines changed: 71 additions & 11 deletions

connect/src/spirc.rs

Lines changed: 9 additions & 0 deletions
@@ -458,6 +458,9 @@ impl SpircTask {
             };
         }
 
+        // Subscribe before start() so we can't miss a reconnect notification.
+        let mut reconnect_rx = self.session.dealer().reconnect_receiver();
+
         if let Err(why) = self.session.dealer().start().await {
             error!("starting dealer failed: {why}");
             return;
@@ -585,6 +588,12 @@ impl SpircTask {
                         }
                     }
                 },
+                // dealer reconnected after a connection loss - our subscription
+                // streams are stale, so break out and let main.rs re-create spirc
+                Ok(()) = reconnect_rx.changed() => {
+                    warn!("Dealer reconnected; restarting spirc to refresh subscriptions.");
+                    break;
+                },
                 else => break
             }
         }
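The effect of the new select! branch can be approximated with a standard-library polling loop: the consumer keeps handling messages but exits once the generation counter differs from the value it recorded when it subscribed. This is only a sketch under stated assumptions. It polls an `AtomicU64` instead of awaiting a tokio watch receiver, and `Exit`, `consume`, and `run_until_reconnect` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Exit {
    StreamClosed,
    DealerReconnected,
}

/// Handle messages until the stream closes or the generation moves past
/// the value `seen` that was recorded at subscription time.
fn consume(
    rx: mpsc::Receiver<String>,
    generation: Arc<AtomicU64>,
    seen: u64,
) -> (Vec<String>, Exit) {
    let mut handled = Vec::new();
    loop {
        if generation.load(Ordering::SeqCst) != seen {
            return (handled, Exit::DealerReconnected);
        }
        match rx.recv_timeout(Duration::from_millis(10)) {
            Ok(msg) => handled.push(msg),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return (handled, Exit::StreamClosed),
        }
    }
}

/// One message arrives, then the simulated dealer "reconnects" while the
/// stale sender stays alive.
fn run_until_reconnect() -> (Vec<String>, Exit) {
    let generation = Arc::new(AtomicU64::new(0));
    let seen = generation.load(Ordering::SeqCst); // subscribe first
    let (tx, rx) = mpsc::channel();
    tx.send("state update".to_string()).unwrap();

    let dealer_generation = Arc::clone(&generation);
    let dealer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        dealer_generation.fetch_add(1, Ordering::SeqCst); // simulated reconnect
    });

    let result = consume(rx, generation, seen);
    dealer.join().unwrap();
    drop(tx); // the sender was alive the whole time, so the stream never closed
    result
}
```

In this sketch `run_until_reconnect()` returns the single handled message together with `Exit::DealerReconnected`, even though the channel itself never closes.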

core/src/dealer/manager.rs

Lines changed: 9 additions & 2 deletions
@@ -2,7 +2,7 @@ use futures_core::Stream;
 use futures_util::StreamExt;
 use std::{pin::Pin, str::FromStr, sync::OnceLock};
 use thiserror::Error;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use url::Url;
 
@@ -16,6 +16,7 @@ component! {
     DealerManager: DealerManagerInner {
         builder: OnceLock<Builder> = OnceLock::from(Builder::new()),
         dealer: OnceLock<Dealer> = OnceLock::new(),
+        reconnect_tx: watch::Sender<u64> = watch::Sender::new(0),
     }
 }
 
@@ -153,10 +154,12 @@ impl DealerManager {
         // and the token is expired we will just get 401 error
         let get_url = move || Self::get_url(session.clone());
 
+        let reconnect_tx = self.lock(|inner| inner.reconnect_tx.clone());
+
         let dealer = self
             .lock(move |inner| inner.builder.take())
             .ok_or(DealerError::BuilderNotAvailable)?
-            .launch(get_url, None)
+            .launch(get_url, None, reconnect_tx)
             .await
             .map_err(DealerError::LaunchFailure)?;
 
@@ -171,4 +174,8 @@ impl DealerManager {
             dealer.close().await
         }
     }
+
+    pub fn reconnect_receiver(&self) -> watch::Receiver<u64> {
+        self.lock(|inner| inner.reconnect_tx.subscribe())
+    }
 }
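Why the order "subscribe, then start" matters can be shown with a small state-retaining counter. The sketch below builds one from a `Mutex` and a `Condvar` in the standard library; it is loosely modelled on the behaviour of a watch channel, where a receiver created by subscribe() treats the current value as already seen. It is not tokio's implementation, and `Generation` and its methods are hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Minimal state-retaining signal: a counter plus a condition variable.
struct Generation {
    value: Mutex<u64>,
    changed: Condvar,
}

impl Generation {
    fn new() -> Arc<Self> {
        Arc::new(Self { value: Mutex::new(0), changed: Condvar::new() })
    }

    /// Record the current value; later changes are measured against it.
    fn subscribe(&self) -> u64 {
        *self.value.lock().unwrap()
    }

    /// Increment the counter and wake any waiters.
    fn bump(&self) {
        *self.value.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Wait up to `timeout` for the value to differ from `seen`.
    /// Returns true if it differs by the time the wait ends.
    fn changed_since(&self, seen: u64, timeout: Duration) -> bool {
        let guard = self.value.lock().unwrap();
        let (guard, _) = self
            .changed
            .wait_timeout_while(guard, timeout, |v| *v == seen)
            .unwrap();
        *guard != seen
    }
}

/// Subscribe first: a bump that happens before we start waiting is
/// still detected, because the counter retains the new value.
fn subscribe_before_event() -> bool {
    let generation = Generation::new();
    let seen = generation.subscribe();
    generation.bump();
    generation.changed_since(seen, Duration::from_millis(50))
}

/// Subscribe late: the baseline already includes the bump, so the
/// event is missed and the wait times out.
fn subscribe_after_event() -> bool {
    let generation = Generation::new();
    generation.bump();
    let seen = generation.subscribe();
    generation.changed_since(seen, Duration::from_millis(50))
}
```

`subscribe_before_event()` returns true and `subscribe_after_event()` returns false, which is the reason the consumer in this commit obtains its receiver before calling start().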

core/src/dealer/mod.rs

Lines changed: 53 additions & 9 deletions
@@ -21,6 +21,7 @@ use tokio::{
     sync::{
         Semaphore,
         mpsc::{self, UnboundedReceiver},
+        watch,
     },
     task::JoinHandle,
 };
@@ -55,6 +56,7 @@ const PING_INTERVAL: Duration = Duration::from_secs(30);
 const PING_TIMEOUT: Duration = Duration::from_secs(3);
 
 const RECONNECT_INTERVAL: Duration = Duration::from_secs(10);
+const RECONNECT_URL_TIMEOUT: Duration = Duration::from_secs(30);
 
 const DEALER_REQUEST_HANDLERS_POISON_MSG: &str =
     "dealer request handlers mutex should not be poisoned";
@@ -261,7 +263,7 @@ struct Builder {
 }
 
 macro_rules! create_dealer {
-    ($builder:expr, $shared:ident -> $body:expr) => {
+    ($builder:expr, $reconnect_tx:expr, $shared:ident -> $body:expr) => {
         match $builder {
             builder => {
                 let shared = Arc::new(DealerShared {
@@ -270,6 +272,8 @@ macro_rules! create_dealer {
                     notify_drop: Semaphore::new(0),
                 });
 
+                let reconnect_tx: watch::Sender<u64> = $reconnect_tx;
+
                 let handle = {
                     let $shared = Arc::clone(&shared);
                     tokio::spawn($body)
@@ -278,6 +282,7 @@ macro_rules! create_dealer {
                 Dealer {
                     shared,
                     handle: TimeoutOnDrop::new(handle, WEBSOCKET_CLOSE_TIMEOUT),
+                    reconnect_tx,
                 }
             }
         }
@@ -301,26 +306,28 @@ impl Builder {
         handles(&self.request_handlers, &self.message_handlers, uri)
     }
 
-    pub fn launch_in_background<Fut, F>(self, get_url: F, proxy: Option<Url>) -> Dealer
+    pub fn launch_in_background<Fut, F>(self, get_url: F, proxy: Option<Url>, reconnect_tx: watch::Sender<u64>) -> Dealer
     where
         Fut: Future<Output = GetUrlResult> + Send + 'static,
         F: (Fn() -> Fut) + Send + 'static,
     {
-        create_dealer!(self, shared -> run(shared, None, get_url, proxy))
+        let tx = reconnect_tx.clone();
+        create_dealer!(self, reconnect_tx, shared -> run(shared, None, get_url, proxy, tx))
     }
 
-    pub async fn launch<Fut, F>(self, get_url: F, proxy: Option<Url>) -> WsResult<Dealer>
+    pub async fn launch<Fut, F>(self, get_url: F, proxy: Option<Url>, reconnect_tx: watch::Sender<u64>) -> WsResult<Dealer>
     where
         Fut: Future<Output = GetUrlResult> + Send + 'static,
         F: (Fn() -> Fut) + Send + 'static,
     {
-        let dealer = create_dealer!(self, shared -> {
+        let tx = reconnect_tx.clone();
+        let dealer = create_dealer!(self, reconnect_tx, shared -> {
             // Try to connect.
             let url = get_url().await?;
             let tasks = connect(&url, proxy.as_ref(), &shared).await?;
 
             // If a connection is established, continue in a background task.
-            run(shared, Some(tasks), get_url, proxy)
+            run(shared, Some(tasks), get_url, proxy, tx)
         });
 
         Ok(dealer)
@@ -426,6 +433,7 @@ impl DealerShared {
 struct Dealer {
     shared: Arc<DealerShared>,
     handle: TimeoutOnDrop<Result<(), Error>>,
+    reconnect_tx: watch::Sender<u64>,
 }
 
 impl Dealer {
@@ -482,6 +490,10 @@ impl Dealer {
         )
     }
 
+    pub fn reconnect_receiver(&self) -> watch::Receiver<u64> {
+        self.reconnect_tx.subscribe()
+    }
+
     pub async fn close(mut self) {
         debug!("closing dealer");
 
@@ -665,19 +677,24 @@ async fn run<F, Fut>(
     initial_tasks: Option<(JoinHandle<()>, JoinHandle<()>)>,
     mut get_url: F,
     proxy: Option<Url>,
+    reconnect_tx: watch::Sender<u64>,
 ) -> Result<(), Error>
 where
     Fut: Future<Output = GetUrlResult> + Send + 'static,
     F: (FnMut() -> Fut) + Send + 'static,
 {
     let init_task = |t| Some(TimeoutOnDrop::new(t, WEBSOCKET_CLOSE_TIMEOUT));
 
+    let has_had_initial_connection = initial_tasks.is_some();
+
     let mut tasks = if let Some((s, r)) = initial_tasks {
         (init_task(s), init_task(r))
     } else {
         (None, None)
     };
 
+    let mut has_connected = has_had_initial_connection;
+
     while !shared.is_closed() {
         match &mut tasks {
             (Some(t0), Some(t1)) => {
@@ -702,11 +719,38 @@ where
                     () = shared.closed() => {
                         break
                     },
-                    e = get_url() => e
-                }?;
+                    result = tokio::time::timeout(RECONNECT_URL_TIMEOUT, get_url()) => {
+                        match result {
+                            Ok(Ok(url)) => url,
+                            Ok(Err(e)) => {
+                                error!("Failed to resolve dealer URL: {e}");
+                                if has_connected {
+                                    reconnect_tx.send_modify(|n| *n += 1);
+                                }
+                                tokio::time::sleep(RECONNECT_INTERVAL).await;
+                                continue;
+                            }
+                            Err(_) => {
+                                error!("Timed out resolving dealer URL.");
+                                if has_connected {
+                                    reconnect_tx.send_modify(|n| *n += 1);
+                                }
+                                tokio::time::sleep(RECONNECT_INTERVAL).await;
+                                continue;
+                            }
+                        }
+                    }
+                };
 
                 match connect(&url, proxy.as_ref(), &shared).await {
-                    Ok((s, r)) => tasks = (init_task(s), init_task(r)),
+                    Ok((s, r)) => {
+                        tasks = (init_task(s), init_task(r));
+                        if has_connected {
+                            warn!("Dealer reconnected; notifying consumers.");
+                            reconnect_tx.send_modify(|n| *n += 1);
+                        }
+                        has_connected = true;
+                    }
                     Err(e) => {
                         error!("Error while connecting: {e}");
                         tokio::time::sleep(RECONNECT_INTERVAL).await;
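The three-way outcome handled in the hunk above (resolved, failed, timed out) can be sketched without an async runtime by running the resolver on a helper thread and bounding the wait with `recv_timeout`. This is an illustrative analogue of wrapping get_url() in a timeout, not the code from the commit. `Step`, `TimedOut`, `resolve_with_timeout`, and `next_step` are hypothetical names, and the URL in the usage note is a placeholder.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Step {
    Connect(String),
    RetryAndSignal,
    RetryQuietly,
}

/// Marker for "the resolver did not answer within the limit".
struct TimedOut;

/// Run `resolve` on a helper thread and wait at most `limit` for it.
fn resolve_with_timeout<F>(resolve: F, limit: Duration) -> Result<Result<String, String>, TimedOut>
where
    F: FnOnce() -> Result<String, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The send fails harmlessly if the caller has already given up.
        let _ = tx.send(resolve());
    });
    match rx.recv_timeout(limit) {
        Ok(result) => Ok(result),
        // A resolver that panicked (Disconnected) is treated like a
        // timeout in this sketch.
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => Err(TimedOut),
    }
}

/// Decide what the reconnect loop does next. Errors and timeouts are
/// never fatal; consumers are signalled only if a connection existed
/// before, mirroring the `has_connected` guard.
fn next_step<F>(resolve: F, limit: Duration, has_connected: bool) -> Step
where
    F: FnOnce() -> Result<String, String> + Send + 'static,
{
    match resolve_with_timeout(resolve, limit) {
        Ok(Ok(url)) => Step::Connect(url),
        Ok(Err(_)) | Err(TimedOut) => {
            if has_connected {
                Step::RetryAndSignal
            } else {
                Step::RetryQuietly
            }
        }
    }
}
```

For example, `next_step(|| Err("dead session".to_string()), Duration::from_millis(200), true)` yields `Step::RetryAndSignal`, while the same call with `has_connected` set to false yields `Step::RetryQuietly`. Unlike an async timeout, which drops the pending future, the helper thread in this sketch keeps running after the caller gives up.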
