Skip to content

Commit fe9d3d7

Browse files
Fix delayed message store crash on corrupt segment data (#1694)
## Summary - Fix crash on startup when delayed message store segment files are corrupt or truncated (e.g. after unclean shutdown) - Rewrite `build_index` to iterate segments directly with `IO::EOFError`/`IndexError` handling, matching how `produce_metadata` handles the same scenario in the base `MessageStore` - Add two specs covering truncated segments (with acked messages) and trailing corrupt data (with valid messages preserved) Closes #1693 ## Test plan - [x] Spec: rebuild index with truncated segment and acked messages - [x] Spec: rebuild index preserving valid messages despite trailing corrupt data - [ ] Run full test suite (`make test`) 🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent 7eb0928 commit fe9d3d7

2 files changed

Lines changed: 70 additions & 4 deletions

File tree

spec/delayed_message_exchange_spec.cr

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,55 @@ describe "Delayed Message Exchange" do
6060
wait_for { s.vhosts["/"].queues[delay_q_name].message_count == 0 }
6161
end
6262
end
63+
64+
it "should rebuild index on restart with truncated segment and acked messages" do
65+
with_amqp_server do |s|
66+
with_channel(s) do |ch|
67+
x = ch.exchange(x_name, "topic", args: x_args)
68+
q = ch.queue("delayed_q")
69+
q.bind(x.name, "#")
70+
hdrs = AMQP::Client::Arguments.new({"x-delay" => 1})
71+
x.publish_confirm "test message", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
72+
wait_for { s.vhosts["/"].queues["delayed_q"].message_count == 1 }
73+
end
74+
# All messages in delayed queue are now expired and acked
75+
s.vhosts["/"].queues[delay_q_name].message_count.should eq 0
76+
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest(delay_q_name))
77+
78+
s.stop
79+
80+
# Truncate segment file mid-message, simulating a crash where
81+
# mmap data wasn't fully flushed to disk
82+
seg_file = File.join(data_dir, "msgs.0000000001")
83+
File.open(seg_file, "r+") { |f| f.truncate(f.size // 2) }
84+
85+
s.restart
86+
s.vhosts["/"].queues[delay_q_name].message_count.should eq 0
87+
end
88+
end
89+
90+
it "should rebuild index on restart preserving valid messages despite trailing corrupt data" do
91+
with_amqp_server do |s|
92+
with_channel(s) do |ch|
93+
x = ch.exchange(x_name, "topic", args: x_args)
94+
# No consumer bound, so message stays in the delayed queue
95+
hdrs = AMQP::Client::Arguments.new({"x-delay" => 300_000})
96+
x.publish_confirm "test message", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
97+
s.vhosts["/"].queues[delay_q_name].message_count.should eq 1
98+
end
99+
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest(delay_q_name))
100+
101+
s.stop
102+
103+
# Append a partial message (just a non-zero timestamp, no body),
104+
# simulating a crash mid-write
105+
seg_file = File.join(data_dir, "msgs.0000000001")
106+
File.open(seg_file, "a") { |f| f.write_bytes(1i64, IO::ByteFormat::SystemEndian) }
107+
108+
s.restart
109+
s.vhosts["/"].queues[delay_q_name].message_count.should eq 1
110+
end
111+
end
63112
end
64113

65114
q_name = "delayed_q"

src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,27 @@ module LavinMQ::AMQP
3636
requeued = DelayedRequeuedStore.new
3737
count = 0u32
3838
bytesize = 0u64
39-
while env = shift?
40-
count += 1
41-
bytesize += env.message.bytesize
42-
requeued.insert(env.segment_position, env.message.timestamp)
39+
@segments.each do |seg_id, mfile|
40+
mfile.pos = 4
41+
loop do
42+
pos = mfile.pos.to_u32
43+
break if pos == mfile.size
44+
if deleted?(seg_id, pos)
45+
BytesMessage.skip(mfile)
46+
next
47+
end
48+
ts = IO::ByteFormat::SystemEndian.decode(Int64, mfile.to_slice(pos, 8))
49+
break if ts.zero? # This means that the rest of the file is zero, so break.
50+
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
51+
sp = SegmentPosition.make(seg_id, pos, msg)
52+
mfile.seek(sp.bytesize, IO::Seek::Current)
53+
count += 1
54+
bytesize += msg.bytesize
55+
requeued.insert(sp, msg.timestamp)
56+
rescue ex : IO::EOFError | IndexError
57+
@log.warn(exception: ex) { "Corrupt data at segment #{seg_id} pos #{pos}, skipping rest of segment" }
58+
break
59+
end
4360
end
4461
@requeued = requeued
4562
@size = count

0 commit comments

Comments
 (0)