[CELEBORN-2340] Explicitly trim Flusher thread pool cache when flusher queue is empty#3696
Open
s0nskar wants to merge 2 commits into
Contributor
Author
cc: @leixm @SteNicholas @FMX PTAL
zaynt4606
reviewed
May 21, 2026
Contributor
zaynt4606
left a comment
LGTM but we need a JIRA ticket to merge
Pull request overview
This PR updates the worker-side storage flusher loop to proactively release Netty PooledByteBufAllocator per-thread cache memory when flusher threads are idle, aiming to allow usedDirectMemory to drop after heavy shuffle workloads complete.
Changes:
- Replace workingQueues(index).take() with poll(..., 1000ms) in flusher worker threads.
- On poll timeout (queue idle), call PooledByteBufAllocator.trimCurrentThreadCache() to flush thread-local cached buffers back to the arenas.
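The two changes listed above amount to swapping a blocking take() for a timed poll() and running an idle hook on timeout. A minimal sketch of that loop, using only the JDK, might look like the following. The names IdleAwareWorker, handle, and onIdle are hypothetical and are not part of Celeborn; in the PR the idle hook corresponds to the trimCurrentThreadCache call.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for a flusher worker thread; not Celeborn's Flusher class.
final class IdleAwareWorker[T <: AnyRef](
    queue: LinkedBlockingQueue[T],
    pollTimeoutMs: Long,
    handle: T => Unit, // assumed task handler
    onIdle: () => Unit // assumed idle hook, e.g. trimming a thread-local cache
) extends Runnable {

  @volatile private var stopped = false

  def stop(): Unit = stopped = true

  override def run(): Unit = {
    while (!stopped) {
      // poll() returns null on timeout, whereas take() would block indefinitely.
      Option(queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)) match {
        case Some(task) => handle(task)
        // The queue stayed empty for the whole timeout: run the idle hook.
        case None => onIdle()
      }
    }
  }
}
```

Because the hook runs on the worker thread itself, it can act on that thread's own thread-local state, which is what a per-thread cache trim needs.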
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Contributor
Author
Added the JIRA and handled the Copilot suggestion.
Comment on lines +93 to 97
          processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
          logWarning(s"Flusher-$this-thread-$index encounter exception.", t)
      }
    } catch {
      case t: Throwable =>
        val e = ExceptionUtils.wrapThrowableToIOException(t)
        task.notifier.setException(e)
        processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
        logWarning(s"Flusher-$this-thread-$index encounter exception.", t)
        lastBeginFlushTime.set(index, -1)
    }
Comment on lines +101 to +107
    } else {
      allocator match {
        case alloc: PooledByteBufAllocator =>
          // Free buffer pool memory to main direct memory when flush thread is idle.
          alloc.trimCurrentThreadCache
        case _ =>
      }
What changes were proposed in this pull request?
We are using 0.5.3 and noticed that after a worker processes a heavy shuffle workload, its Netty direct memory metric (usedDirectMemory) spikes to a high value and never recovers, even after all flushes have finished and the worker is completely idle. The memory stays stuck until the worker is restarted.
I noticed that there are existing fixes that try to handle this by using pinnedMemory (#3018 and #3099), but I think this PR is the correct way to handle it instead of relying on pinnedMemory.
Why are the changes needed?
PooledByteBufAllocator maintains a per-thread PoolThreadCache for each Flusher thread. When a ByteBuf is released, instead of immediately returning its slot to the PoolArena, it goes into this thread-local cache, so the PoolArena's view of the underlying PoolChunk is not updated. usedDirectMemory counts all PoolChunk native memory regardless of how much is truly in use vs. sitting in thread caches.
The thread cache is only swept when the owning thread crosses an allocation threshold. Because the worker is in the pause state after all flush tasks end, every flusher thread blocks on workingQueues(index).take(). The cache is never swept, PoolChunks held in the thread caches are never freed, and usedDirectMemory stays frozen.
In this PR, we replace take() with poll() and, on poll() timeout, explicitly call allocator.trimCurrentThreadCache() to flush cached PoolChunks back to the PoolArena, allowing fully free chunks to be destroyed and usedDirectMemory to recover.
A similar pattern is already used by:
https://github.com/apache/celeborn/blob/main/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java#L131
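The accounting argument above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: ToyArena and ToyThreadCache are hypothetical names, and the bookkeeping is far simpler than Netty's real PoolArena, PoolChunk, and PoolThreadCache. It shows only why memory reported by the arena stays flat while released buffers sit in a thread cache, and drops once the cache is trimmed.

```scala
import scala.collection.mutable

// Toy model only: not Netty's data structures or API.
final class ToyArena(chunkSize: Int, slotSize: Int) {
  private val slotsPerChunk = chunkSize / slotSize
  // chunk id -> slots the arena considers allocated
  // (either in use by a caller or parked in a thread cache)
  private val allocatedSlots = mutable.Map.empty[Int, Int]
  private var nextChunkId = 0

  def allocate(): Int = {
    val chunk = allocatedSlots
      .collectFirst { case (id, n) if n < slotsPerChunk => id }
      .getOrElse {
        val id = nextChunkId
        nextChunkId += 1
        allocatedSlots(id) = 0
        id
      }
    allocatedSlots(chunk) += 1
    chunk
  }

  def free(chunk: Int): Unit = {
    allocatedSlots(chunk) -= 1
    // A fully free chunk is destroyed, so it stops counting as used memory.
    if (allocatedSlots(chunk) == 0) allocatedSlots.remove(chunk)
  }

  // Counts whole chunks, like a metric that reports chunk-level native memory.
  def usedMemory: Long = allocatedSlots.size.toLong * chunkSize
}

final class ToyThreadCache(arena: ToyArena) {
  private val cached = mutable.ArrayBuffer.empty[Int]

  def allocate(): Int =
    if (cached.nonEmpty) cached.remove(cached.length - 1) else arena.allocate()

  // Release parks the slot in the cache; the arena is not told about it.
  def release(chunk: Int): Unit = cached += chunk

  // Trim hands every cached slot back to the arena.
  def trim(): Unit = {
    cached.foreach(arena.free)
    cached.clear()
  }
}
```

In this model, releasing every buffer leaves usedMemory unchanged, and only trim() lets the arena destroy the now-empty chunk. That mirrors the behavior the description attributes to an idle flusher thread.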
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?