Skip to content

Commit ab0aeaf

Browse files
fix: prevent consumer starvation in lavinmqperf throughput (#1712)
## Summary - Add `Fiber.yield` every 128k publishes when no rate limit is set, preventing the publisher from monopolizing a shared thread and starving consumer fibers - Fix ack/tx_commit to use per-consumer local counters instead of the global atomic counter, preventing deadlocks with multiple consumers and prefetch - Fix `RateLimiter` to use its local counter for yield frequency (was using global counter or stuck at 0) ## Context When running `lavinmqperf throughput` on machines with few CPU cores, `Fiber::ExecutionContext::Parallel` has limited threads. The publisher and consumer fibers (plus AMQP client internal fibers) can share a thread. The publisher's tight loop with zero yielding monopolized the thread, causing the consumer to stop reading from its socket. The server's TCP send buffer filled up and delivery stalled permanently. Closes #1711 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Carl Hörberg <[email protected]>
1 parent 2aeac2e commit ab0aeaf

1 file changed

Lines changed: 17 additions & 11 deletions

File tree

src/lavinmqperf/amqp/throughput.cr

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ module LavinMQPerf
354354
pubs_this_second = 0
355355
queues = queue_names
356356
queue_idx = 0
357+
local_pubs = 0_u64
357358
until @stopped
358359
if @measure_latency
359360
# Write timestamp at the beginning of the message
@@ -383,8 +384,10 @@ module LavinMQPerf
383384
end
384385
queue_idx += 1 if @queue_pattern
385386
pubs = @pubs.add(1, :relaxed) + 1
386-
ch.tx_commit if @pub_in_transaction > 0 && (pubs % @pub_in_transaction) == 0
387+
local_pubs &+= 1
388+
ch.tx_commit if @pub_in_transaction > 0 && (local_pubs % @pub_in_transaction) == 0
387389
break if pubs >= @pmessages > 0
390+
Fiber.yield if @rate.zero? && local_pubs % (128*1024) == 0
388391
unless @rate.zero?
389392
pubs_this_second += 1
390393
if pubs_this_second >= @rate
@@ -418,23 +421,25 @@ module LavinMQPerf
418421
q.bind(@exchange, @routing_key) unless @exchange.empty?
419422
wait_until_all_are_connected(connected)
420423
rate_limiter = RateLimiter.new(@consume_rate)
424+
local_consumes = 0_u64
421425
q.subscribe(tag: "c", no_ack: @ack.zero?, block: true, args: @consumer_args) do |m|
422-
handle_consumed_message(ch, m, data, rate_limiter)
426+
local_consumes &+= 1
427+
handle_consumed_message(ch, m, data, rate_limiter, local_consumes)
423428
end
424429
end
425430
end
426431

427-
private def handle_consumed_message(ch, m, data, rate_limiter)
432+
private def handle_consumed_message(ch, m, data, rate_limiter, local_consumes : UInt64)
428433
consumes = @consumes.add(1, :relaxed) + 1
429434
body = m.body_io.to_slice
430435
record_latency(body) if @measure_latency
431436
verify_message_body(body, data) if @verify
432-
m.ack(multiple: true) if @ack > 0 && consumes % @ack == 0
433-
ch.tx_commit if @ack_in_transaction > 0 && (consumes % @ack_in_transaction) == 0
437+
m.ack(multiple: true) if @ack > 0 && local_consumes % @ack == 0
438+
ch.tx_commit if @ack_in_transaction > 0 && (local_consumes % @ack_in_transaction) == 0
434439
if @stopped || consumes >= @cmessages > 0
435440
ch.close
436441
end
437-
rate_limiter.limit(consumes)
442+
rate_limiter.limit
438443
end
439444

440445
private def poll_consume(connected, queue_name : String)
@@ -453,11 +458,13 @@ module LavinMQPerf
453458
q.bind(@exchange, @routing_key) unless @exchange.empty?
454459
wait_until_all_are_connected(connected)
455460
rate_limiter = RateLimiter.new(@consume_rate)
461+
local_consumes = 0_u64
456462
loop do
457463
if msg = q.get(no_ack: @ack.zero?)
458464
consumes = @consumes.add(1, :relaxed) + 1
465+
local_consumes &+= 1
459466
record_latency(msg.body_io.to_slice) if @measure_latency
460-
msg.ack(multiple: true) if @ack > 0 && consumes % @ack == 0
467+
msg.ack(multiple: true) if @ack > 0 && local_consumes % @ack == 0
461468
break if @stopped || consumes >= @cmessages > 0
462469
rate_limiter.limit
463470
end
@@ -492,16 +499,15 @@ module LavinMQPerf
492499
@total_consumes = 0_u64
493500
end
494501

495-
def limit(consume_count : UInt64? = nil)
502+
def limit
503+
@total_consumes &+= 1
496504
if @rate.zero?
497505
# When no rate limiting, yield periodically to avoid blocking other fibers
498-
count = consume_count || @total_consumes
499-
Fiber.yield if count % (128*1024) == 0
506+
Fiber.yield if @total_consumes % (128*1024) == 0
500507
return
501508
end
502509

503510
@consumes_this_second += 1
504-
@total_consumes += 1
505511
if @consumes_this_second >= @rate
506512
until_next_second = (@start + 1.seconds) - Time.instant
507513
sleep until_next_second if until_next_second > Time::Span.zero

0 commit comments

Comments
 (0)