[CELEBORN-2341] Preserve partially-committed partitions on CommitFiles timeout#3706
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## branch-0.6 #3706 +/- ##
==============================================
+ Coverage 66.77% 67.01% +0.24%
==============================================
Files 354 354
Lines 21565 21652 +87
Branches 1912 1911 -1
==============================================
+ Hits 14397 14507 +110
+ Misses 6155 6132 -23
Partials 1013 1013 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
When the worker-side `celeborn.worker.commitFiles.timeout` fires and `future.cancel(true)` interrupts the per-partition commit tasks, Controller's BiFunction does two things that amplify data loss unnecessarily: 1. The response is built with `List.empty.asJava` for both committed primary and replica id lists, even though `committedPrimaryIds` / `committedReplicaIds` may be partially populated from tasks that finished before the timer fired. 2. `context.reply()` is never called on the error path, so the driver waits the full `celeborn.client.rpc.commitFiles.askTimeout` for a reply that never comes — even though the worker has already determined the outcome. This change preserves the SUCCESSFUL work and replies promptly: - If at least one partition committed before cancellation, the response uses `StatusCode.PARTIAL_SUCCESS` with the actual `committedPrimaryIds` / `committedReplicaIds` and accompanying storage / map-id-bitmap / partition-size data. The driver's `CommitHandler` retry loop already treats `PARTIAL_SUCCESS` as a terminal, non-retry status (same as `SUCCESS`) and processes the committed/failed partition lists; no driver-side change is needed. - If nothing committed (the catastrophic case), the legacy `COMMIT_FILE_EXCEPTION` response is preserved unchanged. - `context.reply(response)` now fires immediately on the error path so the driver's RPC ask completes with the worker's verdict instead of hitting `RpcTimeoutException` at `commitFiles.askTimeout`. Impact on the SHUFFLE_DATA_LOST -> FetchFailedException stage-retry path: when a worker hits its commit timeout under heavy load, the shuffle's reducer tasks can still fetch the partitions that did make it. Only the genuinely-failed partitions trigger data-lost handling, dramatically reducing the blast radius of any single slow commit.
cb74c9b to
27e9fd4
Compare
There was a problem hiding this comment.
Pull request overview
This PR improves worker-side CommitFiles handling when celeborn.worker.commitFiles.timeout triggers cancellation, aiming to reduce unnecessary shuffle-wide data loss by returning a timely RPC reply that preserves any partitions that successfully committed before cancellation.
Changes:
- Build the error-path
CommitFilesResponsefrom the actualcommitted*/failed*/committed*StorageInfos/ bitmap / size state instead of always returning empty committed lists. - Reply to the driver immediately on the exceptional/cancel path (instead of letting the client ask time out).
- Stop the
COMMIT_FILES_TIMEtimer on the exceptional/cancel path.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val response = | ||
| if (committedPrimaryIds.isEmpty && committedReplicaIds.isEmpty) { | ||
| CommitFilesResponse( | ||
| StatusCode.COMMIT_FILE_EXCEPTION, | ||
| List.empty.asJava, | ||
| List.empty.asJava, | ||
| primaryIds, | ||
| replicaIds) | ||
| } else { | ||
| CommitFilesResponse( | ||
| StatusCode.PARTIAL_SUCCESS, | ||
| new jArrayList[String](committedPrimaryIds), | ||
| new jArrayList[String](committedReplicaIds), | ||
| new jArrayList[String](failedPrimaryIds), | ||
| new jArrayList[String](failedReplicaIds), | ||
| new jHashMap[String, StorageInfo](committedPrimaryStorageInfos), | ||
| new jHashMap[String, StorageInfo](committedReplicaStorageInfos), | ||
| new jHashMap[String, RoaringBitmap](committedMapIdBitMap), | ||
| partitionSizeList.asScala.sum, | ||
| partitionSizeList.size()) | ||
| } |
SteNicholas
left a comment
There was a problem hiding this comment.
@shlomitubul, could you please create a new pull request which target branch is main?
SteNicholas
left a comment
There was a problem hiding this comment.
Thanks for digging into this — the motivation is solid, and two of the three changes are clean wins. But I think the core "report partial success" change drops in-flight partitions in a way the driver can't detect, so I'd like to hold on this one.
Blocking: in-flight partitions become silent data loss
On the worker each commit task reaches a terminal state by adding its id to exactly one of committedIds / emptyFileIds / failedIds, and committedIds.add happens after fileWriter.close() — so the ordering correctly rules out reporting a non-durable commit. Good.
But when the timeout calls future.cancel(true) + task.cancel(true), tasks that were still queued / interrupted before reaching a terminal state land in none of those sets. This PR's PARTIAL_SUCCESS reports committed = real committed and failed = only the explicitly-failed ids, so the in-flight partitions are in neither list. Following it to the driver:
processResponserecords only committed and failed ids.checkDataLostkeys only off the failed sets (!pushReplicateEnabled && failedPrimaries.nonEmpty, or failed-on-both-replicas); it never compares against the requested set.- A normal successful commit already reports empty-file partitions in neither list, and the driver treats "absent from committed" as valid-but-empty.
So the driver can't distinguish an in-flight (uncommitted, has data) partition from an empty (no data) one — both are absent from committed and from failed. The in-flight partition is silently treated as empty: missing from the reducer file group, not flagged by checkDataLost, no recompute → the reducer silently produces wrong results, not even a FetchFailure.
That's worse than today's behavior in the dangerous direction: the current COMMIT_FILE_EXCEPTION reports all ids as failed (over-reporting → whole-shuffle recompute — wasteful but safe). And it bites exactly the target scenario: a heavy shuffle whose partition count exceeds the timeout will have many queued-but-unstarted tasks when the timer fires.
Suggested fix: in the PARTIAL_SUCCESS branch, compute the failed lists as requested − committed − emptyFile rather than just the explicitly-failed set, e.g. failedPrimaryIds = primaryIds \ committedPrimaryIds \ emptyFilePrimaryIds (same for replica). That keeps the real improvement (committed partitions preserved) while ensuring every not-actually-committed partition is reported as failed so checkDataLost recomputes it. The worker has to do this — the driver can't, since it can't tell empty from in-flight without help.
The other two changes look good
context.reply(response)on the error path is a real fix — the original error branch never replied, so the driver waited outcommitFiles.askTimeout. I checked the dedup flow (COMMIT_FINISHED→reply /COMMIT_INPROCESS): the future handler replies to the original context exactly once and duplicate RPCs get their own context, so no double-reply.workerSource.stopTimer(...)fixes a latent timer leak on the error path, and is mutually exclusive with the successreply(), so no double-stop.
Secondary
- The response is snapshotted while interrupted-but-not-stopped tasks may still mutate the sets. The commit-then-add ordering prevents the dangerous case, but cross-collection snapshots aren't atomic (an id can be in
committedIdsbefore itscommittedStorageInfosentry is copied). Benign — the driver just retries that one — but worth a comment. - Given the severity, this needs a test: simulate a timeout with some partitions committed and some still queued, and assert the response marks the uncommitted ones as failed so the driver recomputes them.
With the failed-set computation changed to requested − committed − empty plus a test, I think this becomes a solid improvement. Happy to help with the test wiring if useful.
|
Thanks @SteNicholas — your blocking analysis was exactly right. Tracing it through I've opened #3721 targeting
Closing this one in favor of #3721. Thanks again for the careful review. |
What changes were proposed in this pull request?
When the worker-side
celeborn.worker.commitFiles.timeoutfires andfuture.cancel(true)interrupts the per-partition commit tasks,Controller's BiFunction has two issues that amplify data loss unnecessarily:The response is built with
List.empty.asJavafor bothcommittedPrimaryIdsandcommittedReplicaIds, even though those concurrent sets may have been partially populated by tasks that finished before the timer fired. All successful commit work is silently thrown away.context.reply()is never called on the error path, so the originating commit RPC sits unanswered until the driver'sceleborn.client.rpc.commitFiles.askTimeoutexpires. The worker has already determined the outcome — there's no reason to make the driver wait.This PR:
committedPrimaryIds/committedReplicaIds/failedPrimaryIds/failedReplicaIds/committedPrimaryStorageInfos/committedReplicaStorageInfos/committedMapIdBitMap/partitionSizeList.StatusCode.PARTIAL_SUCCESSwhen any partition committed before cancellation, with the populated lists.CommitHandleron the client already treatsPARTIAL_SUCCESSas a terminal, non-retry status (alongsideSUCCESS,SHUFFLE_UNREGISTERED,REQUEST_FAILED,WORKER_EXCLUDED,COMMIT_FILE_EXCEPTION), so no client-side change is required.StatusCode.COMMIT_FILE_EXCEPTIONresponse when nothing committed.context.reply(response)so the driver's RPC ask receives the verdict immediately instead of timing out.Why are the changes needed?
In production we hit
celeborn.worker.commitFiles.timeoutperiodically on heavy shuffles where the per-worker partition count makes the close() work exceed the timeout. When that happens today:commitFiles.askTimeout, loggingCannot receive any reply ... in 300000 milliseconds.dataLostShuffleSet.add(shuffleId), even when most partitions succeeded.SHUFFLE_DATA_LOST→FetchFailedException→ DAGScheduler retries the whole map stage.With this change, the driver receives a definitive
PARTIAL_SUCCESSreply with the actual committed/failed split. The data-lost set is populated based on the partitions that genuinely didn't make it. Reducers for the partitions that did commit can fetch them normally.Does this PR introduce any user-facing change?
No client API or wire format changes.
StatusCode.PARTIAL_SUCCESSis already part of theCommitFilesResponseprotocol and is already handled by the driver-sideCommitHandler.parallelCommitFilesretry loop. The user-visible effect is reduced blast radius when a worker's commit times out under heavy load.How was this patch tested?
./build/mvn -DskipTests installbuilds the full tree cleanly onmain(verified locally with Java 17 / Maven 3.9); the file is byte-identical betweenmainandbranch-0.6, so the same patch applies and compiles on both.COMMIT_FILE_EXCEPTIONpath byte-for-byte for the "nothing committed" case; the new branch reuses the exact same construction pattern as the success-pathreply()method (committedPrimaryStorageAndDiskHintList,committedReplicaStorageAndDiskHintList,committedMapIdBitMapList, etc.).jArrayList,jHashMap,RoaringBitmap,StorageInfo) are already in scope at the top ofController.scala.