Skip to content

Commit 2016ea5

Browse files
fix: gracefully close message store on corrupt segments (#1710)
## Summary

- Skip async replicator close path when no followers are connected (startup), eliminating the race condition where a spawned fiber closes MFiles out from under the constructor
- Skip post-load processing (`load_stats_from_segments`, `delete_unused_segments`, etc.) when the store was closed during `load_segments_from_disk`
- Break out of `load_stats_from_segments` loop after `produce_metadata` calls `close`, preventing access to already-closed MFiles

Fixes #1701
1 parent 705775f commit 2016ea5

2 files changed

Lines changed: 65 additions & 6 deletions

File tree

spec/message_store_spec.cr

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,57 @@ describe LavinMQ::MessageStore do
231231
end
232232
end
233233
end
234+
235+
it "closes gracefully when segment has corrupt schema version with replicator" do
236+
with_etcd do
237+
mktmpdir do |dir|
238+
# Create a valid store with a message, then close it
239+
store = LavinMQ::MessageStore.new(dir, nil)
240+
store.push(LavinMQ::Message.new("ex", "rk", "body"))
241+
store.close
242+
243+
# Corrupt the schema version at the start of the segment file
244+
seg_file = Dir.children(dir).find!(&.starts_with?("msgs."))
245+
File.open(File.join(dir, seg_file), "r+") { |f| f.write("abcd".to_slice) }
246+
247+
# With a replicator (no followers), close spawns a fiber that races
248+
# with the constructor — this should close gracefully, not crash
249+
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
250+
begin
251+
store = LavinMQ::MessageStore.new(dir, replicator)
252+
store.closed.should be_true
253+
ensure
254+
replicator.close
255+
end
256+
end
257+
end
258+
end
259+
260+
it "closes gracefully when a middle segment is corrupt with replicator" do
261+
with_etcd do
262+
mktmpdir do |dir|
263+
# Create a store with multiple segments (one message per segment)
264+
store = LavinMQ::MessageStore.new(dir, nil)
265+
msg_size = LavinMQ::Config.instance.segment_size.to_u64 - (LavinMQ::BytesMessage::MIN_BYTESIZE + 5)
266+
msg = LavinMQ::Message.new(RoughTime.unix_ms, "e", "k", AMQ::Protocol::Properties.new, msg_size, IO::Memory.new("a" * msg_size))
267+
3.times { store.push(msg) }
268+
store.@segments.size.should eq 3
269+
store.close
270+
271+
# Corrupt the schema version of the second segment
272+
seg_files = Dir.children(dir).select(&.starts_with?("msgs.")).sort!
273+
File.open(File.join(dir, seg_files[1]), "r+") { |f| f.write("abcd".to_slice) }
274+
275+
# With a replicator (no followers), this should close gracefully
276+
# even though valid segments before the corrupt one were already loaded
277+
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
278+
begin
279+
store = LavinMQ::MessageStore.new(dir, replicator)
280+
store.closed.should be_true
281+
ensure
282+
replicator.close
283+
end
284+
end
285+
end
286+
end
234287
end

src/lavinmq/message_store.cr

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,18 @@ module LavinMQ
3333
@durable = durable
3434
@acks = Hash(UInt32, MFile).new { |acks, seg| acks[seg] = open_ack_file(seg) }
3535
load_segments_from_disk
36-
delete_orphan_ack_files
37-
load_deleted_from_disk
38-
load_stats_from_segments
39-
delete_unused_segments
36+
unless @closed
37+
delete_orphan_ack_files
38+
load_deleted_from_disk
39+
load_stats_from_segments
40+
delete_unused_segments
41+
end
42+
4043
@wfile_id = @segments.last_key
4144
@wfile = @segments.last_value
4245
@rfile_id = @segments.first_key
4346
@rfile = @segments.first_value
44-
@empty.set empty?
47+
@empty.set empty? unless @closed
4548
end
4649

4750
def push(msg) : SegmentPosition
@@ -260,7 +263,7 @@ module LavinMQ
260263
@empty.close
261264
# To make sure that all replication actions for the segments
262265
# have finished wait for a delete action of a nonexistent file
263-
if replicator = @replicator
266+
if (replicator = @replicator) && !replicator.all_followers.empty?
264267
wg = WaitGroup.new
265268
replicator.delete_file(File.join(@msg_dir, "nonexistent"), wg)
266269
spawn(name: "wait for file deletion is replicated") do
@@ -424,7 +427,9 @@ module LavinMQ
424427
end
425428
rescue ex
426429
@log.error { "Could not initialize segment #{seg}, closing message store: #{ex.message}" }
430+
@segments[seg] = file
427431
close
432+
break
428433
end
429434
end
430435
file.pos = 4
@@ -447,6 +452,7 @@ module LavinMQ
447452
read_metadata_file(seg, mfile)
448453
rescue File::NotFoundError | MetadataError
449454
produce_metadata(seg, mfile)
455+
break if @closed
450456
write_metadata_file(seg, mfile) unless seg == @segments.last_key # this segment is not full yet
451457
end
452458

0 commit comments

Comments
 (0)