Skip to content

Commit f80753f

Browse files
authored
[ISSUE #10050] Support ChangeInvisibleTime without incrementing message reconsume times (#10051)
1 parent 7583fda commit f80753f

17 files changed

Lines changed: 975 additions & 32 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,20 @@ public int getCode() {
7070
@JSONField(ordinal = 8)
7171
private String attemptId;
7272

73+
@JSONField(ordinal = 9)
74+
private boolean suspend;
75+
7376
// used for test and fastjson
7477
public PopConsumerRecord() {
7578
}
7679

7780
public PopConsumerRecord(long popTime, String groupId, String topicId, int queueId,
7881
int retryFlag, long invisibleTime, long offset, String attemptId) {
82+
this(popTime, groupId, topicId, queueId, retryFlag, invisibleTime, offset, attemptId, false);
83+
}
84+
85+
public PopConsumerRecord(long popTime, String groupId, String topicId, int queueId, int retryFlag,
86+
long invisibleTime, long offset, String attemptId, boolean suspend) {
7987

8088
this.popTime = popTime;
8189
this.groupId = groupId;
@@ -85,6 +93,7 @@ public PopConsumerRecord(long popTime, String groupId, String topicId, int queue
8593
this.invisibleTime = invisibleTime;
8694
this.offset = offset;
8795
this.attemptId = attemptId;
96+
this.suspend = suspend;
8897
}
8998

9099
@JSONField(serialize = false)
@@ -194,6 +203,14 @@ public void setAttemptId(String attemptId) {
194203
this.attemptId = attemptId;
195204
}
196205

206+
public boolean isSuspend() {
207+
return suspend;
208+
}
209+
210+
public void setSuspend(boolean suspend) {
211+
this.suspend = suspend;
212+
}
213+
197214
@Override
198215
public String toString() {
199216
return "PopDeliveryRecord{" +
@@ -206,6 +223,7 @@ public String toString() {
206223
", offset=" + offset +
207224
", attemptTimes=" + attemptTimes +
208225
", attemptId='" + attemptId + '\'' +
226+
", suspend=" + suspend +
209227
'}';
210228
}
211229
}

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -486,8 +486,9 @@ public CompletableFuture<Boolean> ackAsync(
486486
}
487487

488488
// refer ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin
489-
public void changeInvisibilityDuration(long popTime, long invisibleTime,
490-
long changedPopTime, long changedInvisibleTime, String groupId, String topicId, int queueId, long offset) {
489+
public void changeInvisibilityDuration(long popTime, long invisibleTime, long changedPopTime,
490+
long changedInvisibleTime, String groupId, String topicId,
491+
int queueId, long offset, boolean suspend) {
491492

492493
if (brokerConfig.isPopConsumerKVServiceLog()) {
493494
log.info("PopConsumerService change, time={}, invisible={}, " +
@@ -496,10 +497,10 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime,
496497
}
497498

498499
PopConsumerRecord ckRecord = new PopConsumerRecord(
499-
changedPopTime, groupId, topicId, queueId, 0, changedInvisibleTime, offset, null);
500+
changedPopTime, groupId, topicId, queueId, 0, changedInvisibleTime, offset, null, suspend);
500501

501502
PopConsumerRecord ackRecord = new PopConsumerRecord(
502-
popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null);
503+
popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null, suspend);
503504

504505
// No need to generate new records when the group does not exist,
505506
// because these retry messages will not be consumed by anyone.
@@ -689,7 +690,12 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
689690
msgInner.setSysFlag(messageExt.getSysFlag());
690691
msgInner.setBornHost(brokerController.getStoreHost());
691692
msgInner.setStoreHost(brokerController.getStoreHost());
692-
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
693+
if (record.isSuspend()) {
694+
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
695+
} else {
696+
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
697+
}
698+
693699
msgInner.getProperties().putAll(messageExt.getProperties());
694700

695701
// set first pop time here

broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public CompletableFuture<RemotingCommand> processRequestAsync(final Channel chan
153153
brokerController.getPopConsumerService().changeInvisibilityDuration(
154154
ExtraInfoUtil.getPopTime(extraInfo), ExtraInfoUtil.getInvisibleTime(extraInfo), current,
155155
requestHeader.getInvisibleTime(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
156-
requestHeader.getQueueId(), requestHeader.getOffset());
156+
requestHeader.getQueueId(), requestHeader.getOffset(), requestHeader.isSuspend());
157157
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
158158
responseHeader.setPopTime(current);
159159
responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
@@ -324,6 +324,7 @@ private CompletableFuture<Boolean> appendCheckPointThenAckOrigin(
324324
ck.setQueueId(queueId);
325325
ck.addDiff(0);
326326
ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
327+
ck.setSuspend(requestHeader.isSuspend());
327328

328329
msgInner.setBody(JSON.toJSONString(ck).getBytes(StandardCharsets.UTF_8));
329330
msgInner.setQueueId(reviveQid);

broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,11 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
122122
msgInner.setSysFlag(messageExt.getSysFlag());
123123
msgInner.setBornHost(brokerController.getStoreHost());
124124
msgInner.setStoreHost(brokerController.getStoreHost());
125-
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
125+
if (popCheckPoint.isSuspend()) {
126+
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
127+
} else {
128+
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
129+
}
126130
msgInner.getProperties().putAll(messageExt.getProperties());
127131
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
128132
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,189 @@ public void deliveryRecordSerializeTest() {
7272
Assert.assertEquals(0, consumerRecord2.getAttemptTimes());
7373
Assert.assertEquals(decodeRecord.getAttemptId(), consumerRecord2.getAttemptId());
7474
}
75+
76+
@Test
77+
public void testSuspendFlagInitialization() {
78+
// Test constructor without suspend flag (should default to false)
79+
PopConsumerRecord record1 = new PopConsumerRecord(
80+
System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id");
81+
Assert.assertFalse("Suspend flag should default to false", record1.isSuspend());
82+
83+
// Test constructor with suspend flag set to true
84+
PopConsumerRecord record2 = new PopConsumerRecord(
85+
System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", true);
86+
Assert.assertTrue("Suspend flag should be true", record2.isSuspend());
87+
88+
// Test constructor with suspend flag set to false
89+
PopConsumerRecord record3 = new PopConsumerRecord(
90+
System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", false);
91+
Assert.assertFalse("Suspend flag should be false", record3.isSuspend());
92+
}
93+
94+
@Test
95+
public void testSuspendFlagSerialization() {
96+
// Test serialization/deserialization with suspend flag
97+
PopConsumerRecord originalRecord = new PopConsumerRecord(
98+
1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", true);
99+
100+
byte[] serialized = originalRecord.getValueBytes();
101+
PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
102+
103+
Assert.assertTrue("Deserialized record should have suspend flag true", deserialized.isSuspend());
104+
Assert.assertEquals("Other fields should match", originalRecord.getGroupId(), deserialized.getGroupId());
105+
Assert.assertEquals("Other fields should match", originalRecord.getTopicId(), deserialized.getTopicId());
106+
Assert.assertEquals("Other fields should match", originalRecord.getOffset(), deserialized.getOffset());
107+
}
108+
109+
@Test
110+
public void testSuspendFlagGetterSetter() {
111+
PopConsumerRecord record = new PopConsumerRecord();
112+
113+
// Test initial value
114+
Assert.assertFalse("Initial suspend value should be false", record.isSuspend());
115+
116+
// Test setter
117+
record.setSuspend(true);
118+
Assert.assertTrue("After setting to true, should be true", record.isSuspend());
119+
120+
record.setSuspend(false);
121+
Assert.assertFalse("After setting to false, should be false", record.isSuspend());
122+
}
123+
124+
@Test
125+
public void testSuspendInToString() {
126+
PopConsumerRecord record = new PopConsumerRecord(
127+
1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", true);
128+
129+
String toString = record.toString();
130+
Assert.assertTrue("toString should include suspend information", toString.contains("suspend=true"));
131+
132+
PopConsumerRecord record2 = new PopConsumerRecord(
133+
1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", false);
134+
135+
String toString2 = record2.toString();
136+
Assert.assertTrue("toString should include suspend information", toString2.contains("suspend=false"));
137+
}
138+
139+
@Test
140+
public void testSuspendFlagSerializationWithFalse() {
141+
// Test serialization/deserialization with suspend flag set to false
142+
PopConsumerRecord originalRecord = new PopConsumerRecord(
143+
1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, "attempt-id", false);
144+
145+
byte[] serialized = originalRecord.getValueBytes();
146+
PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
147+
148+
Assert.assertFalse("Deserialized record should have suspend flag false", deserialized.isSuspend());
149+
Assert.assertEquals("GroupId should match", originalRecord.getGroupId(), deserialized.getGroupId());
150+
Assert.assertEquals("TopicId should match", originalRecord.getTopicId(), deserialized.getTopicId());
151+
Assert.assertEquals("Offset should match", originalRecord.getOffset(), deserialized.getOffset());
152+
Assert.assertEquals("PopTime should match", originalRecord.getPopTime(), deserialized.getPopTime());
153+
Assert.assertEquals("QueueId should match", originalRecord.getQueueId(), deserialized.getQueueId());
154+
Assert.assertEquals("InvisibleTime should match", originalRecord.getInvisibleTime(), deserialized.getInvisibleTime());
155+
Assert.assertEquals("RetryFlag should match", originalRecord.getRetryFlag(), deserialized.getRetryFlag());
156+
Assert.assertEquals("AttemptId should match", originalRecord.getAttemptId(), deserialized.getAttemptId());
157+
}
158+
159+
@Test
160+
public void testSuspendFlagJSONSerializationCompleteness() {
161+
// Test complete serialization/deserialization with all fields including suspend
162+
long popTime = System.currentTimeMillis();
163+
String groupId = "test-group";
164+
String topicId = "test-topic";
165+
int queueId = 1;
166+
int retryFlag = PopConsumerRecord.RetryType.RETRY_TOPIC_V2.getCode();
167+
long invisibleTime = 30000L;
168+
long offset = 100L;
169+
String attemptId = UUID.randomUUID().toString().toUpperCase();
170+
171+
// Test with suspend = true
172+
PopConsumerRecord recordWithSuspend = new PopConsumerRecord(
173+
popTime, groupId, topicId, queueId, retryFlag, invisibleTime, offset, attemptId, true);
174+
recordWithSuspend.setAttemptTimes(3);
175+
176+
byte[] serialized = recordWithSuspend.getValueBytes();
177+
PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
178+
179+
Assert.assertTrue("Suspend flag should be true", deserialized.isSuspend());
180+
Assert.assertEquals("PopTime should match", popTime, deserialized.getPopTime());
181+
Assert.assertEquals("GroupId should match", groupId, deserialized.getGroupId());
182+
Assert.assertEquals("TopicId should match", topicId, deserialized.getTopicId());
183+
Assert.assertEquals("QueueId should match", queueId, deserialized.getQueueId());
184+
Assert.assertEquals("RetryFlag should match", retryFlag, deserialized.getRetryFlag());
185+
Assert.assertEquals("InvisibleTime should match", invisibleTime, deserialized.getInvisibleTime());
186+
Assert.assertEquals("Offset should match", offset, deserialized.getOffset());
187+
Assert.assertEquals("AttemptTimes should match", 3, deserialized.getAttemptTimes());
188+
Assert.assertEquals("AttemptId should match", attemptId, deserialized.getAttemptId());
189+
190+
// Test with suspend = false
191+
PopConsumerRecord recordWithoutSuspend = new PopConsumerRecord(
192+
popTime, groupId, topicId, queueId, retryFlag, invisibleTime, offset, attemptId, false);
193+
recordWithoutSuspend.setAttemptTimes(3);
194+
195+
serialized = recordWithoutSuspend.getValueBytes();
196+
deserialized = PopConsumerRecord.decode(serialized);
197+
198+
Assert.assertFalse("Suspend flag should be false", deserialized.isSuspend());
199+
Assert.assertEquals("PopTime should match", popTime, deserialized.getPopTime());
200+
Assert.assertEquals("GroupId should match", groupId, deserialized.getGroupId());
201+
Assert.assertEquals("TopicId should match", topicId, deserialized.getTopicId());
202+
Assert.assertEquals("QueueId should match", queueId, deserialized.getQueueId());
203+
Assert.assertEquals("RetryFlag should match", retryFlag, deserialized.getRetryFlag());
204+
Assert.assertEquals("InvisibleTime should match", invisibleTime, deserialized.getInvisibleTime());
205+
Assert.assertEquals("Offset should match", offset, deserialized.getOffset());
206+
Assert.assertEquals("AttemptTimes should match", 3, deserialized.getAttemptTimes());
207+
Assert.assertEquals("AttemptId should match", attemptId, deserialized.getAttemptId());
208+
}
209+
210+
@Test
211+
public void testSuspendFlagDefaultValueInNoArgConstructor() {
212+
// Test that no-arg constructor defaults suspend to false
213+
PopConsumerRecord record = new PopConsumerRecord();
214+
Assert.assertFalse("No-arg constructor should default suspend to false", record.isSuspend());
215+
216+
// Set all fields manually
217+
record.setPopTime(System.currentTimeMillis());
218+
record.setGroupId("test-group");
219+
record.setTopicId("test-topic");
220+
record.setQueueId(0);
221+
record.setRetryFlag(0);
222+
record.setInvisibleTime(30000L);
223+
record.setOffset(100L);
224+
record.setAttemptId("attempt-id");
225+
record.setSuspend(true);
226+
227+
Assert.assertTrue("After setting suspend to true, should be true", record.isSuspend());
228+
229+
// Serialize and deserialize to verify
230+
byte[] serialized = record.getValueBytes();
231+
PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
232+
Assert.assertTrue("Deserialized record should preserve suspend=true", deserialized.isSuspend());
233+
}
234+
235+
@Test
236+
public void testSuspendFlagInDeliveryRecordSerializeTest() {
237+
// Enhance existing deliveryRecordSerializeTest to include suspend flag
238+
PopConsumerRecord consumerRecord = new PopConsumerRecord();
239+
consumerRecord.setPopTime(System.currentTimeMillis());
240+
consumerRecord.setGroupId("GroupId");
241+
consumerRecord.setTopicId("TopicId");
242+
consumerRecord.setQueueId(3);
243+
consumerRecord.setRetryFlag(PopConsumerRecord.RetryType.RETRY_TOPIC_V1.getCode());
244+
consumerRecord.setInvisibleTime(20);
245+
consumerRecord.setOffset(100);
246+
consumerRecord.setAttemptTimes(2);
247+
consumerRecord.setAttemptId(UUID.randomUUID().toString().toUpperCase());
248+
consumerRecord.setSuspend(true);
249+
250+
PopConsumerRecord decodeRecord = PopConsumerRecord.decode(consumerRecord.getValueBytes());
251+
Assert.assertTrue("Decoded record should preserve suspend flag", decodeRecord.isSuspend());
252+
Assert.assertEquals("Suspend flag should match", consumerRecord.isSuspend(), decodeRecord.isSuspend());
253+
254+
// Test with suspend = false
255+
consumerRecord.setSuspend(false);
256+
decodeRecord = PopConsumerRecord.decode(consumerRecord.getValueBytes());
257+
Assert.assertFalse("Decoded record should preserve suspend=false", decodeRecord.isSuspend());
258+
Assert.assertEquals("Suspend flag should match", consumerRecord.isSuspend(), decodeRecord.isSuspend());
259+
}
75260
}

0 commit comments

Comments
 (0)