
[CELEBORN-2341] Preserve partially-committed partitions on CommitFiles timeout #3721

Open
shlomitubul wants to merge 2 commits into apache:main from shlomitubul:worker-preserve-partial-commit-on-timeout-main

@shlomitubul shlomitubul commented Jun 7, 2026

Contributor

What changes were proposed in this pull request?

This targets main and supersedes #3706 (which targeted branch-0.6).

When celeborn.worker.commitFiles.timeout fires, Controller cancels the per-partition commit tasks (future.cancel(true) / task.cancel(true)) and runs the handleAsync error branch. Today that branch:

  1. never calls context.reply() — the originating CommitFiles RPC is left unanswered. The driver only learns the outcome after its ask times out (celeborn.client.rpc.commitFiles.askTimeout, which falls back to celeborn.rpc.askTimeout, default 60s; often raised to 240s+), after which it re-asks with the same commit epoch and the worker's dedup returns the cached response;
  2. reports empty committed lists (COMMIT_FILE_EXCEPTION with all requested ids failed), discarding partitions that finished closing before the timer fired;
  3. never stops the COMMIT_FILES_TIME timer (latent leak).

This PR makes the cancel/timeout branch:

  • call context.reply(response) immediately, so the driver doesn't wait out the ask timeout;
  • call workerSource.stopTimer(COMMIT_FILES_TIME, shuffleKey);
  • build the response from the actual commit state via a new pure helper, Controller.buildCommitFilesResponseOnCancel: PARTIAL_SUCCESS with the partitions that committed when any did, and the unchanged COMMIT_FILE_EXCEPTION when none did.

In the PARTIAL_SUCCESS branch the failed lists are computed as requested − committed − empty, not just the explicitly-failed ids (see correctness note below).
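
The failed-list rule described above can be sketched as a pure function. This is a minimal illustration under stated assumptions, not the PR's actual helper: the `CancelResponse` case class, the string status values, and the single `Set[String]` id list are hypothetical simplifications (the real `CommitFilesResponse` carries separate primary and replica lists).

```scala
object CancelResponseSketch {
  // Hypothetical stand-in for CommitFilesResponse (one id list, no replicas).
  final case class CancelResponse(
      status: String,
      committed: Set[String],
      failed: Set[String])

  // failed = requested - committed - empty, so ids that were still queued or
  // in flight (and therefore in none of the worker's sets) are reported failed.
  def onCancel(
      requested: Set[String],
      committed: Set[String],
      empty: Set[String]): CancelResponse = {
    val failed = requested -- committed -- empty
    if (committed.nonEmpty) CancelResponse("PARTIAL_SUCCESS", committed, failed)
    // Nothing committed: keep the prior behaviour, every requested id failed.
    else CancelResponse("COMMIT_FILE_EXCEPTION", Set.empty, requested)
  }
}
```

With four requested ids, one committed and one empty, the two remaining ids come back as failed even though no task ever marked them so.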

Why are the changes needed?

Primary — eliminate the driver stall. On every commit timeout the driver currently blocks for a full commitFiles.askTimeout before the same-epoch retry retrieves the worker's cached response. Replying immediately removes that wait (and the redundant retry round). The stopTimer call fixes a timer leak on the same path. These two are unconditional wins on every timeout.

Secondary: make the partial-success path safe. Tasks still queued (or interrupted before reaching a terminal state) when cancellation fires land in none of the worker's committed / empty / failed sets. If such a partition were reported as neither committed nor failed, the driver would record it nowhere; checkDataLost keys only off the failed sets, so it would not flag it; collectResult would omit it from the reducer file group; and on the read side CelebornShuffleReader treats a partition absent from the file group as empty-and-valid (it filters the requested range to partitionGroups.containsKey(p)), so reducers would silently produce wrong results with no FetchFailedException. Computing failed = requested − committed − empty guarantees every not-actually-committed partition is reported failed, routing it through checkDataLost → SHUFFLE_DATA_LOST → stage recompute. This is required to return PARTIAL_SUCCESS without regressing into silent data loss.
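
The silent-drop chain can be made concrete with a toy model. The names below (`Report`, `dataLost`, `readable`) are hypothetical and only mirror the behaviour described in this paragraph for a non-replicated shuffle; they are not the real `CommitHandler` or `CelebornShuffleReader` code.

```scala
object SilentDropSketch {
  // Toy model of what the driver learns from one worker (non-replicated).
  final case class Report(committed: Set[Int], failed: Set[Int])

  // Models checkDataLost keying only off the failed set.
  def dataLost(r: Report): Boolean = r.failed.nonEmpty

  // Models the reader keeping only partitions present in the file group,
  // which is populated from the committed ids.
  def readable(requestedRange: Seq[Int], r: Report): Seq[Int] =
    requestedRange.filter(p => r.committed.contains(p))
}
```

If partition 2 was in flight and is reported in neither set, `dataLost` is false and `readable(Seq(0, 1, 2), ...)` yields only partitions 0 and 1, so partition 2 is skipped without any error. Reporting it as failed flips `dataLost` to true instead.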

Scope / honest limitations

Reduce-shuffle finalization is all-or-nothing: ReducePartitionCommitHandler.handleFinalCommitFiles calls collectResult (which populates the reducer file groups) only when checkDataLost returns false, and checkDataLost flags the whole shuffle if any primary failed (non-replicated) or any partition failed on both replicas. So this PR does not let reducers fetch the committed partitions while only the lost ones recompute — when a timeout leaves any partition uncommitted in a non-replicated shuffle, the whole map stage still recomputes, exactly as before. Preserving committed partitions avoids recompute only when the timeout left nothing actually uncommitted (e.g. the timer fired as the last file closed), and otherwise just keeps the worker's report truthful. The dependable, every-time benefits are the immediate reply and the timer fix.
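
The all-or-nothing rule can be paraphrased as a small predicate. This is a sketch of the behaviour as described in this section only; `shuffleDataLost` and `fileGroup` are hypothetical names, and the real `checkDataLost` / `collectResult` operate on richer per-worker state.

```scala
object FinalizeSketch {
  // Lost if any primary failed without replication, or if some partition
  // failed on both its primary and its replica.
  def shuffleDataLost(
      failedPrimary: Set[Int],
      failedReplica: Set[Int],
      replicated: Boolean): Boolean =
    if (replicated) failedPrimary.intersect(failedReplica).nonEmpty
    else failedPrimary.nonEmpty

  // File groups are populated only when nothing is lost (all-or-nothing).
  def fileGroup(committed: Set[Int], lost: Boolean): Option[Set[Int]] =
    if (lost) None else Some(committed)
}
```

In the non-replicated case a single failed primary makes the whole shuffle lost, so the committed partitions are not exposed to reducers at all.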

Does this PR introduce any user-facing change?

No protocol or API change. StatusCode.PARTIAL_SUCCESS is already part of CommitFilesResponse and is already handled by the driver as a terminal, non-retry status (same branch as SUCCESS in CommitHandler.doParallelCommitFiles).

How was this patch tested?

New ControllerSuite unit tests for buildCommitFilesResponseOnCancel:

  • some committed, one empty, two still-queued/in-flight → PARTIAL_SUCCESS, committed preserved, the in-flight ids reported failed, the empty id not failed;
  • nothing committed → COMMIT_FILE_EXCEPTION with all requested ids failed.

Built and spotless:check-clean on worker with Java 17.

🤖 Generated with Claude Code

Copilot AI left a comment

Pull request overview

This PR improves the worker-side CommitFiles timeout/cancellation handling in Controller to (a) reply immediately instead of leaving the driver waiting for its ask timeout, (b) stop the COMMIT_FILES_TIME timer on the timeout path, and (c) preserve partially-completed commit state by building a response from the actual committed/empty sets.

Changes:

  • Reply immediately and stop the commit-files timer when the async commit future is cancelled/times out.
  • Introduce Controller.buildCommitFilesResponseOnCancel to compute a truthful CommitFilesResponse from the observed commit state (committed/empty vs failed).
  • Add ControllerSuite unit tests covering partial-success and all-failed cancel responses.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Replies on cancel/timeout, stops timer, and adds helper to build cancel/timeout response from commit state. |
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala | Adds unit tests for the new cancel/timeout response builder. |

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated

@SteNicholas SteNicholas left a comment

Member

I traced the full data path on both the driver and reader sides — this is a solid, well-reasoned change. The two unconditional wins (immediate reply, timer-leak fix) are clearly correct, and the riskier partial-success logic is safe.

The safety argument holds (verified)

The crucial claim is that an in-flight partition reported as neither committed nor failed causes silent data loss, so failed = requested − committed − empty is required. Every link checks out:

  • CommitHandler.checkDataLost keys only off the failed sets — a partition absent from failedPrimaryPartitionIds/failedReplicaPartitionIds is never flagged.
  • collectResult only populates the reducer file group from committedPrimaryIds.
  • CelebornShuffleReader (.filter(p => fileGroups.partitionGroups.containsKey(p)), under the comment // filter empty partition) silently skips a partition absent from the file group, with no FetchFailedException.

So an in-flight partition absent from both sets is silently dropped at read time. Computing failed = requested − committed − empty correctly routes it through checkDataLost → SHUFFLE_DATA_LOST → recompute. PARTIAL_SUCCESS is confirmed terminal/non-retry (same branch as SUCCESS in doParallelCommitFiles), and the "honest limitations" section matches the all-or-nothing behavior in handleFinalCommitFiles.

Concurrency nuance worth a code comment

task.cancel(true) / future.cancel(true) on a CompletableFuture does not interrupt the running task — mayInterruptIfRunning is ignored by CompletableFuture. So when the timer fires, the per-partition commit tasks keep running in commitThreadPool while the error branch snapshots the committed/empty sets. I worked through the race and it's safe: failedPrimaryIds is computed from the live committedPrimaryIds before the committed-list snapshot is taken, and the committed set only grows, so a partition that commits during the window lands in both lists (over-reports failure → safe recompute) — never in "neither". And a partition reaches committedIds only after fileWriter.close() succeeds, so committed ⇒ durable.

This is correct but non-obvious and the ordering is load-bearing. A one-line comment noting the snapshot is best-effort (tasks may still be running since cancel doesn't interrupt) would protect against a future reorder.
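
The cancel semantics can be checked in isolation. The following is a standalone demonstration using only `java.util.concurrent`, not Celeborn code; the object name, the latches, and the single-thread pool are assumptions made for the example.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object CancelDoesNotInterrupt {
  // Returns (future reports cancelled, task body still ran to completion).
  def demo(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val finished = new AtomicBoolean(false)
    val task = new Runnable {
      override def run(): Unit = {
        started.countDown()
        // Would throw InterruptedException if the thread were interrupted.
        release.await()
        finished.set(true)
      }
    }
    val future = CompletableFuture.runAsync(task, pool)
    started.await() // make sure the task is running before cancelling
    val cancelled = future.cancel(true) // the flag has no effect on CompletableFuture
    release.countDown() // let the still-running task proceed
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    (cancelled && future.isCancelled, finished.get())
  }
}
```

Under these assumptions `demo()` returns `(true, true)`: the future is marked cancelled, yet the task body finishes normally, which is why the committed set can keep growing while the error branch builds its response.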

Empty-partition edge case (same as Copilot's note)

The COMMIT_FILE_EXCEPTION branch triggers on committed*Ids.isEmpty, which marks genuinely-empty partitions (tracked in emptyFile*Ids) as failed → unnecessary recompute in the rare "all partitions empty, none committed" case. Not a correctness bug — over-reporting failure is always safe, and it byte-for-byte preserves the prior no-commit response. If you'd rather optimize it, gate on committed.isEmpty && empty.isEmpty; otherwise a quick reply on the Copilot thread noting it's an intentional conservative choice would close it out.

Minor

  • Tests cover only the pure helper (the risky part — acceptable), but not the empty-only edge case above, which is exactly the Copilot scenario — worth adding.

None of this blocks merge. Nice work, and the writeup/limitations section is appreciated.

🤖 Generated with Claude Code

…s timeout

When `celeborn.worker.commitFiles.timeout` fires and `future.cancel(true)` /
`task.cancel(true)` interrupt the per-partition commit tasks, the worker's
error-path response had three problems that amplified data loss:

1. The response was built with empty committed lists, discarding all partitions
   that committed before the timer fired.
2. `context.reply()` was never called, so the driver waited out
   `celeborn.client.rpc.commitFiles.askTimeout` instead of getting the verdict.
3. The `COMMIT_FILES_TIME` timer was never stopped on this path (leak).

This builds the response from the actual committed / empty / failed state and
returns `PARTIAL_SUCCESS` when any partition committed.

Crucially, tasks still queued (or interrupted before reaching a terminal state)
when cancellation fires land in NONE of the committed / empty / failed sets, so
the failed list is computed as `requested - committed - empty` rather than only
the explicitly-failed ids. Otherwise the driver's `CommitHandler.checkDataLost`
cannot distinguish an in-flight (has data, uncommitted) partition from an empty
(no data) one -- both are absent from committed and failed -- and would silently
treat it as empty-and-valid, producing wrong reducer results with no
`FetchFailedException`. Reporting them as failed makes the driver recompute them.

The response construction is extracted into `Controller.buildCommitFilesResponseOnCancel`
and covered by `ControllerSuite`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@shlomitubul shlomitubul force-pushed the worker-preserve-partial-commit-on-timeout-main branch from 9ae10f6 to 922eba8 on June 8, 2026 13:30
@shlomitubul

Contributor Author

Thanks for the thorough trace — addressed both points in the latest push (922eba8):

  1. Empty-partition edge case (also Copilot's note): the helper no longer takes a separate empty-committed branch. failed is always computed as requested − committed − empty, and COMMIT_FILE_EXCEPTION is returned only when nothing committed and nothing is empty; otherwise PARTIAL_SUCCESS. So genuinely-empty partitions are never reported as failed (consistent with the success path), and the "all-empty, none committed" case no longer forces a recompute. Added two unit tests: empty-files-not-failed-when-nothing-committed, and all-empty-reports-no-failures.

  2. Concurrency / cancel doesn't interrupt — added a comment at the snapshot making the ordering explicit: cancel(true) doesn't interrupt a running CompletableFuture, the committed/empty sets are append-only, and failed is computed (reading committed) before the committed snapshot — so a partition committing mid-window lands in both lists (safe over-report), never in neither. Calling out the load-bearing ordering so a future reorder doesn't reintroduce the silent-drop.
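
The revised status rule from point 1 can be sketched as follows. As before this is an illustration, not the helper in 922eba8: `statusAndFailed`, the string status values, and the single id list are hypothetical simplifications.

```scala
object RevisedCancelSketch {
  // Hypothetical simplification: one id list instead of primary/replica pairs.
  def statusAndFailed(
      requested: Set[String],
      committed: Set[String],
      empty: Set[String]): (String, Set[String]) = {
    // Always the same rule, so genuinely-empty ids are never reported failed.
    val failed = requested -- committed -- empty
    val status =
      if (committed.isEmpty && empty.isEmpty) "COMMIT_FILE_EXCEPTION"
      else "PARTIAL_SUCCESS"
    (status, failed)
  }
}
```

In the all-empty case (every requested id is in `empty`, none committed) this yields `PARTIAL_SUCCESS` with an empty failed set, so no recompute is forced.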

Appreciate the review.

🤖 Generated with Claude Code

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>