Summary
Messages published to an x-delayed-message exchange with a large x-delay can be delivered almost immediately instead of being delayed.
I can reproduce this on LavinMQ 2.6.8 with a Spring AMQP producer.
Environment
- Broker: LavinMQ 2.6.8
- Producer: Java 17 + Spring Boot 3.5.9 + spring-rabbit 3.2.8
- Client: RabbitMQ Java client 5.25.0
- OS: macOS (arm64)
Reproduction
- Declare exchange type
x-delayed-message with x-delayed-type=direct.
- Bind a queue.
- Publish a message with header
x-delay=31536000000 (365 days in ms).
- Poll queue for 60s.
Reproduction Test Code (JUnit)
package com.lavinmq.springbootlavinmq;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class LavinmqLongDelayMessageTest {
private static final String HOST = System.getProperty("lavinmq.host", "127.0.0.1");
private static final int PORT = Integer.getInteger("lavinmq.port", 5672);
private static final String USERNAME = System.getProperty("lavinmq.username", "guest");
private static final String PASSWORD = System.getProperty("lavinmq.password", "guest");
private static final String EXCHANGE_NAME = "test-long-delay-exchange";
private static final String QUEUE_NAME = "test-long-delay-queue";
private static final String ROUTING_KEY = "test-long-delay-key";
private CachingConnectionFactory connectionFactory;
private RabbitAdmin rabbitAdmin;
private RabbitTemplate rabbitTemplate;
@BeforeEach
void setUp() {
connectionFactory = new CachingConnectionFactory(HOST, PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitTemplate = new RabbitTemplate(connectionFactory);
declareTopology();
}
@AfterEach
void tearDown() {
if (rabbitAdmin != null) {
rabbitAdmin.deleteQueue(QUEUE_NAME);
rabbitAdmin.deleteExchange(EXCHANGE_NAME);
}
if (connectionFactory != null) {
connectionFactory.destroy();
}
}
@Test
void shouldNotConsumeImmediatelyWhenDelayExceedsInt32() throws InterruptedException {
long delay = Duration.ofDays(365).toMillis();
long observeWindowMs = 60_000L;
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "long-delay-message", message -> {
message.getMessageProperties().setHeader("x-delay", delay);
return message;
});
long start = System.currentTimeMillis();
long deadline = start + observeWindowMs;
Object firstReceived = null;
Long firstReceivedAtMs = null;
while (System.currentTimeMillis() < deadline) {
Object message = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
if (message != null && firstReceived == null) {
firstReceived = message;
firstReceivedAtMs = System.currentTimeMillis() - start;
System.out.printf(
"Consumed delayed message: delay=%dms, arrivedAt=%dms, payload=%s%n",
delay, firstReceivedAtMs, firstReceived
);
}
Thread.sleep(100L);
}
assertThat(firstReceived)
.as("Message with x-delay=%d should not be delivered within %d ms, but first arrived at %s ms",
delay, observeWindowMs, firstReceivedAtMs)
.isNull();
}
private void declareTopology() {
Map<String, Object> exchangeArgs = new HashMap<>();
exchangeArgs.put("x-delayed-type", "direct");
CustomExchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, exchangeArgs);
Queue queue = QueueBuilder.durable(QUEUE_NAME).build();
Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
rabbitAdmin.purgeQueue(QUEUE_NAME, true);
}
}
Actual Result
Message is consumed almost immediately. Example output:
Consumed delayed message: delay=31536000000ms, arrivedAt=3ms, payload=long-delay-message
Expected Result
The message should remain delayed (certainly not delivered within 60 seconds when delay is 365 days).
Source Analysis (possible root cause)
I looked at current main (commit 6cad6c2b499e3269489152b494a2cdbc83317645) and found:
-
Delay is stored as UInt32 in SegmentPosition:
|
getter delay : UInt32 # used by delayed exchange queue |
-
x-delay parsing in SegmentPosition.make converts with as?(Int).try(&.to_u32) and rescues to 0u32:
|
when "x-delay" then delay = value.as?(Int).try(&.to_u32) || 0u32 rescue 0u32 |
-
Expiration check uses timestamp + delay, so if delay becomes 0, message expires immediately:
|
delay = env.segment_position.delay |
|
timestamp = env.message.timestamp |
|
expire_at = timestamp + delay |
|
expire_at <= RoughTime.unix_ms |
I suspect either:
- type mismatch for AMQP header numeric types (e.g. Int64/LongLong not matching
as?(Int)), or
- overflow-to-zero behavior when converting to
UInt32.
In both cases, silently coercing invalid/overflow delay to zero can cause immediate delivery.
Suggestion
- Handle wider numeric types for
x-delay explicitly (at least Int64).
- Validate bounds and reject unsupported values with a precondition error, instead of silently defaulting to
0.
- Consider storing delay as a wider type (
Int64/UInt64) if large delays are intended to be supported.
Summary
Messages published to an
x-delayed-messageexchange with a largex-delaycan be delivered almost immediately instead of being delayed.I can reproduce this on LavinMQ
2.6.8with a Spring AMQP producer.Environment
Reproduction
x-delayed-messagewithx-delayed-type=direct.x-delay=31536000000(365 days in ms).Reproduction Test Code (JUnit)
Actual Result
Message is consumed almost immediately. Example output:
Expected Result
The message should remain delayed (certainly not delivered within 60 seconds when delay is 365 days).
Source Analysis (possible root cause)
I looked at current
main(commit6cad6c2b499e3269489152b494a2cdbc83317645) and found:Delay is stored as
UInt32inSegmentPosition:lavinmq/src/lavinmq/segment_position.cr
Line 8 in 6cad6c2
x-delayparsing inSegmentPosition.makeconverts withas?(Int).try(&.to_u32)and rescues to0u32:lavinmq/src/lavinmq/segment_position.cr
Line 24 in 6cad6c2
Expiration check uses
timestamp + delay, so if delay becomes0, message expires immediately:lavinmq/src/lavinmq/amqp/queue/delayed_exchange_queue.cr
Lines 106 to 109 in 6cad6c2
I suspect either:
as?(Int)), orUInt32.In both cases, silently coercing invalid/overflow delay to zero can cause immediate delivery.
Suggestion
x-delayexplicitly (at least Int64).0.Int64/UInt64) if large delays are intended to be supported.