Skip to content

Commit 9421a51

Browse files
committed
feat: Add MPSC channel support to zrtl_thread plugin
Add message passing channels for inter-thread communication: - $Channel$new - Create unbounded MPSC channel - $Channel$bounded - Create bounded channel with capacity - $Channel$send / $Channel$send_ptr - Send values through channel - $Channel$recv - Blocking receive - $Channel$try_recv / $Channel$try_recv_status - Non-blocking receive - $Channel$recv_timeout / $Channel$recv_timeout_status - Receive with timeout - $Channel$clone_sender - Clone sender for multiple producers - $Channel$close_sender / $Channel$close_receiver - Close channel ends Includes comprehensive tests for all channel operations.
1 parent 75a6f4d commit 9421a51

2 files changed

Lines changed: 597 additions & 6 deletions

File tree

plugins/zrtl_thread/README.md

Lines changed: 129 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# zrtl_thread
22

3-
Threading and synchronization primitives for Zyntax-based languages.
3+
Threading, synchronization, and message passing for Zyntax-based languages.
44

55
## Overview
66

7-
Provides thread spawning, atomic operations, and mutex support. Supports both function pointers and ZRTL closures for thread entry points.
7+
Provides thread spawning, atomic operations, mutex support, and MPSC (multi-producer, single-consumer) channels for inter-thread communication. Supports both function pointers and ZRTL closures for thread entry points.
88

99
## Exported Symbols
1010

@@ -47,7 +47,45 @@ Provides thread spawning, atomic operations, and mutex support. Supports both fu
4747
| `$Mutex$unlock` | `(u64) -> i32` | Unlock mutex |
4848
| `$Mutex$free` | `(u64) -> ()` | Free a mutex |
4949

50-
## Usage Example
50+
### Channel Operations (MPSC)
51+
52+
| Symbol | Signature | Description |
53+
|--------|-----------|-------------|
54+
| `$Channel$new` | `() -> ChannelPair` | Create unbounded channel |
55+
| `$Channel$bounded` | `(u64) -> ChannelPair` | Create bounded channel with capacity |
56+
| `$Channel$send` | `(u64, i64) -> i32` | Send value (0=ok, -1=disconnected) |
57+
| `$Channel$send_ptr` | `(u64, *u8) -> i32` | Send pointer as i64 |
58+
| `$Channel$recv` | `(u64) -> i64` | Receive (blocking) |
59+
| `$Channel$try_recv` | `(u64) -> i64` | Try receive (non-blocking) |
60+
| `$Channel$try_recv_status` | `(u64) -> TryRecvResult` | Try receive with status |
61+
| `$Channel$recv_timeout` | `(u64, u64) -> i64` | Receive with timeout (ms) |
62+
| `$Channel$recv_timeout_status` | `(u64, u64) -> RecvTimeoutResult` | Receive with timeout and status |
63+
| `$Channel$clone_sender` | `(u64) -> u64` | Clone sender for multiple producers |
64+
| `$Channel$close_sender` | `(u64) -> ()` | Close a sender |
65+
| `$Channel$close_receiver` | `(u64) -> ()` | Close the receiver |
66+
67+
#### Channel Types
68+
69+
```c
70+
struct ChannelPair {
71+
u64 sender; // Sender handle
72+
u64 receiver; // Receiver handle
73+
}
74+
75+
struct TryRecvResult {
76+
i64 value; // Received value (valid if status == 0)
77+
i32 status; // 0 = success, 1 = empty, -1 = disconnected
78+
}
79+
80+
struct RecvTimeoutResult {
81+
i64 value; // Received value (valid if status == 0)
82+
i32 status; // 0 = success, 1 = timeout, -1 = disconnected
83+
}
84+
```
85+
86+
## Usage Examples
87+
88+
### Basic Threading
5189

5290
```zig
5391
// Spawn a thread with function pointer
@@ -58,6 +96,13 @@ extern fn worker(arg: i64) i64 {
5896
const handle = $Thread$spawn(worker, 21);
5997
const result = $Thread$join(handle); // result = 42
6098
99+
// Get CPU cores for work distribution
100+
const cores = $Thread$available_parallelism();
101+
```
102+
103+
### Atomics
104+
105+
```zig
61106
// Using atomics for thread-safe counter
62107
const counter = $Atomic$new(0);
63108
@@ -67,7 +112,11 @@ $Atomic$add(counter, 1);
67112
// Read final value
68113
const count = $Atomic$load(counter);
69114
$Atomic$free(counter);
115+
```
70116

117+
### Mutex
118+
119+
```zig
71120
// Using mutex for critical sections
72121
const mutex = $Mutex$new();
73122
@@ -76,9 +125,76 @@ $Mutex$lock(mutex);
76125
$Mutex$unlock(mutex);
77126
78127
$Mutex$free(mutex);
128+
```
79129

80-
// Get CPU cores for work distribution
81-
const cores = $Thread$available_parallelism();
130+
### Channels (Message Passing)
131+
132+
```zig
133+
// Create a channel
134+
const pair = $Channel$new();
135+
const sender = pair.sender;
136+
const receiver = pair.receiver;
137+
138+
// Producer thread
139+
extern fn producer(tx: i64) i64 {
140+
for (var i = 0; i < 10; i += 1) {
141+
$Channel$send(tx, i);
142+
}
143+
$Channel$close_sender(tx);
144+
return 10;
145+
}
146+
147+
// Spawn producer
148+
const prod = $Thread$spawn(producer, sender);
149+
150+
// Receive messages in main thread
151+
while (true) {
152+
const result = $Channel$try_recv_status(receiver);
153+
if (result.status == 0) {
154+
// Got value: result.value
155+
process(result.value);
156+
} else if (result.status == -1) {
157+
// Channel disconnected, all done
158+
break;
159+
}
160+
// status == 1 means empty, keep trying
161+
}
162+
163+
$Thread$join(prod);
164+
$Channel$close_receiver(receiver);
165+
```
166+
167+
### Multiple Producers
168+
169+
```zig
170+
const pair = $Channel$new();
171+
const receiver = pair.receiver;
172+
173+
// Clone sender for each producer
174+
const sender1 = pair.sender;
175+
const sender2 = $Channel$clone_sender(sender1);
176+
const sender3 = $Channel$clone_sender(sender1);
177+
178+
// Spawn multiple producers
179+
const t1 = $Thread$spawn(producer_fn, sender1);
180+
const t2 = $Thread$spawn(producer_fn, sender2);
181+
const t3 = $Thread$spawn(producer_fn, sender3);
182+
183+
// Receive from all producers
184+
while (true) {
185+
const result = $Channel$recv_timeout_status(receiver, 100);
186+
if (result.status == 0) {
187+
// Process message
188+
} else if (result.status == -1) {
189+
break; // All senders closed
190+
}
191+
}
192+
193+
// Wait for all producers
194+
$Thread$join(t1);
195+
$Thread$join(t2);
196+
$Thread$join(t3);
197+
$Channel$close_receiver(receiver);
82198
```
83199

84200
## Thread Types
@@ -96,6 +212,14 @@ Use `ZrtlClosure` for closures with captured state. The `spawn_closure` variants
96212
97213
All atomic operations use `SeqCst` (sequential consistency) ordering for maximum safety. This provides strong guarantees but may have some performance overhead compared to weaker orderings.
98214
215+
## Channel Semantics
216+
217+
- **MPSC**: Multiple senders, single receiver
218+
- **Unbounded**: `$Channel$new` creates a channel with unlimited buffer
219+
- **Bounded**: `$Channel$bounded(n)` blocks senders when n messages are queued
220+
- **Disconnection**: Closing all senders signals EOF to receiver
221+
- **Ordering**: Messages are received in FIFO order per sender
222+
99223
## Dependencies
100224
101225
- `zrtl` - Core ZRTL SDK (ZrtlClosure, DynamicBox)

0 commit comments

Comments
 (0)