Skip to content

Commit 78e87a1

Browse files
authored
Reapply policies after queue restart (#1737)
### WHAT is this pull request doing? Reapplies policies after restarting a closed queue. Previously, only queue arguments (e.g. `x-max-length`) were restored via `handle_arguments`, but policies set via the management API were not reapplied. Fixes #1736 ### HOW can this pull request be tested? Run the new spec: `make test SPEC=spec/queue_spec.cr`
1 parent c1d2618 commit 78e87a1

3 files changed

Lines changed: 30 additions & 1 deletion

File tree

spec/queue_spec.cr

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,29 @@ describe LavinMQ::AMQP::Queue do
372372
end
373373
end
374374

375+
it "should reapply policies after restart" do
376+
with_amqp_server do |s|
377+
with_channel(s) do |ch|
378+
q = ch.queue(q_name, durable: true)
379+
queue = s.vhosts["/"].queues[q_name].as(LavinMQ::AMQP::DurableQueue)
380+
381+
# Apply a max-length policy
382+
definition = {"max-length" => JSON::Any.new(2i64)}
383+
policy = LavinMQ::Policy.new("test", "/", /.*/, LavinMQ::Policy::Target::Queues, definition, 0i8)
384+
queue.apply_policy(policy, nil)
385+
386+
# Close and restart the queue
387+
queue.close
388+
queue.closed?.should be_true
389+
queue.restart!.should be_true
390+
391+
# Publish 4 messages, only 2 should be kept
392+
4.times { |i| q.publish_confirm "msg #{i}" }
393+
queue.message_count.should eq 2
394+
end
395+
end
396+
end
397+
375398
it "should not restart if queue is still running" do
376399
with_amqp_server do |s|
377400
with_channel(s) do |ch|

src/lavinmq/amqp/queue/queue.cr

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ module LavinMQ::AMQP
203203
reset_queue_state
204204
@msg_store = init_msg_store(@data_dir)
205205
@empty = @msg_store.empty
206-
start
206+
start.tap do |started|
207+
reapply_policy if started
208+
end
207209
end
208210

209211
private def reset_queue_state

src/lavinmq/policy.cr

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ module LavinMQ
77
getter operator_policy : OperatorPolicy?
88
getter effective_policy_args = Array(String).new
99

10+
def reapply_policy
11+
apply_policy(@policy, @operator_policy)
12+
end
13+
1014
def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?)
1115
clear_policy
1216
effective_policy_args = Array(String).new

0 commit comments

Comments
 (0)