Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -705,18 +705,25 @@ private[deploy] class Controller(
case throwable: Throwable =>
logError(s"$errMsg, an unexpected exception occurred.", throwable)
}
val response = Controller.buildCommitFilesResponseOnCancel(
primaryIds,
replicaIds,
committedPrimaryIds,
committedReplicaIds,
emptyFilePrimaryIds,
emptyFileReplicaIds,
committedPrimaryStorageInfos,
committedReplicaStorageInfos,
committedMapIdBitMap,
partitionSizeList)
commitInfo.synchronized {
commitInfo.response = CommitFilesResponse(
StatusCode.COMMIT_FILE_EXCEPTION,
List.empty.asJava,
List.empty.asJava,
primaryIds,
replicaIds)

commitInfo.response = response
commitInfo.status = CommitInfo.COMMIT_FINISHED
}
context.reply(response)

workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT)
workerSource.stopTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
} else {
// finish, cancel timeout job first.
timeout.cancel()
Expand Down Expand Up @@ -898,3 +905,54 @@ private[deploy] class Controller(
}
}
}

private[deploy] object Controller {

def buildCommitFilesResponseOnCancel(
primaryIds: jList[String],
replicaIds: jList[String],
committedPrimaryIds: jSet[String],
committedReplicaIds: jSet[String],
emptyFilePrimaryIds: jSet[String],
emptyFileReplicaIds: jSet[String],
committedPrimaryStorageInfos: java.util.Map[String, StorageInfo],
committedReplicaStorageInfos: java.util.Map[String, StorageInfo],
committedMapIdBitMap: java.util.Map[String, RoaringBitmap],
partitionSizeList: java.util.Collection[Long]): CommitFilesResponse = {
// Commit tasks may still be running here: future.cancel(true) does not interrupt a
// CompletableFuture. Compute failed = requested - committed - empty, reading committed
// before snapshotting it below. The sets are append-only, so a partition that commits in
// this window lands in both failed and committed (safe over-report), never in neither --
// a partition in neither is read as empty-and-valid by the driver and silently dropped.
val failedPrimaryIds = new jArrayList[String](primaryIds)
failedPrimaryIds.removeAll(committedPrimaryIds)
failedPrimaryIds.removeAll(emptyFilePrimaryIds)
val failedReplicaIds = new jArrayList[String](replicaIds)
failedReplicaIds.removeAll(committedReplicaIds)
failedReplicaIds.removeAll(emptyFileReplicaIds)
// COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a
// successful terminal state and must not be reported as failed.
val committedPrimaryIdList = new jArrayList[String](committedPrimaryIds)
val committedReplicaIdList = new jArrayList[String](committedReplicaIds)
// COMMIT_FILE_EXCEPTION only when nothing committed and nothing empty; empty files are a
// successful terminal state and must not be reported as failed.
val status =
if (committedPrimaryIdList.isEmpty && committedReplicaIdList.isEmpty &&
emptyFilePrimaryIds.isEmpty && emptyFileReplicaIds.isEmpty) {
StatusCode.COMMIT_FILE_EXCEPTION
} else {
StatusCode.PARTIAL_SUCCESS
}
CommitFilesResponse(
status,
committedPrimaryIdList,
committedReplicaIdList,
failedPrimaryIds,
failedReplicaIds,
new jHashMap[String, StorageInfo](committedPrimaryStorageInfos),
new jHashMap[String, StorageInfo](committedReplicaStorageInfos),
new jHashMap[String, RoaringBitmap](committedMapIdBitMap),
partitionSizeList.asScala.sum,
partitionSizeList.size())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

import scala.collection.JavaConverters._

import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.protocol.message.StatusCode

class ControllerSuite extends AnyFunSuite {

private def ids(xs: String*): java.util.List[String] = xs.toList.asJava

private def set(xs: String*): java.util.Set[String] = {
val s = ConcurrentHashMap.newKeySet[String]()
xs.foreach(s.add)
s
}

private def emptyStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
private def emptyBitMaps = new ConcurrentHashMap[String, org.roaringbitmap.RoaringBitmap]()
private def emptySizes = new LinkedBlockingQueue[Long]()

test("CELEBORN-2341: timeout response reports queued/in-flight partitions as failed " +
"so the driver recomputes them, while preserving committed partitions") {
val response = Controller.buildCommitFilesResponseOnCancel(
primaryIds = ids("p0", "p1", "p2", "p3", "p4"),
replicaIds = ids(),
committedPrimaryIds = set("p0", "p1"),
committedReplicaIds = set(),
emptyFilePrimaryIds = set("p2"),
emptyFileReplicaIds = set(),
committedPrimaryStorageInfos = emptyStorageInfos,
committedReplicaStorageInfos = emptyStorageInfos,
committedMapIdBitMap = emptyBitMaps,
partitionSizeList = emptySizes)

assert(response.status == StatusCode.PARTIAL_SUCCESS)
assert(response.committedPrimaryIds.asScala.toSet == Set("p0", "p1"))
assert(response.failedPrimaryIds.asScala.toSet == Set("p3", "p4"))
}

test("CELEBORN-2341: timeout response with nothing committed stays COMMIT_FILE_EXCEPTION " +
"and reports all requested partitions as failed") {
val response = Controller.buildCommitFilesResponseOnCancel(
primaryIds = ids("p0", "p1"),
replicaIds = ids("r0"),
committedPrimaryIds = set(),
committedReplicaIds = set(),
emptyFilePrimaryIds = set(),
emptyFileReplicaIds = set(),
committedPrimaryStorageInfos = emptyStorageInfos,
committedReplicaStorageInfos = emptyStorageInfos,
committedMapIdBitMap = emptyBitMaps,
partitionSizeList = emptySizes)

assert(response.status == StatusCode.COMMIT_FILE_EXCEPTION)
assert(response.failedPrimaryIds.asScala.toSet == Set("p0", "p1"))
assert(response.failedReplicaIds.asScala.toSet == Set("r0"))
}

test("CELEBORN-2341: empty-file partitions are never reported as failed when nothing " +
"committed, so they are not needlessly recomputed") {
// p0/p1 closed empty, p2/p3 were still queued at cancellation, nothing committed.
val response = Controller.buildCommitFilesResponseOnCancel(
primaryIds = ids("p0", "p1", "p2", "p3"),
replicaIds = ids(),
committedPrimaryIds = set(),
committedReplicaIds = set(),
emptyFilePrimaryIds = set("p0", "p1"),
emptyFileReplicaIds = set(),
committedPrimaryStorageInfos = emptyStorageInfos,
committedReplicaStorageInfos = emptyStorageInfos,
committedMapIdBitMap = emptyBitMaps,
partitionSizeList = emptySizes)

assert(response.status == StatusCode.PARTIAL_SUCCESS)
assert(response.committedPrimaryIds.isEmpty)
assert(response.failedPrimaryIds.asScala.toSet == Set("p2", "p3"))
}

test("CELEBORN-2341: all partitions empty (none committed, none in-flight) reports no " +
"failures, so the driver does not recompute") {
val response = Controller.buildCommitFilesResponseOnCancel(
primaryIds = ids("p0", "p1"),
replicaIds = ids(),
committedPrimaryIds = set(),
committedReplicaIds = set(),
emptyFilePrimaryIds = set("p0", "p1"),
emptyFileReplicaIds = set(),
committedPrimaryStorageInfos = emptyStorageInfos,
committedReplicaStorageInfos = emptyStorageInfos,
committedMapIdBitMap = emptyBitMaps,
partitionSizeList = emptySizes)

assert(response.failedPrimaryIds.isEmpty)
assert(response.failedReplicaIds.isEmpty)
}
}
Loading