Skip to content

Commit 6cad6c2

Browse files
Fix: reset pos when crossing segment boundary in find_offset_in_segments (#1772)
### WHAT is this pull request doing? Fixes #1771 Restores the `pos = 4u32` reset in `find_offset_in_segments` when advancing to the next segment. In #1479, the original `pos = 4_u32` was changed to `rfile.pos = 4u32`. During [review](#1479 (comment)) it was mentioned that the method uses a local `pos` variable, not `rfile.pos`. When resolving, the line was removed entirely instead of being changed back to `pos = 4u32`. Without the reset, when a timestamp-based `x-stream-offset` falls between two segments, the method walks all messages in the first segment, crosses to the next segment, and reads from the old segment's end position, causing an `IndexError` or potentially a SEGFAULT. ### HOW can this pull request be tested? Run added spec --------- Co-authored-by: Viktor Erlingsson <[email protected]>
1 parent 7c88f06 commit 6cad6c2

2 files changed

Lines changed: 27 additions & 0 deletions

File tree

spec/stream_queue_spec.cr

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,32 @@ describe LavinMQ::AMQP::Stream do
221221
end
222222
end
223223

224+
it "consume from timestamp offset across segment boundary" do
225+
with_amqp_server do |s|
226+
with_channel(s) do |ch|
227+
q = ch.queue("stream-ts-across-segments", args: stream_queue_args)
228+
# Use half-segment messages so exactly 1 fits per segment
229+
data = Bytes.new(LavinMQ::Config.instance.segment_size // 2)
230+
# Fill segment 1 — two half-segment messages won't fit, so second triggers new segment
231+
q.publish_confirm data
232+
# Sleep to create a timestamp gap
233+
sleep 1.seconds
234+
target_time = Time.utc
235+
# This publish can't fit in current segment, creates a new one with first_ts > target
236+
q.publish_confirm data
237+
# Consume from timestamp in the gap — find_offset_in_segments must cross segment boundary
238+
ch.prefetch 1
239+
msgs = Channel(AMQP::Client::DeliverMessage).new
240+
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": target_time})) do |msg|
241+
msgs.send msg
242+
msg.ack
243+
end
244+
msg = msgs.receive
245+
StreamSpecHelpers.offset_from_headers(msg.properties.headers).should eq 2
246+
end
247+
end
248+
end
249+
224250
describe "Expiration" do
225251
it "segments should be removed if max-length set" do
226252
with_amqp_server do |s|

src/lavinmq/amqp/stream/stream_message_store.cr

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ module LavinMQ::AMQP
105105
if rfile.nil? || pos == rfile.size
106106
if segment = @segments.each_key.find { |sid| sid > segment }
107107
rfile = @segments[segment]
108+
pos = 4u32
108109
msg_offset = @segment_first_offset[segment]
109110
else
110111
return last_offset_seg_pos

0 commit comments

Comments
 (0)