Skip to content

Commit 2eb537c

Browse files
authored
[ISSUE #9980] Skip invalid Pop records when consumer group does not exist (#9982)
Signed-off-by: Aman Gautam <[email protected]>
1 parent ee10411 commit 2eb537c

1 file changed

Lines changed: 20 additions & 0 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,13 @@ private void scanGarbage() {
216216
}
217217
}
218218

219+
private boolean isSubscriptionGroupNotExist(PopCheckPointWrapper pointWrapper) {
220+
String group = pointWrapper.getCk().getCId();
221+
return brokerController.getSubscriptionGroupManager()
222+
.findSubscriptionGroupConfig(group) == null;
223+
}
224+
225+
219226
private void scan() {
220227
long startTime = System.currentTimeMillis();
221228
AtomicInteger count = new AtomicInteger(0);
@@ -225,6 +232,19 @@ private void scan() {
225232
Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
226233
PopCheckPointWrapper pointWrapper = entry.getValue();
227234

235+
// Skip invalid POP records when consumer group does not exist
236+
if (isSubscriptionGroupNotExist(pointWrapper)) {
237+
POP_LOGGER.warn(
238+
"[PopBuffer] skip pop record because consumer group not exist, group={}, ck={}",
239+
pointWrapper.getCk().getCId(),
240+
pointWrapper
241+
);
242+
iterator.remove();
243+
counter.decrementAndGet();
244+
continue;
245+
}
246+
247+
228248
// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
229249
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
230250
|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {

0 commit comments

Comments
 (0)