Skip to content

Commit 9c2204f

Browse files
authored
feat(zenorb): zoci ext methods (#1164)
## changes adds and documents extension methods for the Zenoh Orb Command Interface: small helpers for sending commands using zenoh queries and using strings/json underneath small example, using utility methods: - `Query::args` - `Query::res` - `zenorb::Session::command_raw` - `Result<Reply, ReplyErr>::json` ```rust use zenorb::zoci::{ReplyExt, ZociQueryExt}; blue.receiver(()) .queryable("tuple", async |_ctx, query| { let args: (String, String) = query.args()?; query.res(&args).await?; Ok(()) }) .run() .await?; let red = Zenorb::from_cfg(client_cfg.clone()) .orb_id(orb_id.clone()) .with_name("red") .await?; let reply = red.command_raw("blue/tuple", "one two").await?; let reply: Result<(String, String), String> = reply.json()?; ```
1 parent bf69e48 commit 9c2204f

6 files changed

Lines changed: 997 additions & 88 deletions

File tree

zenorb/README.md

Lines changed: 217 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,171 +1,301 @@
11
# zenorb
22

3-
A helper library for using [Zenoh](https://zenoh.io/) with orb-specific conventions. Just a small
4-
wrapper for delcaring publishers, queriers, queryables and subscribers. Tries to use native `zenoh` types
5-
as much as possible.
3+
`zenorb` is a small wrapper around [Zenoh](https://zenoh.io/) for orb services.
64

7-
## Design Decisions
5+
It keeps Zenoh's native types and behavior, but adds a few orb-specific
6+
conventions:
7+
8+
- declared publisher and querier registries
9+
- shared context for subscribers and queryables
10+
- consistent orb/service key expression prefixes
11+
- a small command/reply layer for query-based interactions
812

9-
### 1. Publisher & Querier Registry
13+
## Design Decisions
1014

11-
Zenoh encourages holding declared publishers and queriers across the application lifetime for performance optimizations. Without this, you'd need to either:
15+
### 1. Declared Publishers and Queriers
1216

13-
1. Declare publishers/queriers on every send (inefficient)
14-
2. Manually manage a hashmap of declared publishers/queriers throughout your codebase (boilerplate)
17+
Zenoh performs best when publishers and queriers are declared once and kept for
18+
the lifetime of the process.
1519

16-
`Sender` solves this by maintaining an internal registry of declared publishers and queriers. You declare them once at startup, then retrieve them by keyexpr when needed:
20+
Without a wrapper, each service has to either redeclare them repeatedly or build
21+
its own registry and thread that through the application. `Sender` provides that
22+
registry directly. You declare publishers and queriers at startup and retrieve
23+
them later by key expression.
1724

1825
```rust
19-
// At startup: declare all publishers and queriers
20-
let sender = session
26+
use zenoh::bytes::Encoding;
27+
use zenorb::Zenorb;
28+
29+
let zenorb = Zenorb::from_cfg(cfg)
30+
.orb_id(orb_id)
31+
.with_name("banana")
32+
.await?;
33+
34+
let sender = zenorb
2135
.sender()
2236
.publisher("events")
2337
.publisher_with("metrics", |p| p.encoding(Encoding::APPLICATION_JSON))
24-
.querier("other-service/status")
38+
.querier("apple/status")
2539
.build()
2640
.await?;
2741

28-
// Later: use them by keyexpr
42+
// Later
2943
sender.publisher("events")?.put(payload).await?;
30-
sender.querier("other-service/status")?.get().await?;
44+
sender.querier("apple/status")?.get().await?;
3145
```
3246

33-
`Sender` is `Clone` (wrapping an `Arc<Registry>`), so it can be passed around to different parts of your application.
47+
`Sender` is cheap to clone, so it can be shared freely across the application.
3448

35-
The only time you should use Zenoh's publishers directly (without declaring) is when topics are dynamic and not known at startup.
49+
### 2. Shared Context for Receivers
3650

37-
### 2. Context Injection & Automatic Error Logging
51+
`Receiver` lets you register subscribers and queryables that all receive the
52+
same cloned context.
3853

39-
`Receiver` accepts a generic `Ctx` type that gets cloned and passed to every handler. This enables:
54+
That is useful for shared state such as database handles, caches, metrics
55+
clients, or test doubles. Handlers return `Result<()>`, and `zenorb` logs
56+
failures with the relevant key expression so each call site does not have to
57+
repeat the same error-handling boilerplate.
4058

41-
- **Dependency injection**: Pass shared state, database connections, or test mocks
42-
- **Testability**: Inject test doubles without restructuring your handlers
59+
```rust
60+
#[derive(Clone)]
61+
struct AppCtx {
62+
db: Db,
63+
metrics: Metrics,
64+
}
4365

44-
Additionally, handlers return `Result<()>`, and errors are automatically logged with the keyexpr context. This eliminates repetitive error handling boilerplate:
66+
zenorb
67+
.receiver(AppCtx { db, metrics })
68+
.subscriber("apple/events", async |ctx, sample| {
69+
let event: Event = serde_json::from_slice(&sample.payload().to_bytes())?;
4570

46-
```rust
47-
// Without zenorb: manual cloning and error handling everywhere
48-
let subscriber = session.declare_subscriber("events").await?;
49-
let db = db.clone();
50-
let metrics = metrics.clone();
51-
task::spawn(async move {
52-
while let Ok(sample) = subscriber.recv_async().await {
53-
let db = db.clone();
54-
let metrics = metrics.clone();
55-
let result = async move {
56-
let data: Event = deserialize(&sample)?;
57-
db.insert(&data).await?;
58-
metrics.record(&data).await?;
59-
Ok(())
60-
};
61-
62-
if let Err(e) = result.await {
63-
tracing::error!("Handler failed: {e}");
64-
}
65-
}
66-
});
67-
68-
// With zenorb: context injection and automatic error logging
69-
session
70-
.receiver(Ctx { db, metrics })
71-
.subscriber("events", async |ctx, sample| {
72-
let data: Event = deserialize(&sample)?;
73-
ctx.db.insert(&data).await?;
74-
ctx.metrics.record(&data).await?;
71+
ctx.db.insert(&event).await?;
72+
ctx.metrics.record(&event).await?;
7573

7674
Ok(())
7775
})
76+
.run()
77+
.await?;
7878
```
7979

80-
### 3. Standardized Topic Format
81-
82-
All orb sessions need to namespace their topics by orb ID and a name to avoid collisions. Without this library, every service would need to:
80+
### 3. Orb and Service Naming
8381

84-
1. Carry around, `orb_id`, and `service_name` values
85-
2. Remember to format topics correctly
86-
3. Risk inconsistent formatting across services
82+
Orb services need stable key expressions so publishers, subscribers, queriers,
83+
and queryables all agree on the same paths.
8784

88-
zenorb standardizes this in the `Session`:
85+
`Zenorb` carries the orb ID and service name once:
8986

9087
```rust
91-
let session = Session::from_cfg(cfg)
88+
use zenorb::Zenorb;
89+
90+
let zenorb = Zenorb::from_cfg(cfg)
9291
.orb_id(orb_id)
93-
.with_name("my-service")
92+
.with_name("banana")
9493
.await?;
9594
```
9695

97-
Topic formats are then applied automatically:
96+
From there, `zenorb` applies the prefixes for you:
9897

99-
| Type | Format |
100-
| ---------- | --------------------------- |
101-
| Publisher | `<orb-id>/<name>/<keyexpr>` |
102-
| Subscriber | `<orb-id>/<keyexpr>` |
103-
| Queryable | `<orb-id>/<name>/<keyexpr>` |
104-
| Querier | `<orb-id>/<keyexpr>` |
98+
| Type | Format |
99+
| --- | --- |
100+
| Publisher | `<orb-id>/<service>/<keyexpr>` |
101+
| Queryable | `<orb-id>/<service>/<keyexpr>` |
102+
| Subscriber | `<orb-id>/<keyexpr>` |
103+
| Querier | `<orb-id>/<keyexpr>` |
105104

106-
Note that subscribers and queriers omit the service name since they typically listen to or query other services. The service name is part of the keyexpr you provide:
105+
That means publishers and queryables are service-scoped, while subscribers and
106+
queriers target the full service path you provide.
107107

108108
```rust
109-
// Service "banana" publishes to: ea2ea744/banana/events
110-
// Service "apple" subscribes to: ea2ea744/banana/events
111-
session
109+
// Service "banana" publishes to ea2ea744/banana/events
110+
sender.publisher("events")?.put(payload).await?;
111+
112+
// Service "apple" subscribes to ea2ea744/banana/events
113+
apple
112114
.receiver(ctx)
113-
.subscriber("banana/events", handler) // keyexpr includes source service
115+
.subscriber("banana/events", async |ctx, sample| {
116+
Ok(())
117+
})
114118
.run()
115119
.await?;
116120
```
117121

118-
## Example
122+
If `Receiver::queryable(...)` is too opinionated for a use case,
123+
`Zenorb::declare_queryable(...)` exposes the underlying Zenoh queryable builder
124+
while keeping the same service-scoped prefix.
125+
126+
### 4. ZOCI - Zenoh Orb Command Interface
127+
128+
Some queries are really commands: one request, one reply, typed success or typed
129+
error.
130+
131+
`zenorb::zoci` defines a small convention for that pattern.
132+
133+
Caller-side helpers:
134+
135+
- `Sender::command(...)` sends a JSON payload through a declared querier
136+
- `Sender::command_raw(...)` sends a raw string payload through a declared querier
137+
- `Zenorb::command(...)` sends a JSON payload with `Zenorb::get(...)`
138+
- `Zenorb::command_raw(...)` sends a raw string payload with `Zenorb::get(...)`
139+
140+
Handler-side helpers:
141+
142+
- `query.json()` decodes a JSON request payload
143+
- `query.args()` decodes a space-delimited argument payload
144+
- `query.res(...)` sends a JSON success reply
145+
- `query.res_err(...)` sends a JSON error reply
146+
147+
Reply-side helper:
148+
149+
- `ReplyExt::json()` decodes a reply into `Result<OkType, ErrType>`
150+
151+
The command API keeps Zenoh's two layers of failure visible:
152+
153+
- outer `Result`: transport or session failure
154+
- inner `Result`: success reply or `reply_err(...)`
155+
156+
#### ZOCI Example
119157

120158
```rust
121-
use orb_info::{orb_os_release::OrbRelease, OrbId};
122-
use zenoh::bytes::Encoding;
159+
use serde::{Deserialize, Serialize};
160+
use zenorb::{
161+
zoci::{ReplyExt, ZociQueryExt},
162+
Zenorb,
163+
};
164+
165+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166+
struct StatusRequest {
167+
id: u64,
168+
label: String,
169+
}
170+
171+
let red = Zenorb::from_cfg(client_cfg.clone())
172+
.orb_id(orb_id.clone())
173+
.with_name("red")
174+
.await?;
175+
176+
let blue = Zenorb::from_cfg(client_cfg)
177+
.orb_id(orb_id)
178+
.with_name("blue")
179+
.await?;
180+
181+
let sender = red
182+
.sender()
183+
.querier("blue/status")
184+
.build()
185+
.await?;
186+
187+
blue.receiver(())
188+
.queryable("status", async |_ctx, query| {
189+
let req: StatusRequest = query.json()?;
190+
query.res(&req).await?;
191+
192+
Ok(())
193+
})
194+
.run()
195+
.await?;
196+
197+
let reply = sender
198+
.command(
199+
"blue/status",
200+
&StatusRequest {
201+
id: 7,
202+
label: "banana".into(),
203+
},
204+
)
205+
.await?;
206+
207+
let reply: Result<StatusRequest, StatusRequest> = reply.json()?;
208+
```
209+
210+
For lighter payloads, pair `command_raw(...)` with `query.args()`:
211+
212+
```rust
213+
use zenorb::zoci::{ReplyExt, ZociQueryExt};
214+
215+
blue.receiver(())
216+
.queryable("tuple", async |_ctx, query| {
217+
let args: (String, String) = query.args()?;
218+
query.res(&args).await?;
219+
220+
Ok(())
221+
})
222+
.run()
223+
.await?;
224+
225+
let sender = red
226+
.sender()
227+
.querier("blue/tuple")
228+
.build()
229+
.await?;
230+
231+
let reply = sender.command_raw("blue/tuple", "one two").await?;
232+
let reply: Result<(String, String), String> = reply.json()?;
233+
```
234+
235+
## Basic Publish/Subscribe Example
236+
237+
The examples above show the command/reply flow. The example below goes back to
238+
the core `zenorb` pattern: declared publishers on one service, subscribers on
239+
another, and JSON payload handling in the subscriber callback.
240+
241+
```rust
242+
use orb_info::OrbId;
243+
use serde::{Deserialize, Serialize};
244+
use std::{str::FromStr, sync::Arc};
245+
use tokio::sync::Mutex;
246+
use zenorb::Zenorb;
247+
248+
#[derive(Debug, Clone, Serialize, Deserialize)]
249+
struct Message {
250+
text: String,
251+
}
123252

124253
#[derive(Clone)]
125254
struct AppCtx {
126-
received: Arc<Mutex<Vec<Message>>>,
255+
received: Arc<Mutex<Vec<Message>>>
127256
}
128257

129258
#[tokio::main]
130-
async fn main() -> Result<()> {
259+
async fn main() -> color_eyre::Result<()> {
131260
let cfg = zenorb::client_cfg(7447);
132261
let orb_id = OrbId::from_str("ea2ea744")?;
133262

134-
// Create sessions with two different names
135-
let banana_session = Session::from_cfg(cfg.clone())
263+
let banana = Zenorb::from_cfg(cfg.clone())
136264
.orb_id(orb_id.clone())
137265
.with_name("banana")
138266
.await?;
139267

140-
let apple_session = Session::from_cfg(cfg)
268+
let apple = Zenorb::from_cfg(cfg)
141269
.orb_id(orb_id)
142270
.with_name("apple")
143271
.await?;
144272

145-
// Set up the sender with declared publishers
146-
let sender = banana_session
273+
let sender = banana
147274
.sender()
148275
.publisher("notifications")
149276
.build()
150277
.await?;
151278

152-
// Set up the receiver with context and handlers
153-
let ctx = AppCtx { received: Arc::new(Mutex::new(vec![])) };
279+
let ctx = AppCtx {
280+
received: Arc::new(Mutex::new(vec![])),
281+
};
154282

155-
apple_session
156-
.receiver(ctx)
283+
apple
284+
.receiver(ctx.clone())
157285
.subscriber("banana/notifications", async |ctx, sample| {
158286
let msg: Message = serde_json::from_slice(&sample.payload().to_bytes())?;
159287
ctx.received.lock().await.push(msg);
288+
160289
Ok(())
161290
})
162291
.run()
163292
.await?;
164293

165-
// Publish messages
166294
sender
167295
.publisher("notifications")?
168-
.put(serde_json::to_vec(&Message::new("hello"))?)
296+
.put(serde_json::to_vec(&Message {
297+
text: "hello".to_string(),
298+
})?)
169299
.await?;
170300

171301
Ok(())

zenorb/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod zoci;
2+
13
mod receiver;
24
mod sender;
35
mod session;

0 commit comments

Comments
 (0)