From c8c794e8ba547c89f022a4cfa80e645b062d5384 Mon Sep 17 00:00:00 2001 From: "ggiraudon@hotmail.com" Date: Sat, 4 Oct 2025 20:17:09 +0000 Subject: [PATCH 1/4] Add pipewire audio backend --- Cargo.lock | 184 +++++++++++++- Cargo.toml | 9 + playback/Cargo.toml | 3 + playback/src/audio_backend/mod.rs | 7 + playback/src/audio_backend/pipewire.rs | 333 +++++++++++++++++++++++++ 5 files changed, 534 insertions(+), 2 deletions(-) create mode 100644 playback/src/audio_backend/pipewire.rs diff --git a/Cargo.lock b/Cargo.lock index 0ad277732..23c07e554 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,16 @@ dependencies = [ "libc", ] +[[package]] +name = "annotate-snippets" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "710e8eae58854cdc1790fcb56cca04d712a17be849eeb81da2a724bf4bae2bc4" +dependencies = [ + "anstyle", + "unicode-width", +] + [[package]] name = "anstream" version = "0.6.20" @@ -227,6 +237,25 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "annotate-snippets", + "bitflags 2.9.4", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -288,6 +317,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "cfg-expr" version = "0.20.3" @@ -334,6 +372,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.9", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -365,6 +414,21 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" + [[package]] name = "core-foundation" version = "0.9.4" @@ -992,6 +1056,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "gobject-sys" version = "0.21.2" @@ -1034,7 +1104,7 @@ dependencies = [ "futures-util", "glib", "gstreamer-sys", - "itertools", + "itertools 0.14.0", "kstring", "libc", "muldiv", @@ -1637,6 +1707,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1674,7 +1753,7 @@ dependencies = [ "bitflags 1.3.2", "lazy_static", "libc", - "libloading", + "libloading 0.7.4", "log", "pkg-config", ] @@ -1769,6 +1848,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.0", +] + [[package]] name = "libm" version = "0.2.15" @@ -1857,7 +1946,9 @@ dependencies = [ "librespot-oauth", "librespot-playback", "librespot-protocol", + "libspa", "log", + "pipewire", "sha1", "sysinfo", "thiserror 2.0.16", @@ -2034,8 +2125,10 @@ dependencies = [ "librespot-audio", "librespot-core", "librespot-metadata", + "libspa", "log", "ogg", + "pipewire", "portable-atomic", "portaudio-rs", "rand 0.9.2", @@ -2057,6 +2150,34 @@ dependencies = [ "protobuf-codegen", ] +[[package]] +name = "libspa" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b8cfa2a7656627b4c92c6b9ef929433acd673d5ab3708cda1b18478ac00df4" +dependencies = [ + "bitflags 2.9.4", + "cc", + "convert_case", + "cookie-factory", + "libc", + "libspa-sys", + "nix", + "nom 8.0.0", + "system-deps", +] + +[[package]] +name = "libspa-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "901049455d2eb6decf9058235d745237952f4804bc584c5fcb41412e6adcc6e0" +dependencies = [ + "bindgen", + "cc", + "system-deps", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -2127,6 +2248,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2218,6 +2345,25 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -2633,6 +2779,34 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pipewire" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9688b89abf11d756499f7c6190711d6dbe5a3acdb30c8fbf001d6596d06a8d44" +dependencies = [ + "anyhow", + "bitflags 2.9.4", + "libc", + "libspa", + "libspa-sys", + "nix", + "once_cell", + "pipewire-sys", + "thiserror 2.0.16", +] + +[[package]] +name = "pipewire-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb028afee0d6ca17020b090e3b8fa2d7de23305aef975c7e5192a5050246ea36" +dependencies = [ + "bindgen", + "libspa-sys", + "system-deps", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -4106,6 +4280,12 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 63a5927a7..d5183a4d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,6 +102,11 @@ alsa-backend = ["librespot-playback/alsa-backend"] # Integrates with the PulseAudio sound server for advanced audio routing. pulseaudio-backend = ["librespot-playback/pulseaudio-backend"] +# pipewire-backend: PipeWire backend (Linux only). +# Modern audio server that provides low-latency audio with advanced routing capabilities. +# Designed as a replacement for both PulseAudio and JACK. +pipewire-backend = ["librespot-playback/pipewire-backend", "dep:pipewire", "dep:libspa"] + # jackaudio-backend: JACK Audio Connection Kit backend. # Professional audio backend for low-latency, high-quality audio routing. jackaudio-backend = ["librespot-playback/jackaudio-backend"] @@ -183,6 +188,10 @@ tokio = { version = "1", features = [ ] } url = "2.2" +# Dependencies for pipewire +libspa = { version = "0.9", optional = true } +pipewire = { version = "0.9", optional = true } + [package.metadata.deb] maintainer = "Librespot Organization " copyright = "2015, Paul Liétar" diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 2001c680b..4368e9cb2 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -20,6 +20,7 @@ gstreamer-backend = [ "dep:gstreamer-audio", ] jackaudio-backend = ["dep:jack"] +pipewire-backend = ["dep:pipewire", "dep:libspa"] portaudio-backend = ["dep:portaudio-rs"] pulseaudio-backend = ["dep:libpulse-binding", "dep:libpulse-simple-binding"] rodio-backend = ["dep:cpal", "dep:rodio"] @@ -62,6 +63,8 @@ zerocopy = { version = "0.8", features = ["derive"] } # Backends alsa = { version = "0.10", optional = true } jack = { version = "0.13", optional = true } +libspa = { version = "0.9", optional = true } +pipewire = { version = "0.9", optional = true } portaudio-rs = { version = "0.3", optional = true } sdl2 = { version = "0.38", optional = true } diff --git a/playback/src/audio_backend/mod.rs b/playback/src/audio_backend/mod.rs index f8f43e3fa..73c4d86ea 100644 --- a/playback/src/audio_backend/mod.rs +++ b/playback/src/audio_backend/mod.rs @@ -95,6 +95,11 @@ mod pulseaudio; #[cfg(feature = "pulseaudio-backend")] use self::pulseaudio::PulseAudioSink; +#[cfg(feature = "pipewire-backend")] +mod pipewire; +#[cfg(feature = "pipewire-backend")] +use self::pipewire::PipeWireSink; + #[cfg(feature = "jackaudio-backend")] mod jackaudio; #[cfg(feature = "jackaudio-backend")] @@ -130,6 +135,8 @@ pub const BACKENDS: &[(&str, SinkBuilder)] = &[ (PortAudioSink::NAME, mk_sink::>), #[cfg(feature = "pulseaudio-backend")] (PulseAudioSink::NAME, mk_sink::), + #[cfg(feature = "pipewire-backend")] + (PipeWireSink::NAME, mk_sink::), #[cfg(feature = "jackaudio-backend")] (JackSink::NAME, mk_sink::), #[cfg(feature = "gstreamer-backend")] diff --git a/playback/src/audio_backend/pipewire.rs b/playback/src/audio_backend/pipewire.rs new file mode 100644 index 000000000..3c636a72a --- /dev/null +++ b/playback/src/audio_backend/pipewire.rs @@ -0,0 +1,333 @@ +use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; +use crate::config::AudioFormat; +use crate::convert::Converter; +use crate::decoder::AudioPacket; +use crate::{NUM_CHANNELS, SAMPLE_RATE}; +use std::sync::mpsc::{sync_channel, SyncSender, Receiver}; +use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::thread; +use thiserror::Error; + +use pipewire as pw; +use pw::{properties::properties, spa}; +use spa::pod::Pod; +use libspa::sys as spa_sys; + +#[derive(Debug, Error)] +enum PipeWireError { + #[error(" Failed to Create Main Loop: {0}")] + MainLoopCreation(String), + + #[error(" Failed to Create Stream: {0}")] + StreamCreation(String), + + #[error(" Failed to Connect Stream: {0}")] + StreamConnect(String), + + #[error(" Stream Not Connected")] + NotConnected, +} + +impl From for SinkError { + fn from(e: PipeWireError) -> SinkError { + use PipeWireError::*; + let es = e.to_string(); + match e { + MainLoopCreation(_) | StreamCreation(_) | StreamConnect(_) => { + SinkError::ConnectionRefused(es) + } + NotConnected => SinkError::NotConnected(es), + } + } +} + +pub struct PipeWireSink { + format: AudioFormat, + // Channel sender for audio data + sender: Option>, + // Flag to signal thread to stop + quit_flag: Arc, + initialized: bool, + _main_loop_handle: Option>, +} + +fn calculate_sample_size(format: AudioFormat) -> usize { + use AudioFormat::*; + match format { + F64 => 8, + F32 | S32 | S24 => 4, + S24_3 => 3, + S16 => 2, + } +} + +fn convert_audio_format(format: AudioFormat) -> spa::param::audio::AudioFormat { + use AudioFormat::*; + match format { + F64 => spa::param::audio::AudioFormat::F64LE, + F32 => spa::param::audio::AudioFormat::F32LE, + S32 => spa::param::audio::AudioFormat::S32LE, + S24 => spa::param::audio::AudioFormat::S24LE, + S24_3 => spa::param::audio::AudioFormat::S24_32LE, + S16 => spa::param::audio::AudioFormat::S16LE, + } +} + +impl Open for PipeWireSink { + fn open(_device: Option, format: AudioFormat) -> Self { + info!("Using PipeWireSink with format: {format:?}"); + + Self { + format, + sender: None, + quit_flag: Arc::new(AtomicBool::new(false)), + initialized: false, + _main_loop_handle: None, + } + } +} + +impl Sink for PipeWireSink { + fn start(&mut self) -> SinkResult<()> { + if self.initialized { + return Ok(()); + } + + info!("Starting PipeWire sink..."); + + let format = self.format; + let quit_flag = Arc::clone(&self.quit_flag); + + // Create a sync channel - buffer size for ~1 second of audio to prevent underruns + let sample_size = calculate_sample_size(format); + let buffer_size = SAMPLE_RATE as usize * NUM_CHANNELS as usize * sample_size; + let (sender, receiver) = sync_channel::(buffer_size); + + // Store the sender for write_bytes + self.sender = Some(sender); + + // Run PipeWire main loop in a separate thread with the receiver + let handle = thread::spawn(move || { + if let Err(e) = run_pipewire_loop(receiver, quit_flag, format) { + error!("PipeWire loop error: {}", e); + } + }); + + self._main_loop_handle = Some(handle); + self.initialized = true; + + // Give the thread a moment to initialize + thread::sleep(std::time::Duration::from_millis(100)); + + info!("PipeWire sink started successfully"); + Ok(()) + } + + fn stop(&mut self) -> SinkResult<()> { + if !self.initialized { + return Ok(()); + } + + info!("Stopping PipeWire sink..."); + + // Signal the thread to quit + self.quit_flag.store(true, Ordering::Relaxed); + + // Drop the sender to signal the receiver thread to exit + self.sender = None; + + // Wait for the thread to finish with a timeout + if let Some(handle) = self._main_loop_handle.take() { + // Give it a moment to exit gracefully + thread::sleep(std::time::Duration::from_millis(100)); + let _ = handle.join(); + } + + // Reset the quit flag for potential restart + self.quit_flag.store(false, Ordering::Relaxed); + self.initialized = false; + + info!("PipeWire sink stopped"); + Ok(()) + } + + sink_as_bytes!(); +} + +impl SinkAsBytes for PipeWireSink { + fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { + if !self.initialized { + return Err(PipeWireError::NotConnected.into()); + } + + if let Some(ref sender) = self.sender { + // Send data through the channel - use blocking send to ensure all data is queued + for &byte in data { + sender.send(byte).map_err(|_| PipeWireError::NotConnected)?; + } + Ok(()) + } else { + Err(PipeWireError::NotConnected.into()) + } + } +} + +impl Drop for PipeWireSink { + fn drop(&mut self) { + let _ = self.stop(); + } +} + +impl PipeWireSink { + pub const NAME: &'static str = "pipewire"; +} + +fn run_pipewire_loop( + receiver: Receiver, + quit_flag: Arc, + format: AudioFormat, +) -> Result<(), PipeWireError> { + // Initialize PipeWire + pw::init(); + + let mainloop = pw::main_loop::MainLoopRc::new(None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let context = pw::context::ContextRc::new(&mainloop, None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let core = context.connect_rc(None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let stream = pw::stream::StreamBox::new( + &core, + "librespot-playback", + properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_ROLE => "Music", + *pw::keys::MEDIA_CATEGORY => "Playback", + *pw::keys::AUDIO_CHANNELS => NUM_CHANNELS.to_string().as_str(), + *pw::keys::APP_NAME => "librespot", + }, + ) + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))?; + + let sample_size = calculate_sample_size(format); + let stride = sample_size * NUM_CHANNELS as usize; + + // Clone mainloop for use in the listener callback + let mainloop_quit = mainloop.clone(); + + // This is the key: the listener is based on the working pipewire_tone_test.rs example + let _listener = stream + .add_local_listener_with_user_data((receiver, quit_flag.clone())) + .process(move |stream, (receiver, quit_flag)| { + // Check if we should quit + if quit_flag.load(Ordering::Relaxed) { + mainloop_quit.quit(); + return; + } + + match stream.dequeue_buffer() { + None => { + // No buffer available, this is normal + } + Some(mut buffer) => { + let datas = buffer.datas_mut(); + let data = &mut datas[0]; + + let n_frames = if let Some(slice) = data.data() { + let n_frames = slice.len() / stride; + let total_bytes = n_frames * stride; + + // Read from channel - try non-blocking first, then blocking if needed + let mut bytes_read = 0; + + // First, try to read without blocking + for i in 0..total_bytes { + match receiver.try_recv() { + Ok(byte) => { + slice[i] = byte; + bytes_read += 1; + } + Err(_) => break, + } + } + + // If we didn't get enough data, block to get more + // This prevents underruns and stuttering + if bytes_read < total_bytes { + for i in bytes_read..total_bytes { + match receiver.recv() { + Ok(byte) => { + slice[i] = byte; + } + Err(_) => { + // Channel disconnected, fill rest with silence and signal quit + for j in i..total_bytes { + slice[j] = 0; + } + quit_flag.store(true, Ordering::Relaxed); + break; + } + } + } + } + + n_frames + } else { + 0 + }; + + // This matches the working pipewire_tone_test.rs example exactly + let chunk = data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as _; + *chunk.size_mut() = (stride * n_frames) as _; + } + } + }) + .register() + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))?; + + // Setup audio format parameters - matches pipewire_tone_test.rs + let mut audio_info = spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(convert_audio_format(format)); + audio_info.set_rate(SAMPLE_RATE); + audio_info.set_channels(NUM_CHANNELS as u32); + + let mut position = [0; spa::param::audio::MAX_CHANNELS]; + position[0] = spa_sys::SPA_AUDIO_CHANNEL_FL; + position[1] = spa_sys::SPA_AUDIO_CHANNEL_FR; + audio_info.set_position(position); + + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(pw::spa::pod::Object { + type_: spa_sys::SPA_TYPE_OBJECT_Format, + id: spa_sys::SPA_PARAM_EnumFormat, + properties: audio_info.into(), + }), + ) + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))? + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + // Connect stream - matches pipewire_tone_test.rs + stream.connect( + spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + ) + .map_err(|e| PipeWireError::StreamConnect(format!("{:?}", e)))?; + + // Run the main loop + mainloop.run(); + + Ok(()) +} From e7c63f98887dcd24c39ae1f1cbc79f8764504f25 Mon Sep 17 00:00:00 2001 From: Guillaume Giraudon Date: Tue, 7 Oct 2025 19:39:56 +0000 Subject: [PATCH 2/4] Fix dependencies --- Cargo.toml | 6 +----- playback/Cargo.toml | 7 +++++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5183a4d1..216cb25fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,7 +105,7 @@ pulseaudio-backend = ["librespot-playback/pulseaudio-backend"] # pipewire-backend: PipeWire backend (Linux only). # Modern audio server that provides low-latency audio with advanced routing capabilities. # Designed as a replacement for both PulseAudio and JACK. -pipewire-backend = ["librespot-playback/pipewire-backend", "dep:pipewire", "dep:libspa"] +pipewire-backend = ["librespot-playback/pipewire-backend"] # jackaudio-backend: JACK Audio Connection Kit backend. # Professional audio backend for low-latency, high-quality audio routing. @@ -188,10 +188,6 @@ tokio = { version = "1", features = [ ] } url = "2.2" -# Dependencies for pipewire -libspa = { version = "0.9", optional = true } -pipewire = { version = "0.9", optional = true } - [package.metadata.deb] maintainer = "Librespot Organization " copyright = "2015, Paul Liétar" diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 4368e9cb2..aff9bd76c 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -63,8 +63,6 @@ zerocopy = { version = "0.8", features = ["derive"] } # Backends alsa = { version = "0.10", optional = true } jack = { version = "0.13", optional = true } -libspa = { version = "0.9", optional = true } -pipewire = { version = "0.9", optional = true } portaudio-rs = { version = "0.3", optional = true } sdl2 = { version = "0.38", optional = true } @@ -77,6 +75,11 @@ gstreamer-audio = { version = "0.24", optional = true } libpulse-binding = { version = "2", optional = true, default-features = false } libpulse-simple-binding = { version = "2", optional = true, default-features = false } +# Pipewire dependencies +libspa = { version = "0.9", optional = true } +pipewire = { version = "0.9", optional = true } + + # Rodio dependencies cpal = { version = "0.16", optional = true } rodio = { version = "0.21", optional = true, default-features = false, features = [ From 7ff2a57f834e12b847c70fe689b6e3cad0c59f04 Mon Sep 17 00:00:00 2001 From: Guillaume Giraudon Date: Tue, 7 Oct 2025 19:48:22 +0000 Subject: [PATCH 3/4] Fix syntax and formatting to pass fmt check --- Cargo.lock | 2 - playback/src/audio_backend/pipewire.rs | 77 ++++++++++++++------------ 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23c07e554..7e2db94cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1946,9 +1946,7 @@ dependencies = [ "librespot-oauth", "librespot-playback", "librespot-protocol", - "libspa", "log", - "pipewire", "sha1", "sysinfo", "thiserror 2.0.16", diff --git a/playback/src/audio_backend/pipewire.rs b/playback/src/audio_backend/pipewire.rs index 3c636a72a..958c6c47d 100644 --- a/playback/src/audio_backend/pipewire.rs +++ b/playback/src/audio_backend/pipewire.rs @@ -3,15 +3,18 @@ use crate::config::AudioFormat; use crate::convert::Converter; use crate::decoder::AudioPacket; use crate::{NUM_CHANNELS, SAMPLE_RATE}; -use std::sync::mpsc::{sync_channel, SyncSender, Receiver}; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::sync::mpsc::{Receiver, SyncSender, sync_channel}; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; use std::thread; use thiserror::Error; +use libspa::sys as spa_sys; use pipewire as pw; use pw::{properties::properties, spa}; use spa::pod::Pod; -use libspa::sys as spa_sys; #[derive(Debug, Error)] enum PipeWireError { @@ -92,33 +95,33 @@ impl Sink for PipeWireSink { if self.initialized { return Ok(()); } - + info!("Starting PipeWire sink..."); - + let format = self.format; let quit_flag = Arc::clone(&self.quit_flag); - + // Create a sync channel - buffer size for ~1 second of audio to prevent underruns let sample_size = calculate_sample_size(format); let buffer_size = SAMPLE_RATE as usize * NUM_CHANNELS as usize * sample_size; let (sender, receiver) = sync_channel::(buffer_size); - + // Store the sender for write_bytes self.sender = Some(sender); - + // Run PipeWire main loop in a separate thread with the receiver let handle = thread::spawn(move || { if let Err(e) = run_pipewire_loop(receiver, quit_flag, format) { error!("PipeWire loop error: {}", e); } }); - + self._main_loop_handle = Some(handle); self.initialized = true; - + // Give the thread a moment to initialize thread::sleep(std::time::Duration::from_millis(100)); - + info!("PipeWire sink started successfully"); Ok(()) } @@ -129,10 +132,10 @@ impl Sink for PipeWireSink { } info!("Stopping PipeWire sink..."); - + // Signal the thread to quit self.quit_flag.store(true, Ordering::Relaxed); - + // Drop the sender to signal the receiver thread to exit self.sender = None; @@ -189,14 +192,15 @@ fn run_pipewire_loop( ) -> Result<(), PipeWireError> { // Initialize PipeWire pw::init(); - + let mainloop = pw::main_loop::MainLoopRc::new(None) .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; - + let context = pw::context::ContextRc::new(&mainloop, None) .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; - - let core = context.connect_rc(None) + + let core = context + .connect_rc(None) .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; let stream = pw::stream::StreamBox::new( @@ -214,10 +218,10 @@ fn run_pipewire_loop( let sample_size = calculate_sample_size(format); let stride = sample_size * NUM_CHANNELS as usize; - + // Clone mainloop for use in the listener callback let mainloop_quit = mainloop.clone(); - + // This is the key: the listener is based on the working pipewire_tone_test.rs example let _listener = stream .add_local_listener_with_user_data((receiver, quit_flag.clone())) @@ -227,7 +231,7 @@ fn run_pipewire_loop( mainloop_quit.quit(); return; } - + match stream.dequeue_buffer() { None => { // No buffer available, this is normal @@ -235,14 +239,14 @@ fn run_pipewire_loop( Some(mut buffer) => { let datas = buffer.datas_mut(); let data = &mut datas[0]; - + let n_frames = if let Some(slice) = data.data() { let n_frames = slice.len() / stride; let total_bytes = n_frames * stride; - + // Read from channel - try non-blocking first, then blocking if needed let mut bytes_read = 0; - + // First, try to read without blocking for i in 0..total_bytes { match receiver.try_recv() { @@ -253,7 +257,7 @@ fn run_pipewire_loop( Err(_) => break, } } - + // If we didn't get enough data, block to get more // This prevents underruns and stuttering if bytes_read < total_bytes { @@ -273,12 +277,12 @@ fn run_pipewire_loop( } } } - + n_frames } else { 0 }; - + // This matches the working pipewire_tone_test.rs example exactly let chunk = data.chunk_mut(); *chunk.offset_mut() = 0; @@ -295,7 +299,7 @@ fn run_pipewire_loop( audio_info.set_format(convert_audio_format(format)); audio_info.set_rate(SAMPLE_RATE); audio_info.set_channels(NUM_CHANNELS as u32); - + let mut position = [0; spa::param::audio::MAX_CHANNELS]; position[0] = spa_sys::SPA_AUDIO_CHANNEL_FL; position[1] = spa_sys::SPA_AUDIO_CHANNEL_FR; @@ -316,15 +320,16 @@ fn run_pipewire_loop( let mut params = [Pod::from_bytes(&values).unwrap()]; // Connect stream - matches pipewire_tone_test.rs - stream.connect( - spa::utils::Direction::Output, - None, - pw::stream::StreamFlags::AUTOCONNECT - | pw::stream::StreamFlags::MAP_BUFFERS - | pw::stream::StreamFlags::RT_PROCESS, - &mut params, - ) - .map_err(|e| PipeWireError::StreamConnect(format!("{:?}", e)))?; + stream + .connect( + spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + ) + .map_err(|e| PipeWireError::StreamConnect(format!("{:?}", e)))?; // Run the main loop mainloop.run(); From 3bad516f9a9b8f45225408d673f552a51799cd81 Mon Sep 17 00:00:00 2001 From: Guillaume Giraudon Date: Tue, 7 Oct 2025 21:48:30 +0000 Subject: [PATCH 4/4] Replace bytefeeding with ringbuffer to feed channels --- Cargo.lock | 12 +++ playback/Cargo.toml | 3 +- playback/src/audio_backend/pipewire.rs | 110 ++++++++++++------------- 3 files changed, 69 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e2db94cb..a75a9875b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2131,6 +2131,7 @@ dependencies = [ "portaudio-rs", "rand 0.9.2", "rand_distr", + "ringbuf", "rodio", "sdl2", "shell-words", @@ -3227,6 +3228,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuf" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" +dependencies = [ + "crossbeam-utils", + "portable-atomic", + "portable-atomic-util", +] + [[package]] name = "rodio" version = "0.21.1" diff --git a/playback/Cargo.toml b/playback/Cargo.toml index aff9bd76c..01b394a38 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -20,7 +20,7 @@ gstreamer-backend = [ "dep:gstreamer-audio", ] jackaudio-backend = ["dep:jack"] -pipewire-backend = ["dep:pipewire", "dep:libspa"] +pipewire-backend = ["dep:pipewire", "dep:libspa", "dep:ringbuf"] portaudio-backend = ["dep:portaudio-rs"] pulseaudio-backend = ["dep:libpulse-binding", "dep:libpulse-simple-binding"] rodio-backend = ["dep:cpal", "dep:rodio"] @@ -78,6 +78,7 @@ libpulse-simple-binding = { version = "2", optional = true, default-features = f # Pipewire dependencies libspa = { version = "0.9", optional = true } pipewire = { version = "0.9", optional = true } +ringbuf = { version = "0.4", optional = true } # Rodio dependencies diff --git a/playback/src/audio_backend/pipewire.rs b/playback/src/audio_backend/pipewire.rs index 958c6c47d..ba7c1df77 100644 --- a/playback/src/audio_backend/pipewire.rs +++ b/playback/src/audio_backend/pipewire.rs @@ -3,7 +3,6 @@ use crate::config::AudioFormat; use crate::convert::Converter; use crate::decoder::AudioPacket; use crate::{NUM_CHANNELS, SAMPLE_RATE}; -use std::sync::mpsc::{Receiver, SyncSender, sync_channel}; use std::sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -14,8 +13,26 @@ use thiserror::Error; use libspa::sys as spa_sys; use pipewire as pw; use pw::{properties::properties, spa}; +use ringbuf::{ + HeapRb, + traits::{Consumer, Producer, Split}, +}; use spa::pod::Pod; +type RingProducer = ringbuf::wrap::caching::Caching< + std::sync::Arc>>, + true, + false, +>; +type RingConsumer = ringbuf::wrap::caching::Caching< + std::sync::Arc>>, + false, + true, +>; + +// Ring buffer size: 1 second of audio at max quality (F64, stereo, 44.1kHz) +const RING_BUFFER_SIZE: usize = 44100 * 2 * 8; + #[derive(Debug, Error)] enum PipeWireError { #[error(" Failed to Create Main Loop: {0}")] @@ -46,8 +63,8 @@ impl From for SinkError { pub struct PipeWireSink { format: AudioFormat, - // Channel sender for audio data - sender: Option>, + // Lock-free ring buffer producer for audio data + producer: Option, // Flag to signal thread to stop quit_flag: Arc, initialized: bool, @@ -70,8 +87,8 @@ fn convert_audio_format(format: AudioFormat) -> spa::param::audio::AudioFormat { F64 => spa::param::audio::AudioFormat::F64LE, F32 => spa::param::audio::AudioFormat::F32LE, S32 => spa::param::audio::AudioFormat::S32LE, - S24 => spa::param::audio::AudioFormat::S24LE, - S24_3 => spa::param::audio::AudioFormat::S24_32LE, + S24 => spa::param::audio::AudioFormat::S24_32LE, + S24_3 => spa::param::audio::AudioFormat::S24LE, S16 => spa::param::audio::AudioFormat::S16LE, } } @@ -82,7 +99,7 @@ impl Open for PipeWireSink { Self { format, - sender: None, + producer: None, quit_flag: Arc::new(AtomicBool::new(false)), initialized: false, _main_loop_handle: None, @@ -101,17 +118,16 @@ impl Sink for PipeWireSink { let format = self.format; let quit_flag = Arc::clone(&self.quit_flag); - // Create a sync channel - buffer size for ~1 second of audio to prevent underruns - let sample_size = calculate_sample_size(format); - let buffer_size = SAMPLE_RATE as usize * NUM_CHANNELS as usize * sample_size; - let (sender, receiver) = sync_channel::(buffer_size); + // Create a lock-free ring buffer for real-time audio transfer + let ring_buffer = HeapRb::::new(RING_BUFFER_SIZE); + let (producer, consumer) = ring_buffer.split(); - // Store the sender for write_bytes - self.sender = Some(sender); + // Store the producer for write_bytes + self.producer = Some(producer); - // Run PipeWire main loop in a separate thread with the receiver + // Run PipeWire main loop in a separate thread with the consumer let handle = thread::spawn(move || { - if let Err(e) = run_pipewire_loop(receiver, quit_flag, format) { + if let Err(e) = run_pipewire_loop(consumer, quit_flag, format) { error!("PipeWire loop error: {}", e); } }); @@ -136,8 +152,8 @@ impl Sink for PipeWireSink { // Signal the thread to quit self.quit_flag.store(true, Ordering::Relaxed); - // Drop the sender to signal the receiver thread to exit - self.sender = None; + // Drop the producer to signal the consumer thread to exit + self.producer = None; // Wait for the thread to finish with a timeout if let Some(handle) = self._main_loop_handle.take() { @@ -163,10 +179,18 @@ impl SinkAsBytes for PipeWireSink { return Err(PipeWireError::NotConnected.into()); } - if let Some(ref sender) = self.sender { - // Send data through the channel - use blocking send to ensure all data is queued - for &byte in data { - sender.send(byte).map_err(|_| PipeWireError::NotConnected)?; + if let Some(ref mut producer) = self.producer { + // Push data to the lock-free ring buffer in chunks + // This is much more efficient than byte-by-byte and is wait-free + let mut offset = 0; + while offset < data.len() { + let written = producer.push_slice(&data[offset..]); + if written == 0 { + // Ring buffer is full, wait a tiny bit for consumer to catch up + thread::sleep(std::time::Duration::from_micros(100)); + } else { + offset += written; + } } Ok(()) } else { @@ -186,7 +210,7 @@ impl PipeWireSink { } fn run_pipewire_loop( - receiver: Receiver, + consumer: RingConsumer, quit_flag: Arc, format: AudioFormat, ) -> Result<(), PipeWireError> { @@ -222,10 +246,11 @@ fn run_pipewire_loop( // Clone mainloop for use in the listener callback let mainloop_quit = mainloop.clone(); - // This is the key: the listener is based on the working pipewire_tone_test.rs example + // Use PipeWire's real-time callback with lock-free ring buffer consumer + // This ensures optimal performance and real-time safety let _listener = stream - .add_local_listener_with_user_data((receiver, quit_flag.clone())) - .process(move |stream, (receiver, quit_flag)| { + .add_local_listener_with_user_data((consumer, quit_flag.clone())) + .process(move |stream, (consumer, quit_flag)| { // Check if we should quit if quit_flag.load(Ordering::Relaxed) { mainloop_quit.quit(); @@ -244,38 +269,13 @@ fn run_pipewire_loop( let n_frames = slice.len() / stride; let total_bytes = n_frames * stride; - // Read from channel - try non-blocking first, then blocking if needed - let mut bytes_read = 0; - - // First, try to read without blocking - for i in 0..total_bytes { - match receiver.try_recv() { - Ok(byte) => { - slice[i] = byte; - bytes_read += 1; - } - Err(_) => break, - } - } + // Pop data from the lock-free ring buffer + // This is wait-free and real-time safe + let bytes_read = consumer.pop_slice(slice); - // If we didn't get enough data, block to get more - // This prevents underruns and stuttering + // Fill any remaining space with silence if underrun if bytes_read < total_bytes { - for i in bytes_read..total_bytes { - match receiver.recv() { - Ok(byte) => { - slice[i] = byte; - } - Err(_) => { - // Channel disconnected, fill rest with silence and signal quit - for j in i..total_bytes { - slice[j] = 0; - } - quit_flag.store(true, Ordering::Relaxed); - break; - } - } - } + slice[bytes_read..total_bytes].fill(0); } n_frames @@ -283,7 +283,7 @@ fn run_pipewire_loop( 0 }; - // This matches the working pipewire_tone_test.rs example exactly + // Configure the buffer chunk metadata let chunk = data.chunk_mut(); *chunk.offset_mut() = 0; *chunk.stride_mut() = stride as _;