Skip to content

Commit 5cd13d6

Browse files
spuunsnichme
andauthored
Bugfix: dead letter cycle detection (#1723)
Fixes a bug in dead letter cycle detection where cycles were only checked against the source queue rather than against each destination queue individually. This could cause infinite loops when a dead letter exchange routes to multiple queues. The previous implementation checked for cycles once before routing to any destination queues. It compared the source queue name against the x-death header to detect cycles. However, when a dead letter exchange routes to multiple queues, this approach was insufficient — each destination queue needs to be checked. Co-authored-by: Magnus Landerblom <[email protected]>
1 parent ab0aeaf commit 5cd13d6

2 files changed

Lines changed: 72 additions & 6 deletions

File tree

spec/queue_dead_lettering_spec.cr

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,74 @@ module DeadLetteringSpec
545545
msg.body_io.to_s.should eq "msg1"
546546
end
547547
end
548+
549+
it "should detect cycle with two queues in chain" do
550+
with_amqp_server do |s|
551+
with_channel(s) do |ch|
552+
qargs1 = {
553+
"x-max-length" => 1,
554+
"x-dead-letter-exchange" => "",
555+
"x-dead-letter-routing-key" => "q2",
556+
}
557+
q1 = ch.queue("q1", args: AMQP::Client::Arguments.new(qargs1))
558+
qargs2 = {
559+
"x-max-length" => 1,
560+
"x-dead-letter-exchange" => "",
561+
"x-dead-letter-routing-key" => "q1",
562+
}
563+
q2 = ch.queue("q2", args: AMQP::Client::Arguments.new(qargs2))
564+
# "fill" queues
565+
ch.default_exchange.publish("m1", "q1")
566+
ch.default_exchange.publish("m2", "q2")
567+
# now overflow q1
568+
ch.default_exchange.publish("m3", "q1")
569+
# m1 should be dead lettered to q2, which in turn is overflowed so
570+
# m2 is dead letterd to q1. q1 overflowed again and m3 is dead lettered
571+
# to q2. When q2 now overflows m1 shouldn't be dead lettered to q1,
572+
# but instead be dropped.
573+
574+
get1(q1).body_io.to_s.should eq "m2"
575+
get1(q2).body_io.to_s.should eq "m3"
576+
end
577+
end
578+
end
579+
580+
it "should detect cycle with three queues in chain" do
581+
with_amqp_server do |s|
582+
with_channel(s) do |ch|
583+
qargs1 = {
584+
"x-max-length" => 1,
585+
"x-dead-letter-exchange" => "",
586+
"x-dead-letter-routing-key" => "q2",
587+
}
588+
q1 = ch.queue("q1", args: AMQP::Client::Arguments.new(qargs1))
589+
qargs2 = {
590+
"x-max-length" => 1,
591+
"x-dead-letter-exchange" => "",
592+
"x-dead-letter-routing-key" => "q3",
593+
}
594+
q2 = ch.queue("q2", args: AMQP::Client::Arguments.new(qargs2))
595+
qargs3 = {
596+
"x-max-length" => 1,
597+
"x-dead-letter-exchange" => "",
598+
"x-dead-letter-routing-key" => "q1",
599+
}
600+
q3 = ch.queue("q3", args: AMQP::Client::Arguments.new(qargs3))
601+
# "fill" queues
602+
ch.default_exchange.publish("m1", "q1")
603+
ch.default_exchange.publish("m2", "q2")
604+
ch.default_exchange.publish("m3", "q3")
605+
# now overflow q1
606+
ch.default_exchange.publish("m4", "q1")
607+
608+
# messages should end up in the queue before the queue they were
609+
# published to
610+
get1(q1).body_io.to_s.should eq "m2"
611+
get1(q2).body_io.to_s.should eq "m3"
612+
get1(q3).body_io.to_s.should eq "m4"
613+
end
614+
end
615+
end
548616
end
549617

550618
describe "Header Validation" do

src/lavinmq/amqp/argument/dead_lettering.cr

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ module LavinMQ::AMQP
1919
def initialize(@vhost : VHost, @queue_name : String, @log : Logger)
2020
end
2121

22-
private def cycle?(props, reason) : Bool
22+
private def cycle?(qname, props, reason) : Bool
2323
unless headers = props.headers
2424
return false
2525
end
@@ -33,7 +33,7 @@ module LavinMQ::AMQP
3333
xdeaths.each do |field|
3434
next unless xdeath = field.as?(AMQ::Protocol::Table)
3535
return false if xdeath["reason"]? == "rejected"
36-
return true if xdeath["queue"]? == @queue_name && xdeath["reason"] == reason.to_s
36+
return true if xdeath["queue"]? == qname && xdeath["reason"] == reason.to_s
3737
end
3838
false
3939
end
@@ -42,7 +42,7 @@ module LavinMQ::AMQP
4242
# It's done like this to be able to dead letter to all destinations
4343
# except to the queue itself if a cycle is detected.
4444
# This is also how it's done in rabbitmq
45-
def route(msg : BytesMessage, reason) # ameba:disable Metrics/CyclomaticComplexity
45+
def route(msg : BytesMessage, reason)
4646
# No dead letter exchange => nothing to do
4747
return unless dlx = (msg.dlx || dlx())
4848
ex = @vhost.exchanges[dlx.to_s]?.as?(AMQP::Exchange) || return
@@ -71,14 +71,12 @@ module LavinMQ::AMQP
7171
ex.find_queues(routing_rk, routing_headers, queues)
7272
return if queues.empty?
7373

74-
is_cycle = cycle?(props, reason)
75-
7674
dead_lettered_msg = Message.new(
7775
RoughTime.unix_ms, dlx.to_s, routing_rk.to_s,
7876
props, msg.bodysize, IO::Memory.new(msg.body))
7977

8078
queues.each do |q|
81-
next if is_cycle && q.name == @queue_name
79+
next if cycle?(q.name, props, reason)
8280
@log.trace { "dead lettering dest=#{q.name} msg=#{dead_lettered_msg}" }
8381
q.publish(dead_lettered_msg)
8482
rescue ex

0 commit comments

Comments
 (0)