From 922eba88537dc8810045345e14cc027181a5b3ef Mon Sep 17 00:00:00 2001 From: ShlomiTubul Date: Sun, 7 Jun 2026 11:52:08 +0300 Subject: [PATCH 1/2] [CELEBORN-2341] Preserve partially-committed partitions on CommitFiles timeout When `celeborn.worker.commitFiles.timeout` fires and `future.cancel(true)` / `task.cancel(true)` interrupt the per-partition commit tasks, the worker's error-path response had three problems that amplified data loss: 1. The response was built with empty committed lists, discarding all partitions that committed before the timer fired. 2. `context.reply()` was never called, so the driver waited out `celeborn.client.rpc.commitFiles.askTimeout` instead of getting the verdict. 3. The `COMMIT_FILES_TIME` timer was never stopped on this path (leak). This builds the response from the actual committed / empty / failed state and returns `PARTIAL_SUCCESS` when any partition committed. Crucially, tasks still queued (or interrupted before reaching a terminal state) when cancellation fires land in NONE of the committed / empty / failed sets, so the failed list is computed as `requested - committed - empty` rather than only the explicitly-failed ids. Otherwise the driver's `CommitHandler.checkDataLost` cannot distinguish an in-flight (has data, uncommitted) partition from an empty (no data) one -- both are absent from committed and failed -- and would silently treat it as empty-and-valid, producing wrong reducer results with no `FetchFailedException`. Reporting them as failed makes the driver recompute them. The response construction is extracted into `Controller.buildCommitFilesResponseOnCancel` and covered by `ControllerSuite`. Co-Authored-By: Claude Opus 4.8 --- .../service/deploy/worker/Controller.scala | 68 ++++++++-- .../deploy/worker/ControllerSuite.scala | 118 ++++++++++++++++++ 2 files changed, 179 insertions(+), 7 deletions(-) create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 565acb44182..31d6856b8d5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -705,18 +705,25 @@ private[deploy] class Controller( case throwable: Throwable => logError(s"$errMsg, an unexpected exception occurred.", throwable) } + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds, + replicaIds, + committedPrimaryIds, + committedReplicaIds, + emptyFilePrimaryIds, + emptyFileReplicaIds, + committedPrimaryStorageInfos, + committedReplicaStorageInfos, + committedMapIdBitMap, + partitionSizeList) commitInfo.synchronized { - commitInfo.response = CommitFilesResponse( - StatusCode.COMMIT_FILE_EXCEPTION, - List.empty.asJava, - List.empty.asJava, - primaryIds, - replicaIds) - + commitInfo.response = response commitInfo.status = CommitInfo.COMMIT_FINISHED } + context.reply(response) workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT) + workerSource.stopTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey) } else { // finish, cancel timeout job first. timeout.cancel() @@ -898,3 +905,50 @@ private[deploy] class Controller( } } } + +private[deploy] object Controller { + + def buildCommitFilesResponseOnCancel( + primaryIds: jList[String], + replicaIds: jList[String], + committedPrimaryIds: jSet[String], + committedReplicaIds: jSet[String], + emptyFilePrimaryIds: jSet[String], + emptyFileReplicaIds: jSet[String], + committedPrimaryStorageInfos: java.util.Map[String, StorageInfo], + committedReplicaStorageInfos: java.util.Map[String, StorageInfo], + committedMapIdBitMap: java.util.Map[String, RoaringBitmap], + partitionSizeList: java.util.Collection[Long]): CommitFilesResponse = { + // Commit tasks may still be running here: future.cancel(true) does not interrupt a + // CompletableFuture. Compute failed = requested - committed - empty, reading committed + // before snapshotting it below. The sets are append-only, so a partition that commits in + // this window lands in both failed and committed (safe over-report), never in neither -- + // a partition in neither is read as empty-and-valid by the driver and silently dropped. + val failedPrimaryIds = new jArrayList[String](primaryIds) + failedPrimaryIds.removeAll(committedPrimaryIds) + failedPrimaryIds.removeAll(emptyFilePrimaryIds) + val failedReplicaIds = new jArrayList[String](replicaIds) + failedReplicaIds.removeAll(committedReplicaIds) + failedReplicaIds.removeAll(emptyFileReplicaIds) + // COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a + // successful terminal state and must not be reported as failed. + val status = + if (committedPrimaryIds.isEmpty && committedReplicaIds.isEmpty && + emptyFilePrimaryIds.isEmpty && emptyFileReplicaIds.isEmpty) { + StatusCode.COMMIT_FILE_EXCEPTION + } else { + StatusCode.PARTIAL_SUCCESS + } + CommitFilesResponse( + status, + new jArrayList[String](committedPrimaryIds), + new jArrayList[String](committedReplicaIds), + failedPrimaryIds, + failedReplicaIds, + new jHashMap[String, StorageInfo](committedPrimaryStorageInfos), + new jHashMap[String, StorageInfo](committedReplicaStorageInfos), + new jHashMap[String, RoaringBitmap](committedMapIdBitMap), + partitionSizeList.asScala.sum, + partitionSizeList.size()) + } +} diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala new file mode 100644 index 00000000000..0bd5269c1a1 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker + +import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} + +import scala.collection.JavaConverters._ + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.protocol.StorageInfo +import org.apache.celeborn.common.protocol.message.StatusCode + +class ControllerSuite extends AnyFunSuite { + + private def ids(xs: String*): java.util.List[String] = xs.toList.asJava + + private def set(xs: String*): java.util.Set[String] = { + val s = ConcurrentHashMap.newKeySet[String]() + xs.foreach(s.add) + s + } + + private def emptyStorageInfos = new ConcurrentHashMap[String, StorageInfo]() + private def emptyBitMaps = new ConcurrentHashMap[String, org.roaringbitmap.RoaringBitmap]() + private def emptySizes = new LinkedBlockingQueue[Long]() + + test("CELEBORN-2341: timeout response reports queued/in-flight partitions as failed " + + "so the driver recomputes them, while preserving committed partitions") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1", "p2", "p3", "p4"), + replicaIds = ids(), + committedPrimaryIds = set("p0", "p1"), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p2"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.PARTIAL_SUCCESS) + assert(response.committedPrimaryIds.asScala.toSet == Set("p0", "p1")) + assert(response.failedPrimaryIds.asScala.toSet == Set("p3", "p4")) + } + + test("CELEBORN-2341: timeout response with nothing committed stays COMMIT_FILE_EXCEPTION " + + "and reports all requested partitions as failed") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1"), + replicaIds = ids("r0"), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set(), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.COMMIT_FILE_EXCEPTION) + assert(response.failedPrimaryIds.asScala.toSet == Set("p0", "p1")) + assert(response.failedReplicaIds.asScala.toSet == Set("r0")) + } + + test("CELEBORN-2341: empty-file partitions are never reported as failed when nothing " + + "committed, so they are not needlessly recomputed") { + // p0/p1 closed empty, p2/p3 were still queued at cancellation, nothing committed. + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1", "p2", "p3"), + replicaIds = ids(), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p0", "p1"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.PARTIAL_SUCCESS) + assert(response.committedPrimaryIds.isEmpty) + assert(response.failedPrimaryIds.asScala.toSet == Set("p2", "p3")) + } + + test("CELEBORN-2341: all partitions empty (none committed, none in-flight) reports no " + + "failures, so the driver does not recompute") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1"), + replicaIds = ids(), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p0", "p1"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.failedPrimaryIds.isEmpty) + assert(response.failedReplicaIds.isEmpty) + } +} From 59c989fa7c958c6bcffb33fb27b688aff12b9902 Mon Sep 17 00:00:00 2001 From: shlomi tubul <33376277+shlomitubul@users.noreply.github.com> Date: Tue, 9 Jun 2026 14:42:46 +0300 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../celeborn/service/deploy/worker/Controller.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 31d6856b8d5..1f4dc863255 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -932,8 +932,12 @@ private[deploy] object Controller { failedReplicaIds.removeAll(emptyFileReplicaIds) // COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a // successful terminal state and must not be reported as failed. + val committedPrimaryIdList = new jArrayList[String](committedPrimaryIds) + val committedReplicaIdList = new jArrayList[String](committedReplicaIds) + // COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a + // successful terminal state and must not be reported as failed. val status = - if (committedPrimaryIds.isEmpty && committedReplicaIds.isEmpty && + if (committedPrimaryIdList.isEmpty && committedReplicaIdList.isEmpty && emptyFilePrimaryIds.isEmpty && emptyFileReplicaIds.isEmpty) { StatusCode.COMMIT_FILE_EXCEPTION } else { @@ -941,8 +945,8 @@ private[deploy] object Controller { } CommitFilesResponse( status, - new jArrayList[String](committedPrimaryIds), - new jArrayList[String](committedReplicaIds), + committedPrimaryIdList, + committedReplicaIdList, failedPrimaryIds, failedReplicaIds, new jHashMap[String, StorageInfo](committedPrimaryStorageInfos),