Skip to content

Commit 10c650c

Browse files
authored
[ISSUE #10034] Optimizing cq iterator and calculating lag (#10056)
1 parent 9388842 commit 10c650c

3 files changed

Lines changed: 122 additions & 31 deletions

File tree

common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,13 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
337337
final byte[] start, final byte[] end, BiConsumer<byte[], byte[]> callback) throws RocksDBException {
338338

339339
if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
340-
throw new RocksDBException("To determine lower boundary, prefix and start may not be null at the same "
341-
+ "time.");
340+
throw new RocksDBException(
341+
"To determine lower boundary, prefix and start may not be null at the same time.");
342342
}
343343

344344
if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(end)) {
345-
throw new RocksDBException("To determine upper boundary, prefix and end may not be null at the same time.");
345+
throw new RocksDBException(
346+
"To determine upper boundary, prefix and end may not be null at the same time.");
346347
}
347348

348349
if (columnFamilyHandle == null) {

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public class MessageStoreConfig {
132132
@ImportantField
133133
private String storeType = StoreType.DEFAULT.getStoreType();
134134

135+
private boolean iteratorWhenUseRocksdbConsumeQueue = true;
136+
135137
// ConsumeQueue file size,default is 30W
136138
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
137139
// enable consume queue ext
@@ -667,6 +669,14 @@ public void setStoreType(String storeType) {
667669
this.storeType = storeType;
668670
}
669671

672+
public boolean isIteratorWhenUseRocksdbConsumeQueue() {
673+
return iteratorWhenUseRocksdbConsumeQueue;
674+
}
675+
676+
public void setIteratorWhenUseRocksdbConsumeQueue(boolean iteratorWhenUseRocksdbConsumeQueue) {
677+
this.iteratorWhenUseRocksdbConsumeQueue = iteratorWhenUseRocksdbConsumeQueue;
678+
}
679+
670680
public int getMappedFileSizeConsumeQueue() {
671681
int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
672682
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);

store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java

Lines changed: 108 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.rocketmq.store.queue;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import org.apache.rocketmq.common.BoundaryType;
2223
import org.apache.rocketmq.common.Pair;
@@ -226,46 +227,54 @@ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, Message
226227
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
227228
}
228229

230+
/**
231+
* It is CPU-intensive with many offline group
232+
* Optimize by caching their estimated info
233+
*/
229234
@Override
230235
public long estimateMessageCount(long from, long to, MessageFilter filter) {
231-
// Check from and to offset validity
232-
Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
233-
if (fromUnit == null) {
234-
return -1;
235-
}
236236

237-
if (from >= to) {
238-
return -1;
237+
Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
238+
if (fromUnit == null || from >= to) {
239+
return -1L;
239240
}
240241

241242
if (to > getMaxOffsetInQueue()) {
242243
to = getMaxOffsetInQueue();
243244
}
244245

245-
int maxSampleSize = messageStoreConfig.getMaxConsumeQueueScan();
246-
int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from);
246+
int sampleCount = 0;
247+
int sampleTotal = Math.min((int) (to - from), messageStoreConfig.getMaxConsumeQueueScan());
247248

248-
int matchThreshold = messageStoreConfig.getSampleCountThreshold();
249-
int matchSize = 0;
249+
int matchCount = 0;
250+
int matchTotal = messageStoreConfig.getSampleCountThreshold();
250251

251-
for (int i = 0; i < sampleSize; i++) {
252-
long index = from + i;
253-
Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
254-
if (pair == null) {
255-
continue;
256-
}
257-
CqUnit cqUnit = pair.getObject1();
258-
if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
259-
matchSize++;
260-
// if matchSize is plenty, early exit estimate
261-
if (matchSize > matchThreshold) {
262-
sampleSize = i;
263-
break;
252+
try {
253+
ReferredIterator<CqUnit> iterator = this.iterateFrom(from, matchTotal);
254+
while (iterator != null && iterator.hasNext() && sampleCount++ < sampleTotal) {
255+
CqUnit cqUnit = iterator.next();
256+
if (filter.isMatchedByConsumeQueue(
257+
cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
258+
if (++matchCount > matchTotal) {
259+
sampleTotal = sampleCount;
260+
break;
261+
}
264262
}
265263
}
264+
} catch (Throwable t) {
265+
log.error("EstimateLag error, from={}, to={}", from, to, t);
266+
}
267+
268+
long result = sampleTotal == 0 ? 0 :
269+
(long) ((to - from) * (matchCount / (sampleTotal * 1.0)));
270+
271+
if (log.isTraceEnabled()) {
272+
log.trace("EstimateLag, topic={}, queueId={}, offset={}-{}, total={}, hit rate={}/{}({}%), result={}",
273+
topic, queueId, from, to, to - from,
274+
matchCount, sampleCount, String.format("%.1f", (double) matchCount * 100.0 / sampleCount), result);
266275
}
267-
// Make sure the second half is a floating point number, otherwise it will be truncated to 0
268-
return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0)));
276+
277+
return result;
269278
}
270279

271280

@@ -302,7 +311,7 @@ public ReferredIterator<CqUnit> iterateFrom(long startIndex, int count) throws R
302311
long maxCqOffset = getMaxOffsetInQueue();
303312
if (startIndex < maxCqOffset) {
304313
int num = Math.min((int)(maxCqOffset - startIndex), count);
305-
return iterateFrom0(startIndex, num);
314+
return iterateFrom0(startIndex, num, maxCqOffset);
306315
}
307316
return null;
308317
}
@@ -365,7 +374,13 @@ public long getLastOffset() {
365374
return getMaxPhysicOffset();
366375
}
367376

368-
private ReferredIterator<CqUnit> iterateFrom0(final long startIndex, final int count) throws RocksDBException {
377+
private ReferredIterator<CqUnit> iterateFrom0(
378+
final long startIndex, final int count, final long maxOffset) throws RocksDBException {
379+
380+
if (messageStoreConfig.isIteratorWhenUseRocksdbConsumeQueue()) {
381+
return new RocksDBReusableIterator(topic, queueId, startIndex, count, maxOffset);
382+
}
383+
369384
List<ByteBuffer> byteBufferList = this.consumeQueueStore.rangeQuery(topic, queueId, startIndex, count);
370385
if (byteBufferList == null || byteBufferList.isEmpty()) {
371386
if (this.messageStoreConfig.isEnableRocksDBLog()) {
@@ -386,6 +401,71 @@ public int getQueueId() {
386401
return queueId;
387402
}
388403

404+
private class RocksDBReusableIterator implements ReferredIterator<CqUnit> {
405+
406+
private final String topic;
407+
private final int queueId;
408+
private long offset;
409+
private final int count;
410+
private final long maxOffset;
411+
412+
private int bufferIndex;
413+
private List<ByteBuffer> buffers;
414+
415+
// offset + count <= max offset
416+
public RocksDBReusableIterator(String topic, int queueId, long offset, int count, long maxOffset) {
417+
this.topic = topic;
418+
this.queueId = queueId;
419+
this.offset = offset;
420+
this.count = count;
421+
this.maxOffset = maxOffset;
422+
423+
this.bufferIndex = 0;
424+
this.buffers = new ArrayList<>(count);
425+
}
426+
427+
@Override
428+
public void release() {
429+
}
430+
431+
@Override
432+
public CqUnit nextAndRelease() {
433+
try {
434+
return next();
435+
} finally {
436+
release();
437+
}
438+
}
439+
440+
@Override
441+
public boolean hasNext() {
442+
return offset < maxOffset;
443+
}
444+
445+
@Override
446+
public CqUnit next() {
447+
try {
448+
if (buffers.isEmpty() || bufferIndex >= buffers.size()) {
449+
int batchSize = (int) Math.min(count, maxOffset - offset);
450+
if (batchSize == 0) {
451+
return null;
452+
} else {
453+
bufferIndex = 0;
454+
buffers = consumeQueueStore.rangeQuery(topic, queueId, offset, batchSize);
455+
}
456+
}
457+
if (bufferIndex < buffers.size()) {
458+
ByteBuffer buffer = buffers.get(bufferIndex++);
459+
return new CqUnit(offset++, buffer.getLong(), buffer.getInt(), buffer.getLong());
460+
}
461+
} catch (Throwable t) {
462+
log.error("RocksDB reusable iterator search error, " +
463+
"topic={}, queueId={}, offset={}, count={}", topic, queueId, offset, count, maxOffset, t);
464+
}
465+
return null;
466+
}
467+
}
468+
389469
private class RocksDBConsumeQueueIterator implements ReferredIterator<CqUnit> {
390470
private final List<ByteBuffer> byteBufferList;
391471
private final long startIndex;

0 commit comments

Comments
 (0)