Skip to content

Commit da0deb1

Browse files
committed
Convert discovery to hyper 1.x
1 parent 6a4053e commit da0deb1

4 files changed

Lines changed: 86 additions & 50 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

discovery/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@ edition = "2021"
1111
[dependencies]
1212
aes = "0.8"
1313
base64 = "0.21"
14+
bytes = "1"
1415
cfg-if = "1.0"
1516
ctr = "0.9"
1617
dns-sd = { version = "0.1.3", optional = true }
1718
form_urlencoded = "1.0"
1819
futures-core = "0.3"
1920
futures-util = "0.3"
2021
hmac = "0.12"
21-
hyper = { version = "0.14", features = ["http1", "server", "tcp", "backports", "deprecated"] }
22+
hyper = { version = "1.3", features = ["http1"] }
23+
hyper-util = { version = "0.1", features = ["server-auto", "server-graceful", "service"] }
24+
http-body-util = "0.1.1"
2225
libmdns = "0.8"
2326
log = "0.4"
2427
rand = "0.8"

discovery/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl Builder {
124124
pub fn launch(self) -> Result<Discovery, Error> {
125125
let mut port = self.port;
126126
let name = self.server_config.name.clone().into_owned();
127-
let server = DiscoveryServer::new(self.server_config, &mut port)??;
127+
let server = DiscoveryServer::new(self.server_config, &mut port)?;
128128
let _zeroconf_ip = self.zeroconf_ip;
129129
let svc;
130130

discovery/src/server.rs

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{
22
borrow::Cow,
33
collections::BTreeMap,
44
convert::Infallible,
5-
net::{Ipv4Addr, SocketAddr},
5+
net::{Ipv4Addr, SocketAddr, TcpListener},
66
pin::Pin,
77
sync::Arc,
88
task::{Context, Poll},
@@ -11,13 +11,16 @@ use std::{
1111
use aes::cipher::{KeyIvInit, StreamCipher};
1212
use base64::engine::general_purpose::STANDARD as BASE64;
1313
use base64::engine::Engine as _;
14+
use bytes::Bytes;
1415
use futures_core::Stream;
1516
use futures_util::{FutureExt, TryFutureExt};
1617
use hmac::{Hmac, Mac};
18+
use http_body_util::{BodyExt, Full};
1719
use hyper::{
18-
body::HttpBody, service::{make_service_fn, service_fn}, Body, Method, Request, Response, StatusCode
20+
body::Incoming, Method, Request, Response, StatusCode
1921
};
2022

23+
use hyper_util::{rt::TokioIo, server::graceful::GracefulShutdown};
2124
use log::{debug, error, warn};
2225
use serde_json::json;
2326
use sha1::{Digest, Sha1};
@@ -62,7 +65,7 @@ impl RequestHandler {
6265
(discovery, rx)
6366
}
6467

65-
fn handle_get_info(&self) -> Response<hyper::Body> {
68+
fn handle_get_info(&self) -> Response<Full<Bytes>> {
6669
let public_key = BASE64.encode(self.keys.public_key());
6770
let device_type: &str = self.config.device_type.into();
6871
let mut active_user = String::new();
@@ -106,11 +109,11 @@ impl RequestHandler {
106109
// - "deviceAPI_isGroup": False
107110
})
108111
.to_string();
109-
110-
Response::new(Body::from(body))
112+
let body = Bytes::from(body);
113+
Response::new(Full::new(body))
111114
}
112115

113-
fn handle_add_user(&self, params: &Params<'_>) -> Result<Response<hyper::Body>, Error> {
116+
fn handle_add_user(&self, params: &Params<'_>) -> Result<Response<Full<Bytes>>, Error> {
114117
let username_key = "userName";
115118
let username = params
116119
.get(username_key)
@@ -170,7 +173,8 @@ impl RequestHandler {
170173
});
171174

172175
let body = result.to_string();
173-
return Ok(Response::new(Body::from(body)));
176+
let body = Bytes::from(body);
177+
return Ok(Response::new(Full::new(body)));
174178
}
175179

176180
let decrypted = {
@@ -192,19 +196,20 @@ impl RequestHandler {
192196
});
193197

194198
let body = result.to_string();
195-
Ok(Response::new(Body::from(body)))
199+
let body = Bytes::from(body);
200+
Ok(Response::new(Full::new(body)))
196201
}
197202

198-
fn not_found(&self) -> Response<hyper::Body> {
203+
fn not_found(&self) -> Response<Full<Bytes>> {
199204
let mut res = Response::default();
200205
*res.status_mut() = StatusCode::NOT_FOUND;
201206
res
202207
}
203208

204209
async fn handle(
205210
self: Arc<Self>,
206-
request: Request<Body>,
207-
) -> Result<hyper::Result<Response<Body>>, Error> {
211+
request: Request<Incoming>,
212+
) -> Result<hyper::Result<Response<Full<Bytes>>>, Error> {
208213
let mut params = Params::new();
209214

210215
let (parts, body) = request.into_parts();
@@ -238,52 +243,78 @@ pub struct DiscoveryServer {
238243
}
239244

240245
impl DiscoveryServer {
241-
pub fn new(config: Config, port: &mut u16) -> Result<hyper::Result<Self>, Error> {
246+
pub fn new(config: Config, port: &mut u16) -> Result<Self, Error> {
242247
let (discovery, cred_rx) = RequestHandler::new(config);
243-
let discovery = Arc::new(discovery);
244-
245-
let (close_tx, close_rx) = oneshot::channel();
246-
247248
let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port);
249+
250+
let (close_tx, close_rx) = oneshot::channel();
248251

249-
let make_service = make_service_fn(move |_| {
250-
let discovery = discovery.clone();
251-
async move {
252-
Ok::<_, hyper::Error>(service_fn(move |request| {
253-
discovery
254-
.clone()
255-
.handle(request)
256-
.inspect_err(|e| error!("could not handle discovery request: {}", e))
257-
.and_then(|x| async move { Ok(x) })
258-
.map(Result::unwrap) // guaranteed by `and_then` above
259-
}))
252+
let listener = match TcpListener::bind(address) {
253+
Ok(listener) => listener,
254+
Err(e) => {
255+
warn!("Discovery server failed to start: {e}");
256+
return Err(e.into());
260257
}
261-
});
258+
};
259+
260+
listener.set_nonblocking(true)?;
261+
let listener = tokio::net::TcpListener::from_std(listener)?;
262262

263-
let server = hyper::Server::try_bind(&address)?.serve(make_service);
263+
match listener.local_addr() {
264+
Ok(addr) => {
265+
*port = addr.port();
266+
debug!("Zeroconf server listening on 0.0.0.0:{}", *port);
267+
}
268+
Err(e) => {
269+
warn!("Discovery server failed to start: {e}");
270+
return Err(e.into());
271+
}
272+
}
264273

265-
*port = server.local_addr().port();
266-
debug!("Zeroconf server listening on 0.0.0.0:{}", *port);
267274

268-
tokio::spawn(async {
269-
let result = server
270-
.with_graceful_shutdown(async {
271-
if let Err(e) = close_rx.await {
272-
debug!("unable to close discovery Rx channel completely: {e}");
275+
tokio::spawn(async move {
276+
let discovery = Arc::new(discovery);
277+
278+
let server = hyper::server::conn::http1::Builder::new();
279+
let graceful = GracefulShutdown::new();
280+
let mut close_rx = std::pin::pin!(close_rx);
281+
loop {
282+
tokio::select! {
283+
Ok((stream, _)) = listener.accept() => {
284+
let io = TokioIo::new(stream);
285+
let discovery = discovery.clone();
286+
287+
let svc = hyper::service::service_fn(move |request| {
288+
discovery
289+
.clone()
290+
.handle(request)
291+
.inspect_err(|e| error!("could not handle discovery request: {}", e))
292+
.and_then(|x| async move { Ok(x) })
293+
.map(Result::unwrap) // guaranteed by `and_then` above
294+
});
295+
296+
let conn = server.serve_connection(io, svc);
297+
let fut = graceful.watch(conn);
298+
tokio::spawn(async move {
299+
// Errors are logged in the service_fn
300+
let _ = fut.await;
301+
});
273302
}
274-
debug!("Shutting down discovery server");
275-
})
276-
.await;
277-
278-
if let Err(e) = result {
279-
warn!("Discovery server failed: {}", e);
303+
_ = &mut close_rx => {
304+
debug!("Shutting down discovery server");
305+
break;
306+
}
307+
}
280308
}
309+
310+
graceful.shutdown().await;
311+
debug!("Discovery server stopped");
281312
});
282313

283-
Ok(Ok(Self {
314+
Ok(Self {
284315
cred_rx,
285316
_close_tx: close_tx,
286-
}))
317+
})
287318
}
288319
}
289320

0 commit comments

Comments
 (0)