diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 565acb44182..1f4dc863255 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -705,18 +705,25 @@ private[deploy] class Controller( case throwable: Throwable => logError(s"$errMsg, an unexpected exception occurred.", throwable) } + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds, + replicaIds, + committedPrimaryIds, + committedReplicaIds, + emptyFilePrimaryIds, + emptyFileReplicaIds, + committedPrimaryStorageInfos, + committedReplicaStorageInfos, + committedMapIdBitMap, + partitionSizeList) commitInfo.synchronized { - commitInfo.response = CommitFilesResponse( - StatusCode.COMMIT_FILE_EXCEPTION, - List.empty.asJava, - List.empty.asJava, - primaryIds, - replicaIds) - + commitInfo.response = response commitInfo.status = CommitInfo.COMMIT_FINISHED } + context.reply(response) workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT) + workerSource.stopTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey) } else { // finish, cancel timeout job first. timeout.cancel() @@ -898,3 +905,54 @@ private[deploy] class Controller( } } } + +private[deploy] object Controller { + + def buildCommitFilesResponseOnCancel( + primaryIds: jList[String], + replicaIds: jList[String], + committedPrimaryIds: jSet[String], + committedReplicaIds: jSet[String], + emptyFilePrimaryIds: jSet[String], + emptyFileReplicaIds: jSet[String], + committedPrimaryStorageInfos: java.util.Map[String, StorageInfo], + committedReplicaStorageInfos: java.util.Map[String, StorageInfo], + committedMapIdBitMap: java.util.Map[String, RoaringBitmap], + partitionSizeList: java.util.Collection[Long]): CommitFilesResponse = { + // Commit tasks may still be running here: future.cancel(true) does not interrupt a + // CompletableFuture. Compute failed = requested - committed - empty, reading committed + // before snapshotting it below. The sets are append-only, so a partition that commits in + // this window lands in both failed and committed (safe over-report), never in neither -- + // a partition in neither is read as empty-and-valid by the driver and silently dropped. + val failedPrimaryIds = new jArrayList[String](primaryIds) + failedPrimaryIds.removeAll(committedPrimaryIds) + failedPrimaryIds.removeAll(emptyFilePrimaryIds) + val failedReplicaIds = new jArrayList[String](replicaIds) + failedReplicaIds.removeAll(committedReplicaIds) + failedReplicaIds.removeAll(emptyFileReplicaIds) + // COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a + // successful terminal state and must not be reported as failed. + val committedPrimaryIdList = new jArrayList[String](committedPrimaryIds) + val committedReplicaIdList = new jArrayList[String](committedReplicaIds) + // COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a + // successful terminal state and must not be reported as failed. + val status = + if (committedPrimaryIdList.isEmpty && committedReplicaIdList.isEmpty && + emptyFilePrimaryIds.isEmpty && emptyFileReplicaIds.isEmpty) { + StatusCode.COMMIT_FILE_EXCEPTION + } else { + StatusCode.PARTIAL_SUCCESS + } + CommitFilesResponse( + status, + committedPrimaryIdList, + committedReplicaIdList, + failedPrimaryIds, + failedReplicaIds, + new jHashMap[String, StorageInfo](committedPrimaryStorageInfos), + new jHashMap[String, StorageInfo](committedReplicaStorageInfos), + new jHashMap[String, RoaringBitmap](committedMapIdBitMap), + partitionSizeList.asScala.sum, + partitionSizeList.size()) + } +} diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala new file mode 100644 index 00000000000..0bd5269c1a1 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker + +import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} + +import scala.collection.JavaConverters._ + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.protocol.StorageInfo +import org.apache.celeborn.common.protocol.message.StatusCode + +class ControllerSuite extends AnyFunSuite { + + private def ids(xs: String*): java.util.List[String] = xs.toList.asJava + + private def set(xs: String*): java.util.Set[String] = { + val s = ConcurrentHashMap.newKeySet[String]() + xs.foreach(s.add) + s + } + + private def emptyStorageInfos = new ConcurrentHashMap[String, StorageInfo]() + private def emptyBitMaps = new ConcurrentHashMap[String, org.roaringbitmap.RoaringBitmap]() + private def emptySizes = new LinkedBlockingQueue[Long]() + + test("CELEBORN-2341: timeout response reports queued/in-flight partitions as failed " + + "so the driver recomputes them, while preserving committed partitions") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1", "p2", "p3", "p4"), + replicaIds = ids(), + committedPrimaryIds = set("p0", "p1"), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p2"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.PARTIAL_SUCCESS) + assert(response.committedPrimaryIds.asScala.toSet == Set("p0", "p1")) + assert(response.failedPrimaryIds.asScala.toSet == Set("p3", "p4")) + } + + test("CELEBORN-2341: timeout response with nothing committed stays COMMIT_FILE_EXCEPTION " + + "and reports all requested partitions as failed") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1"), + replicaIds = ids("r0"), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set(), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.COMMIT_FILE_EXCEPTION) + assert(response.failedPrimaryIds.asScala.toSet == Set("p0", "p1")) + assert(response.failedReplicaIds.asScala.toSet == Set("r0")) + } + + test("CELEBORN-2341: empty-file partitions are never reported as failed when nothing " + + "committed, so they are not needlessly recomputed") { + // p0/p1 closed empty, p2/p3 were still queued at cancellation, nothing committed. + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1", "p2", "p3"), + replicaIds = ids(), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p0", "p1"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.status == StatusCode.PARTIAL_SUCCESS) + assert(response.committedPrimaryIds.isEmpty) + assert(response.failedPrimaryIds.asScala.toSet == Set("p2", "p3")) + } + + test("CELEBORN-2341: all partitions empty (none committed, none in-flight) reports no " + + "failures, so the driver does not recompute") { + val response = Controller.buildCommitFilesResponseOnCancel( + primaryIds = ids("p0", "p1"), + replicaIds = ids(), + committedPrimaryIds = set(), + committedReplicaIds = set(), + emptyFilePrimaryIds = set("p0", "p1"), + emptyFileReplicaIds = set(), + committedPrimaryStorageInfos = emptyStorageInfos, + committedReplicaStorageInfos = emptyStorageInfos, + committedMapIdBitMap = emptyBitMaps, + partitionSizeList = emptySizes) + + assert(response.failedPrimaryIds.isEmpty) + assert(response.failedReplicaIds.isEmpty) + } +}