From 0e608b5394e0bcfe2a469c82fdc255cd964c7a6e Mon Sep 17 00:00:00 2001 From: Rob Amos Date: Fri, 8 May 2026 18:49:50 +1000 Subject: [PATCH 1/2] Added failing test to reproduce deadlock Co-Authored-By: Galen Quinn --- .../Utilities/AsyncCurrentValueTests.swift | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 Tests/VexilTests/Utilities/AsyncCurrentValueTests.swift diff --git a/Tests/VexilTests/Utilities/AsyncCurrentValueTests.swift b/Tests/VexilTests/Utilities/AsyncCurrentValueTests.swift new file mode 100644 index 0000000..ee8a9cc --- /dev/null +++ b/Tests/VexilTests/Utilities/AsyncCurrentValueTests.swift @@ -0,0 +1,71 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Vexil open source project +// +// Copyright (c) 2026 Unsigned Apps and the open source contributors. +// Licensed under the MIT license +// +// See LICENSE for license information +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +import Foundation +import Testing +@testable import Vexil + +#if os(macOS) + +struct AsyncCurrentValueTests { + /// Regression test for a lock-order inversion deadlock between two threads: + /// + /// Thread A (update): holds allocation.mutex → calls continuation.resume() inside didSet + /// → resume() internally acquires the Swift task status lock + /// + /// Thread B (cancel): acquires the Swift task status lock → fires onCancel: handler + /// → onCancel: calls allocation.mutex.withLock → waits for mutex + /// + /// Thread A holds mutex, wants task-lock. + /// Thread B holds task-lock, wants mutex. + /// → deadlock. + /// + /// If the bug is present, the test will time out after 1 minute. + @Test(.timeLimit(.minutes(1))) + func `AsyncCurrentValue does not deadlock when cancellation races a concurrent update`() async { + for _ in 0 ..< 100_000 { + let currentValue = AsyncCurrentValue(.all) + + // This task will: + // 1. Call next() once — returns the initial value immediately because + // iterator.generation (0) < state.generation (1). + // 2. Call next() again — suspends, storing its continuation in + // pendingContinuations. This is the continuation that gets raced. + let consumingTask = Task { + var iterator = currentValue.stream.makeAsyncIterator() + _ = await iterator.next(isolation: nil) + _ = await iterator.next(isolation: nil) + } + + // Yield to give the consuming task time to advance past the first next() + // and park its continuation inside pendingContinuations on the second call. + await Task.yield() + await Task.yield() + await Task.yield() + + // Fire the two racing operations: + // updateTask — acquires allocation.mutex, sets wrappedValue, didSet calls + // continuation.resume() while still inside withLock. + // cancel — acquires the task status lock, fires onCancel:, which calls + // allocation.mutex.withLock. + let updateTask = Task.detached { currentValue.update { _ in } } + consumingTask.cancel() + + await updateTask.value + await consumingTask.value + } + } + +} + +#endif From 8a7dd58dfca8652bfd140edef03174985b42a03b Mon Sep 17 00:00:00 2001 From: Rob Amos Date: Fri, 8 May 2026 18:50:10 +1000 Subject: [PATCH 2/2] Moved resumption of pending continuations to outside the mutex. --- .../Vexil/Utilities/AsyncCurrentValue.swift | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/Sources/Vexil/Utilities/AsyncCurrentValue.swift b/Sources/Vexil/Utilities/AsyncCurrentValue.swift index 5d5a506..9832989 100644 --- a/Sources/Vexil/Utilities/AsyncCurrentValue.swift +++ b/Sources/Vexil/Utilities/AsyncCurrentValue.swift @@ -19,16 +19,7 @@ struct AsyncCurrentValue { // iterators start with generation = 0, so our initial value // has generation 1, so even that will be delivered. var generation = 1 - var wrappedValue: Wrapped { - didSet { - generation += 1 - for (_, continuation) in pendingContinuations { - continuation.resume(returning: (generation, wrappedValue)) - } - pendingContinuations = [] - } - } - + var wrappedValue: Wrapped var pendingContinuations = [(UUID, CheckedContinuation<(Int, Wrapped)?, Never>)]() } @@ -68,18 +59,39 @@ struct AsyncCurrentValue { /// - body: A closure that passes the current value as an in-out parameter that you can mutate. /// When the closure returns the mutated value is saved as the current value and is sent to all subscribers. /// - func update(_ body: (inout sending Wrapped) throws -> R) rethrows -> R { - try allocation.mutex.withLock { state in + func update(_ body: (inout sending Wrapped) throws(E) -> R) throws(E) -> R { + let result: Result + let generation: Int + let pendingContinuations: [CheckedContinuation<(Int, Wrapped)?, Never>] + let updatedValue: Wrapped + + // If we resume continuations within the context of this lock we risk a deadlock + // as they attempt to access the next value. So we do the update and return + // pending continuations to be resumed outside the lock. It should be impossible + // for new continuations to miss this generation as they're accessed and added + // within the same lock closure. + + (result, updatedValue, generation, pendingContinuations) = allocation.mutex.withLock { state in + + // The closure mutates a copy, then we save that back to our state var wrappedValue = state.wrappedValue - do { - let result = try body(&wrappedValue) - state.wrappedValue = wrappedValue - return result - } catch { - state.wrappedValue = wrappedValue - throw error + let result = Result { () throws(E) -> R in + try body(&wrappedValue) } + state.wrappedValue = wrappedValue + + // Bump generation and grab pending continuations + state.generation += 1 + let toResume = state.pendingContinuations.map(\.1) + state.pendingContinuations = [] + return (result, wrappedValue, state.generation, toResume) + } + + // Resume our pending continuations + for continuation in pendingContinuations { + continuation.resume(returning: (generation, updatedValue)) } + return try result.get() } }