Skip to content

Commit 1039f19

Browse files
authored
Bugfix: dont expire messages before server is fully started (#1714)
### WHAT is this pull request doing? Make message expire fibers wait for vhost to be fully loaded before expiring messages. Fixes #1697 ### HOW can this pull request be tested? Specs, but manually never hurts
1 parent 2016ea5 commit 1039f19

6 files changed

Lines changed: 75 additions & 19 deletions

File tree

spec/queue_spec.cr

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,34 @@ def with_queue(&)
1616
end
1717

1818
describe LavinMQ::AMQP::Queue do
19+
it "should not expire message before server is fully started" do
20+
# https://github.com/cloudamqp/lavinmq/issues/1697
21+
with_amqp_server do |s|
22+
with_channel(s) do |ch|
23+
# Must go via a queue with binding keys
24+
dlq = ch.queue("dlq")
25+
dlq.bind(ch.topic_exchange.name, dlq.name)
26+
q = ch.queue("ttl", args: AMQP::Client::Arguments.new(
27+
{"x-message-ttl" => 1000, "x-dead-letter-exchange" => "amq.topic", "x-dead-letter-routing-key" => dlq.name}
28+
))
29+
# declare a queue after ttl that will cause yields in its message store during load
30+
other_queue = ch.queue("other")
31+
x = ch.default_exchange
32+
x.publish_confirm("foo", other_queue.name)
33+
x.publish_confirm("ttl", q.name)
34+
end
35+
s.stop
36+
RoughTime.paused do |t|
37+
# Move time so message will be expired on startup
38+
t.travel 2.seconds
39+
s.restart
40+
with_channel(s) do |ch|
41+
ch.queue("dlq").get.should_not be_nil, failure_message: "Message not dead lettered?!"
42+
end
43+
end
44+
end
45+
end
46+
1947
it "should expire itself after last consumer disconnects" do
2048
with_amqp_server do |s|
2149
with_channel(s) do |ch|

src/lavinmq/amqp/queue/queue.cr

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ module LavinMQ::AMQP
9292
getter? internal = false
9393

9494
private def queue_expire_loop
95+
@vhost.closed.when_false.receive?
9596
loop do
9697
break unless @expires
9798
@consumers_empty.when_true.receive
@@ -110,6 +111,7 @@ module LavinMQ::AMQP
110111
end
111112

112113
private def message_expire_loop
114+
@vhost.closed.when_false.receive?
113115
loop do
114116
@consumers_empty.when_true.receive
115117
@log.debug { "Consumers empty" }

src/lavinmq/bool_channel.cr

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,22 @@ class BoolChannel
4848
end
4949

5050
def value
51-
@value.get
51+
@value.get(:acquire)
5252
end
5353

54-
def set(value : Bool)
55-
return if @value.swap(value) == value
54+
def swap(value : Bool)
55+
ret = @value.swap(value)
56+
update_active_channel(value) unless ret == value
57+
ret
58+
end
59+
60+
def set(value : Bool) : Nil
61+
@value.set(value)
62+
update_active_channel(value)
63+
nil
64+
end
65+
66+
private def update_active_channel(value)
5667
if value
5768
@when_false.deactivate
5869
@when_true.activate

src/lavinmq/server.cr

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ module LavinMQ
2929
end
3030

3131
getter vhosts, users, data_dir, parameters, authenticator
32-
getter? closed, flow
32+
getter? flow
3333
include ParameterTarget
3434

3535
@start = Time.instant
36-
@closed = false
36+
@closed = BoolChannel.new(false)
3737
@flow = true
3838
@listeners = Hash(Socket::Server, Protocol).new # Socket => protocol
3939
@connection_factories = Hash(Protocol, ConnectionFactory).new
@@ -46,6 +46,7 @@ module LavinMQ
4646
Schema.migrate(@data_dir, @replicator)
4747
@users = Auth::UserStore.new(@data_dir, @replicator)
4848
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
49+
@vhosts.load!
4950
@mqtt_brokers = MQTT::Brokers.new(@vhosts, @replicator)
5051
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
5152
@authenticator = Auth::Chain.create(@config, @users)
@@ -57,6 +58,10 @@ module LavinMQ
5758
spawn stats_loop, name: "Server#stats_loop"
5859
end
5960

61+
def closed?
62+
@closed.value
63+
end
64+
6065
def followers
6166
@replicator.try(&.followers) || Array(Clustering::Follower).new
6267
end
@@ -80,8 +85,7 @@ module LavinMQ
8085
end
8186

8287
def stop
83-
return if @closed
84-
@closed = true
88+
return if @closed.swap(true)
8589
@vhosts.close
8690
@replicator.try &.clear
8791
@authenticator.try &.cleanup
@@ -95,11 +99,12 @@ module LavinMQ
9599
@users = Auth::UserStore.new(@data_dir, @replicator)
96100
@authenticator = Auth::Chain.create(@config, @users)
97101
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
102+
@vhosts.load!
98103
@connection_factories[Protocol::AMQP] = AMQP::ConnectionFactory.new(@authenticator, @vhosts)
99104
@connection_factories[Protocol::MQTT] = MQTT::ConnectionFactory.new(@authenticator, @mqtt_brokers, @config)
100105
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
101106
apply_parameter
102-
@closed = false
107+
@closed.set(false)
103108
Fiber.yield
104109
end
105110

@@ -112,7 +117,7 @@ module LavinMQ
112117
Log.info { "Listening for #{protocol} on #{s.local_address}" }
113118
loop do
114119
client = s.accept? || break
115-
next client.close if @closed
120+
next client.close if closed?
116121
accept_tcp(client, protocol)
117122
end
118123
rescue ex : IO::Error
@@ -162,7 +167,7 @@ module LavinMQ
162167
Log.info { "Listening for #{protocol} on #{s.local_address}" }
163168
loop do # do not try to use while
164169
client = s.accept? || break
165-
next client.close if @closed
170+
next client.close if closed?
166171
accept_unix(client, protocol)
167172
end
168173
rescue ex : IO::Error
@@ -198,7 +203,7 @@ module LavinMQ
198203
Log.info { "Listening for #{protocol} on #{s.local_address} (TLS)" }
199204
loop do # do not try to use while
200205
client = s.accept? || break
201-
next client.close if @closed
206+
next client.close if closed?
202207
accept_tls(client, context, protocol)
203208
end
204209
rescue ex : IO::Error | OpenSSL::Error
@@ -255,7 +260,7 @@ module LavinMQ
255260
end
256261

257262
def close
258-
@closed = true
263+
@closed.set(true)
259264
Log.debug { "Closing listeners" }
260265
@listeners.each_key &.close
261266
Log.debug { "Closing vhosts" }

src/lavinmq/vhost.cr

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ module LavinMQ
2828
getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
2929
direct_reply_consumers, connections, dir, users
3030
property? flow = true
31-
getter? closed = false
31+
getter closed = BoolChannel.new(true)
32+
33+
def closed?
34+
@closed.value
35+
end
36+
3237
property max_connections : Int32?
3338
property max_queues : Int32?
3439

@@ -66,8 +71,11 @@ module LavinMQ
6671

6772
private def check_consumer_timeouts_loop
6873
loop do
69-
sleep Config.instance.consumer_timeout_loop_interval.seconds
70-
return if @closed
74+
select
75+
when timeout Config.instance.consumer_timeout_loop_interval.seconds
76+
when closed.when_true.receive?
77+
return
78+
end
7179
@connections.each do |c|
7280
c.channels.each_value do |ch|
7381
ch.check_consumer_timeout
@@ -401,7 +409,7 @@ module LavinMQ
401409
end
402410

403411
def close(reason = "Broker shutdown")
404-
@closed = true
412+
return if @closed.swap(true)
405413
stop_shovels
406414
stop_upstream_links
407415

@@ -475,11 +483,12 @@ module LavinMQ
475483
if has_parameters || has_policies
476484
spawn(name: "Load parameters") do
477485
sleep 10.milliseconds
478-
next if @closed
486+
next if closed?
479487
apply_parameters
480488
apply_policies
481489
end
482490
end
491+
closed.set(false)
483492
Fiber.yield
484493
end
485494

src/lavinmq/vhost_store.cr

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ module LavinMQ
1717

1818
def initialize(@data_dir : String, @users : Auth::UserStore, @replicator : Clustering::Replicator?)
1919
@vhosts = Hash(String, VHost).new
20-
load!
2120
end
2221

2322
forward_missing_to @vhosts
@@ -72,7 +71,7 @@ module LavinMQ
7271
end
7372
end
7473

75-
private def load!
74+
def load!
7675
path = File.join(@data_dir, "vhosts.json")
7776
if File.exists? path
7877
Log.debug { "Loading vhosts from file" }
@@ -90,6 +89,8 @@ module LavinMQ
9089
Log.debug { "Loading default vhosts" }
9190
create("/")
9291
end
92+
# Wait for vhosts to be loaded ("not closed")
93+
@vhosts.each_value &.closed.when_false.receive
9394
Log.debug { "#{size} vhosts loaded" }
9495
rescue ex
9596
Log.error(exception: ex) { "Failed to load vhosts" }

0 commit comments

Comments
 (0)