diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index fed99f1df..be1c96974 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -59,6 +59,7 @@ tracing = { version = "0.1", default-features = false, features = [ "log", ] } tracing-opentelemetry = { version = "0.22.0", default-features = false } +trait-variant = "0.1.1" opentelemetry = { version = "0.21.0", default-features = false } diff --git a/tarpc/src/client/stub.rs b/tarpc/src/client/stub.rs index e7c11aa05..6c7bfe5ce 100644 --- a/tarpc/src/client/stub.rs +++ b/tarpc/src/client/stub.rs @@ -2,9 +2,7 @@ use crate::{ client::{Channel, RpcError}, - context, - server::Serve, - RequestName, + context, RequestName, }; pub mod load_balance; @@ -16,6 +14,7 @@ mod mock; /// A connection to a remote service. /// Calls the service with requests of type `Req` and receives responses of type `Resp`. #[allow(async_fn_in_trait)] +#[trait_variant::make(SendStub: Send)] pub trait Stub { /// The service request type. type Req: RequestName; @@ -39,14 +38,3 @@ where Self::call(self, ctx, request).await } } - -impl Stub for S -where - S: Serve + Clone, -{ - type Req = S::Req; - type Resp = S::Resp; - async fn call(&self, ctx: context::Context, req: Self::Req) -> Result { - self.clone().serve(ctx, req).await.map_err(RpcError::Server) - } -} diff --git a/tarpc/src/server.rs b/tarpc/src/server.rs index d79d45c2c..6c0cee92f 100644 --- a/tarpc/src/server.rs +++ b/tarpc/src/server.rs @@ -6,6 +6,8 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. +use crate::client::stub::Stub; +use crate::client::RpcError; use crate::{ cancellations::{cancellations, CanceledRequests, RequestCancellation}, context::{self, SpanExt}, @@ -66,6 +68,27 @@ impl Config { } } +/// A [`Stub`] implementation that simply warps a `Serve`. +pub struct ServeStub { + serve: S, +} + +impl Stub for ServeStub +where + S: Serve + Clone, +{ + type Req = ::Req; + type Resp = ::Resp; + + async fn call(&self, ctx: context::Context, req: Self::Req) -> Result { + self.serve + .clone() + .serve(ctx, req) + .await + .map_err(RpcError::Server) + } +} + /// Equivalent to a `FnOnce(Req) -> impl Future`. #[allow(async_fn_in_trait)] pub trait Serve { @@ -77,6 +100,16 @@ pub trait Serve { /// Responds to a single request. async fn serve(self, ctx: context::Context, req: Self::Req) -> Result; + + /// Wrap this `Serve` in a type that implements [`Stub`]. + async fn into_stub(self) -> ServeStub + where + Self: Clone, + { + ServeStub { + serve: self.clone(), + } + } } /// A Serve wrapper around a Fn.