Skip to content

Commit 0d4367f

Browse files
committed
Improve the subprocess backend
Better error handling. Move the checking of the shell command to start so a proper error can be thrown if it's None. Use write instead of write_all for finer grained error handling and the ability to attempt a restart on write errors. Use try_wait to skip flushing and killing the process if it's already dead. Stop the player on shutdown to *mostly* prevent write errors from spamming the logs during shutdown. Previously Ctrl+c always resulted in a write error.
1 parent 95dc9f8 commit 0d4367f

2 files changed

Lines changed: 158 additions & 40 deletions

File tree

connect/src/spirc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ impl SpircTask {
478478
}
479479
SpircCommand::Shutdown => {
480480
CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
481+
self.player.stop();
481482
self.shutdown = true;
482483
if let Some(rx) = self.commands.as_mut() {
483484
rx.close()

playback/src/audio_backend/subprocess.rs

Lines changed: 157 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,75 @@ use crate::convert::Converter;
44
use crate::decoder::AudioPacket;
55
use shell_words::split;
66

7-
use std::io::Write;
7+
use std::io::{ErrorKind, Write};
88
use std::process::{exit, Child, Command, Stdio};
9+
use thiserror::Error;
10+
11+
#[derive(Debug, Error)]
12+
enum SubprocessError {
13+
#[error("<SubprocessSink> {0}")]
14+
OnWrite(std::io::Error),
15+
16+
#[error("<SubprocessSink> Command {command} Can Not be Executed, {e}")]
17+
SpawnFailure { command: String, e: std::io::Error },
18+
19+
#[error("<SubprocessSink> Failed to Parse Command args for {command}, {e}")]
20+
InvalidArgs {
21+
command: String,
22+
e: shell_words::ParseError,
23+
},
24+
25+
#[error("<SubprocessSink> Failed to Flush the Subprocess, {0}")]
26+
FlushFailure(std::io::Error),
27+
28+
#[error("<SubprocessSink> Failed to Kill the Subprocess, {0}")]
29+
KillFailure(std::io::Error),
30+
31+
#[error("<SubprocessSink> Failed to Wait for the Subprocess to Exit, {0}")]
32+
WaitFailure(std::io::Error),
33+
34+
#[error("<SubprocessSink> The Subprocess is no longer able to accept Bytes")]
35+
WriteZero,
36+
37+
#[error("<SubprocessSink> Missing Required Shell Command")]
38+
MissingCommand,
39+
40+
#[error("<SubprocessSink> The Subprocess is None")]
41+
NoChild,
42+
43+
#[error("<SubprocessSink> The Subprocess's stdin is None")]
44+
NoStdin,
45+
}
46+
47+
impl From<SubprocessError> for SinkError {
48+
fn from(e: SubprocessError) -> SinkError {
49+
use SubprocessError::*;
50+
let es = e.to_string();
51+
match e {
52+
FlushFailure(_) | KillFailure(_) | WaitFailure(_) | OnWrite(_) | WriteZero => {
53+
SinkError::OnWrite(es)
54+
}
55+
SpawnFailure { .. } => SinkError::ConnectionRefused(es),
56+
MissingCommand | InvalidArgs { .. } => SinkError::InvalidParams(es),
57+
NoChild | NoStdin => SinkError::NotConnected(es),
58+
}
59+
}
60+
}
961

1062
pub struct SubprocessSink {
11-
shell_command: String,
63+
shell_command: Option<String>,
1264
child: Option<Child>,
1365
format: AudioFormat,
1466
}
1567

1668
impl Open for SubprocessSink {
1769
fn open(shell_command: Option<String>, format: AudioFormat) -> Self {
18-
let shell_command = match shell_command.as_deref() {
19-
Some("?") => {
20-
info!("Usage: --backend subprocess --device {{shell_command}}");
21-
exit(0);
22-
}
23-
Some(cmd) => cmd.to_owned(),
24-
None => {
25-
error!("subprocess sink requires specifying a shell command");
26-
exit(1);
27-
}
28-
};
70+
if let Some("?") = shell_command.as_deref() {
71+
println!("\nUsage:\n\nOutput to a Subprocess:\n\n\t--backend subprocess --device {{shell_command}}\n");
72+
exit(0);
73+
}
2974

30-
info!("Using subprocess sink with format: {:?}", format);
75+
info!("Using SubprocessSink with format: {:?}", format);
3176

3277
Self {
3378
shell_command,
@@ -39,49 +84,121 @@ impl Open for SubprocessSink {
3984

4085
impl Sink for SubprocessSink {
4186
fn start(&mut self) -> SinkResult<()> {
42-
let args = split(&self.shell_command).unwrap();
43-
let child = Command::new(&args[0])
44-
.args(&args[1..])
45-
.stdin(Stdio::piped())
46-
.spawn()
47-
.map_err(|e| SinkError::ConnectionRefused(e.to_string()))?;
48-
self.child = Some(child);
87+
self.child.get_or_insert({
88+
match self.shell_command.as_deref() {
89+
Some(command) => {
90+
let args = split(command).map_err(|e| SubprocessError::InvalidArgs {
91+
command: command.to_string(),
92+
e,
93+
})?;
94+
95+
Command::new(&args[0])
96+
.args(&args[1..])
97+
.stdin(Stdio::piped())
98+
.spawn()
99+
.map_err(|e| SubprocessError::SpawnFailure {
100+
command: command.to_string(),
101+
e,
102+
})?
103+
}
104+
None => return Err(SubprocessError::MissingCommand.into()),
105+
}
106+
});
107+
49108
Ok(())
50109
}
51110

52111
fn stop(&mut self) -> SinkResult<()> {
53-
if let Some(child) = &mut self.child.take() {
54-
child
55-
.kill()
56-
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
57-
child
58-
.wait()
59-
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
112+
let child = &mut self.child.take().ok_or(SubprocessError::NoChild)?;
113+
114+
match child.try_wait() {
115+
// The process has already exited
116+
// nothing to do.
117+
Ok(Some(_)) => Ok(()),
118+
Ok(_) => {
119+
// The process Must DIE!!!
120+
child
121+
.stdin
122+
.take()
123+
.ok_or(SubprocessError::NoStdin)?
124+
.flush()
125+
.map_err(SubprocessError::FlushFailure)?;
126+
127+
child.kill().map_err(SubprocessError::KillFailure)?;
128+
child.wait().map_err(SubprocessError::WaitFailure)?;
129+
130+
Ok(())
131+
}
132+
Err(e) => Err(SubprocessError::WaitFailure(e).into()),
60133
}
61-
Ok(())
62134
}
63135

64136
sink_as_bytes!();
65137
}
66138

67139
impl SinkAsBytes for SubprocessSink {
68140
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
69-
if let Some(child) = &mut self.child {
70-
let child_stdin = child
141+
// We get one attempted restart per write.
142+
// We don't want to get stuck in a restart loop.
143+
let mut restarted = false;
144+
let mut start_index = 0;
145+
let data_len = data.len();
146+
let mut end_index = data_len;
147+
148+
loop {
149+
match self
150+
.child
151+
.as_ref()
152+
.ok_or(SubprocessError::NoChild)?
71153
.stdin
72-
.as_mut()
73-
.ok_or_else(|| SinkError::NotConnected("Child is None".to_string()))?;
74-
child_stdin
75-
.write_all(data)
76-
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
77-
child_stdin
78-
.flush()
79-
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
154+
.as_ref()
155+
.ok_or(SubprocessError::NoStdin)?
156+
.write(&data[start_index..end_index])
157+
{
158+
Ok(0) => {
159+
// Potentially fatal.
160+
// As per the docs a return value of 0
161+
// means we shouldn't try to write to the
162+
// process anymore so let's try a restart
163+
// if we haven't already.
164+
self.try_restart(SubprocessError::WriteZero, &mut restarted)?;
165+
166+
continue;
167+
}
168+
Ok(bytes_written) => {
169+
// What we want, a successful write.
170+
start_index = data_len.min(start_index + bytes_written);
171+
end_index = data_len.min(start_index + bytes_written);
172+
173+
if end_index == data_len {
174+
break Ok(());
175+
}
176+
}
177+
// Non-fatal, retry the write.
178+
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
179+
Err(e) => {
180+
// Very possibly fatal,
181+
// but let's try a restart anyway if we haven't already.
182+
self.try_restart(SubprocessError::OnWrite(e), &mut restarted)?;
183+
184+
continue;
185+
}
186+
}
80187
}
81-
Ok(())
82188
}
83189
}
84190

85191
impl SubprocessSink {
86192
pub const NAME: &'static str = "subprocess";
193+
194+
fn try_restart(&mut self, e: SubprocessError, restarted: &mut bool) -> SinkResult<()> {
195+
// If the restart fails throw the original error back.
196+
if !*restarted && self.stop().is_ok() && self.start().is_ok() {
197+
*restarted = true;
198+
199+
Ok(())
200+
} else {
201+
Err(e.into())
202+
}
203+
}
87204
}

0 commit comments

Comments
 (0)