[ISSUE #10525] Reduce pull/dispatch path allocation via primitive arrays, ThreadLocal reuse, and lambda elimination#10526
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR focuses on reducing allocations and improving I/O paths in the store layer by reusing buffers, adding a ByteBuffer-based FileChannel append API, and optimizing queue offset storage.
Changes:
- Add
appendMessageUsingFileChannel(ByteBuffer)toMappedFileand implement it inDefaultMappedFile - Optimize buffer usage (cached slice reuse; safer
SelectMappedBufferResultslicing viaduplicate()+ explicit limits) - Replace
GetMessageResultqueue offsets fromList<Long>to a primitivelong[]with a lightweightListview and addaddQueueOffset(...)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java | Adds a ByteBuffer overload for FileChannel-based appends |
| store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java | Implements ByteBuffer FileChannel append; reuses append slices; adjusts buffer selection logic |
| store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java | Switches queue offsets to primitive array + view; adds addQueueOffset |
| store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java | Makes fields resettable; adds reset(...) and a package-private ctor |
| store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | Simplifies async put methods; pre-sizes GetMessageResult with maxMsgNums |
| store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | Reuses a precomputed topic-queue key; uses ByteBuffer FileChannel append |
| broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java | Updates tests to use addQueueOffset |
| broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java | Updates tests to use addQueueOffset |
| broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java | Updates tests to use addQueueOffset |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
This PR reduces allocation in the pull/dispatch hot path through six optimizations: primitive long[] replacing List<Long> in GetMessageResult, DispatchRequest object reuse via reset(), cached topicQueueKey in ConsumeQueue, ByteBuffer-based FileChannel write in DefaultMappedFile, and removal of async callback in DefaultMessageStore. The direction is correct — eliminating autoboxing and buffer copies in high-throughput paths.
Findings
-
[Critical]
DefaultMessageStore.java:620-630— Removal of put-message stats tracking. ThethenAcceptcallback that trackedsetPutMessageEntireTimeMax(),getPutMessageFailedTimes(), and the slow-put warning log has been removed entirely. This eliminates observability for:- Put message latency percentiles (used by monitoring/alerting)
- Failed put message counts (used for error rate tracking)
- Slow put warnings (>500ms threshold)
The PR title says "Reduce pull/dispatch path allocation" but this removes stats from the put path. This appears to be scope creep or an unintended side effect. If the goal is to reduce allocation, consider keeping the stats tracking but using a more efficient mechanism (e.g., direct field assignment instead of lambda capture).
-
[Warning]
DispatchRequest.java:45-70—reset()method always setssuccess = true. The error constructorDispatchRequest(..., long tagsCode)setssuccess = false, butreset()unconditionally sets it totrue. If the object pool reuses a failedDispatchRequest, the error state is silently lost. Consider either:- Adding a
successparameter toreset() - Or ensuring failed requests are never reused (document this contract)
- Adding a
-
[Info]
GetMessageResult.java:45-55— Thelong[]array withAbstractList<Long>wrapper for backward compatibility is a good pattern. The growth formulasize + (size >> 1)avoids the+1inaddQueueOffset, but since the default constructor initializes withnew long[100], this is safe in practice. -
[Info]
DefaultMappedFile.java:200-210— ThecachedAppendSlicelazy initialization is not volatile/synchronized, but since the slice is deterministic (same underlying buffer), the worst case is one redundant allocation, which is benign.
Suggestions
- Restore or relocate the put-message stats tracking in
DefaultMessageStore— this is a regression in observability. - Clarify the
DispatchRequestreuse contract regarding thesuccessfield. - Consider adding a JMH benchmark for the
ConsumeQueue.appendMessagepath to quantify the improvement.
Automated review by github-manager-bot
4644074 to
bcf8c40
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #10526 +/- ##
=============================================
- Coverage 48.19% 48.09% -0.11%
+ Complexity 13398 13380 -18
=============================================
Files 1377 1377
Lines 100730 100885 +155
Branches 13012 13039 +27
=============================================
- Hits 48548 48517 -31
- Misses 46252 46408 +156
- Partials 5930 5960 +30 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (Re-review after new commits)
Summary
Force-pushed update to the pull/dispatch path allocation reduction PR. The core optimizations remain: long[] replacing List<Long> in GetMessageResult, DispatchRequest ThreadLocal reuse, cached topicQueueKey, DefaultMappedFile buffer optimizations, and async callback removal in DefaultMessageStore. New: ByteBuffer-based appendMessageUsingFileChannel in ConsumeQueue and duplicate() instead of slice() in selectMappedBuffer.
Findings
-
[Critical]
DefaultMessageStore.java— Put-message stats tracking still removed. ThethenAcceptcallback containingsetPutMessageEntireTimeMax(),getPutMessageFailedTimes(), and the slow-put warning log (>500ms) has been removed. This eliminates observability for put latency, failure counts, and slow-put alerts. This was flagged in the previous review and remains unaddressed. This is a regression in monitoring capability.Suggestion: Keep the stats tracking but avoid the lambda allocation. Options:
- Inline the stats tracking after the synchronous
putMessagecall (no lambda needed) - Use a pre-allocated callback object stored in a field
- Move stats tracking to the caller of
putMessage/putMessages
- Inline the stats tracking after the synchronous
-
[Warning]
ConsumeQueue.java—byteBufferIndex.flip()on shared buffer. Theflip()call beforeappendMessageUsingFileChannel(this.byteBufferIndex)is correct for the write path (setslimit=position, position=0). However,byteBufferIndexis a shared field. If any code path reads from this buffer after theflip()without first resetting it, it will see stale data. Since the dispatch path is single-threaded per ConsumeQueue, this is safe in practice, but consider adding a comment documenting the buffer lifecycle contract. -
[Info]
DefaultMappedFile.java:selectMappedBuffer— Usingduplicate()+limit()instead ofslice()+slice()reduces one ByteBuffer allocation. Theduplicate()shares the underlying data but has independent position/limit, which is correct here. -
[Info]
DefaultMappedFile.java:appendMessageUsingFileChannel(ByteBuffer)— New overload acceptingByteBufferdirectly. Thedata.remaining()call correctly computes the write length. Theflip()in the caller ensures the buffer is in read mode. This avoids the.array()call which would throwUnsupportedOperationExceptionfor direct buffers.
Unresolved from Previous Review
DispatchRequest.reset()always setssuccess = true— if a failed request is reused from the ThreadLocal pool, the error state is silently lost. Consider documenting that failed requests must not be reused, or adding asuccessparameter toreset().
Automated review by github-manager-bot
…ve arrays, ThreadLocal reuse, and lambda elimination - GetMessageResult: List<Long> -> long[] + addQueueOffset(long), right-sized capacity - DispatchRequest: ThreadLocal reuse via mutable fields + reset() - DefaultMappedFile: cached append slice + dual-slice merge in selectMappedBuffer - DefaultMessageStore: remove thenAccept callback lambda in putMessage/putMessages - ConsumeQueue: topicQueueKey final field - Update 3 test files for GetMessageResult.addQueueOffset API
bcf8c40 to
a313999
Compare
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (Re-review after new commits)
Summary
Re-review after new commit a3139997 (2026-06-19). This update addresses the critical put-message stats tracking regression from the previous review by switching thenAccept → whenComplete with proper null-check. The DispatchRequest reset/pooling pattern and GetMessageResult long[] optimization are carried forward. Test files are updated to use the new addQueueOffset() API.
Resolved Issues ✅
- [Critical] RESOLVED
DefaultMessageStore.java— Put-message stats tracking is now restored viawhenComplete. The callback correctly handles both success and failure:null == resultcovers the exceptional completion case (counts as failure)- Slow-put warning (>500ms) is preserved
setPutMessageEntireTimeMaxandgetPutMessageFailedTimesare both called- Note: The exception
exparameter is not logged. Consider addingif (ex != null) LOGGER.warn("...", ex)for debuggability, though the failure counter correctly captures it.
Remaining Findings
-
[Warning]
DispatchRequest.java—reset()always setssuccess = true. If a failed dispatch request is returned to the ThreadLocal pool and later reused, the error state is silently lost. The caller would seesuccess = trueeven though the original dispatch failed.- Suggestion: Either document that failed requests must not be pooled, or add a
reset(boolean success)parameter so callers can preserve the original state.
- Suggestion: Either document that failed requests must not be pooled, or add a
-
[Warning]
DefaultMappedFile.java—cachedAppendSliceis a shared instance field. The cachedByteBuffer.slice()is stored asthis.cachedAppendSlice. IfappendMessageandappendMessagesInnerare ever called concurrently on the sameMappedFileinstance, the shared slice could cause data corruption. This is likely safe in practice (putMessage is serialized by lock), but consider adding a comment documenting the thread-safety assumption. -
[Info]
GetMessageResult.java—long[]replacingList<Long>is a good allocation reduction. TheAbstractList<Long>view returned bygetMessageQueueOffset()boxes on eachget()call, but this is acceptable for read paths. -
[Info]
ConsumeQueue.java— CachingtopicQueueKeyas an instance field avoids repeated string concatenation on the dispatch hot path. Good optimization. -
[Info]
DefaultMappedFile.java—duplicate()+limit()instead of doubleslice()reduces one ByteBuffer allocation. TheappendMessageUsingFileChannel(ByteBuffer)overload avoids the.array()call which would fail for direct buffers.
Verdict
The critical regression is fixed. The remaining warnings are non-blocking. This PR looks good to merge after addressing the DispatchRequest.reset() concern.
Automated re-review by github-manager-bot
Which Issue(s) This PR Fixes
Fixes #10525
Brief Description
Reduce allocation in the pull/dispatch path through five independent optimizations:
GetMessageResult— ReplaceList<Long>withlong[]+addQueueOffset(long)method. EliminatesLongboxing andArrayListresize overhead. Right-sized initial capacity via constructor parameter (capped at 256 to prevent OOM).DispatchRequest— Changefinalfields to mutable + addreset()method for ThreadLocal reuse. Eliminates per-messageDispatchRequestallocation in the dispatch hot path.DefaultMappedFile— Cache append slice and merge dual-slice operation inselectMappedBuffer. Eliminates redundantByteBuffer.slice()allocation.DefaultMessageStore— ReplacethenAcceptwithwhenCompletefor stats recording. Preserves put-latency metrics, slow-log, and failure counting while eliminating lambda capture overhead.ConsumeQueue— MaketopicQueueKeyafinalfield to avoid per-call string computation.How Did You Test This Change?
EscapeBridgeTest,PopConsumerServiceTest,PopLiteMessageProcessorTest,BatchConsumeMessageTest).store+brokermodules compile cleanly on JDK 21.