Skip to content

Commit 46e8bb9

Browse files
authored
fix: condvar not properly removing handles (#384)
fix #315
1 parent 968a4b9 commit 46e8bb9

3 files changed

Lines changed: 154 additions & 2 deletions

File tree

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,52 @@ end
9797

9898
Please use `plenary.async` instead. This was version 1 and is just here for compatibility reasons.
9999

100+
### plenary.async.control.channel.oneshot
101+
102+
Creates a oneshot channel. It can only send data one time.
103+
104+
The sender is not async while the receiver is.
105+
106+
Example:
107+
108+
```lua
109+
local a = require'plenary.async'
110+
local tx, rx = a.control.channel.oneshot()
111+
112+
a.run(function()
113+
local ret = long_running_fn()
114+
tx(ret)
115+
end)
116+
117+
local ret = rx()
118+
```
119+
120+
### plenary.async.control.channel.mpsc
121+
122+
Creates a multiple producer single consumer channel.
123+
124+
Example:
125+
126+
```lua
127+
local a = require'plenary.async'
128+
local sender, receiver = a.control.channel.mpsc()
129+
130+
a.run(function()
131+
sender.send(10)
132+
sender.send(20)
133+
end)
134+
135+
a.run(function()
136+
sender.send(30)
137+
sender.send(40)
138+
end)
139+
140+
for _ = 1, 4 do
141+
local value = receiver.recv()
142+
print('received:', value)
143+
end
144+
```
145+
100146
### plenary.job
101147

102148
A Lua module to interact with system processes. Pass in your `command`, the desired `args`, `env` and `cwd`.

lua/plenary/async/control.lua

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,20 @@ end, 2)
2121

2222
---notify everyone that is waiting on this Condvar
2323
function Condvar:notify_all()
24+
local len = #self.handles
2425
for i, callback in ipairs(self.handles) do
26+
if i > len then
27+
-- this means that more handles were added while we were notifying
28+
-- if we don't break we can get starvation notifying as soon as new handles are added
29+
break
30+
end
31+
2532
callback()
26-
self.handles[i] = nil
33+
end
34+
35+
for i = 1, len do
36+
-- table.remove will ensure that indexes are correct and make "ipairs" safe, which is not the case for "self.handles[i] = nil"
37+
table.remove(self.handles, i)
2738
end
2839
end
2940

@@ -207,7 +218,7 @@ M.channel.mpsc = function()
207218
if deque:is_empty() then
208219
condvar:wait()
209220
end
210-
local val = deque:popright()
221+
local val = deque:popleft()
211222
deque:clear()
212223
return unpack(val or {})
213224
end

tests/plenary/async/channel_spec.lua

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,101 @@ describe("channel", function()
8484
end)
8585
end)
8686

87+
describe("mpsc", function()
88+
a.it("should wait multiple recv before any send", function()
89+
local sender, receiver = channel.mpsc()
90+
91+
local expected_count = 10
92+
93+
a.run(function()
94+
for i = 1, expected_count do
95+
a.util.sleep(250)
96+
sender.send(i)
97+
end
98+
end)
99+
100+
local receive_count = 0
101+
while receive_count < expected_count do
102+
receive_count = receive_count + 1
103+
local i = receiver.recv()
104+
eq(receive_count, i)
105+
end
106+
end)
107+
108+
a.it("should queues multiple sends before any read", function()
109+
local sender, receiver = channel.mpsc()
110+
111+
local counter = 0
112+
113+
a.run(function()
114+
counter = counter + 1
115+
sender.send(10)
116+
117+
counter = counter + 1
118+
sender.send(20)
119+
end)
120+
121+
a.util.sleep(1000)
122+
123+
eq(10, receiver.recv())
124+
eq(20, receiver.recv())
125+
eq(2, counter)
126+
end)
127+
128+
a.it("should queues multiple sends from multiple producers before any read", function()
129+
local sender, receiver = channel.mpsc()
130+
131+
local counter = 0
132+
133+
a.run(function()
134+
counter = counter + 1
135+
sender.send(10)
136+
137+
counter = counter + 1
138+
sender.send(20)
139+
end)
140+
141+
a.run(function()
142+
counter = counter + 1
143+
sender.send(30)
144+
145+
counter = counter + 1
146+
sender.send(40)
147+
end)
148+
149+
a.util.sleep(1000)
150+
151+
local read_counter = 0
152+
a.util.block_on(function()
153+
for _ = 1, 4 do
154+
receiver.recv()
155+
read_counter = read_counter + 1
156+
end
157+
end, 1000)
158+
eq(4, counter)
159+
eq(4, read_counter)
160+
end)
161+
162+
a.it("should read only the last value", function()
163+
local sender, receiver = channel.mpsc()
164+
165+
local counter = 0
166+
167+
a.run(function()
168+
counter = counter + 1
169+
sender.send(10)
170+
171+
counter = counter + 1
172+
sender.send(20)
173+
end)
174+
175+
a.util.sleep(1000)
176+
177+
eq(20, receiver.last())
178+
eq(2, counter)
179+
end)
180+
end)
181+
87182
describe("counter", function()
88183
a.it("should work", function()
89184
local tx, rx = channel.counter()

0 commit comments

Comments
 (0)