From 3899792d50f41b1653aeef10ba0177b8b5730188 Mon Sep 17 00:00:00 2001 From: Soumya Ranjan Mahunt Date: Tue, 2 Aug 2022 09:13:08 +0530 Subject: [PATCH] refactor: use `CheckedContinuation` for debug mode or for `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag --- Sources/AsyncObjects/AsyncEvent.swift | 4 +- Sources/AsyncObjects/AsyncSemaphore.swift | 4 +- .../AsyncObjects/ContinuationWrapper.swift | 104 +++++++++++++++--- Sources/AsyncObjects/TaskOperation.swift | 4 +- Sources/AsyncObjects/TaskQueue.swift | 72 ++++-------- Tests/AsyncObjectsTests/TaskQueueTests.swift | 42 ++++--- 6 files changed, 135 insertions(+), 95 deletions(-) diff --git a/Sources/AsyncObjects/AsyncEvent.swift b/Sources/AsyncObjects/AsyncEvent.swift index 6bb21a81..b6f8a273 100644 --- a/Sources/AsyncObjects/AsyncEvent.swift +++ b/Sources/AsyncObjects/AsyncEvent.swift @@ -8,7 +8,7 @@ import Foundation /// Wait for event signal by calling ``wait()`` method or its timeout variation ``wait(forNanoseconds:)``. public actor AsyncEvent: AsyncObject { /// The suspended tasks continuation type. - private typealias Continuation = UnsafeContinuation + private typealias Continuation = GlobalContinuation /// The continuations stored with an associated key for all the suspended task that are waitig for event signal. private var continuations: [UUID: Continuation] = [:] /// Indicates whether current stateof event is signaled. @@ -74,7 +74,7 @@ public actor AsyncEvent: AsyncObject { public func wait() async { guard !signaled else { return } let key = UUID() - try? await withUnsafeThrowingContinuationCancellationHandler( + try? await withThrowingContinuationCancellationHandler( handler: { [weak self] continuation in Task { [weak self] in await self?.removeContinuation(withKey: key) diff --git a/Sources/AsyncObjects/AsyncSemaphore.swift b/Sources/AsyncObjects/AsyncSemaphore.swift index c74685e3..142aee00 100644 --- a/Sources/AsyncObjects/AsyncSemaphore.swift +++ b/Sources/AsyncObjects/AsyncSemaphore.swift @@ -11,7 +11,7 @@ import OrderedCollections /// or its timeout variation ``wait(forNanoseconds:)``. public actor AsyncSemaphore: AsyncObject { /// The suspended tasks continuation type. - private typealias Continuation = UnsafeContinuation + private typealias Continuation = GlobalContinuation /// The continuations stored with an associated key for all the suspended task that are waitig for access to resource. private var continuations: OrderedDictionary = [:] /// Pool size for concurrent resource access. @@ -89,7 +89,7 @@ public actor AsyncSemaphore: AsyncObject { count -= 1 if count > 0 { return } let key = UUID() - try? await withUnsafeThrowingContinuationCancellationHandler( + try? await withThrowingContinuationCancellationHandler( handler: { [weak self] continuation in Task { [weak self] in await self?.removeContinuation(withKey: key) diff --git a/Sources/AsyncObjects/ContinuationWrapper.swift b/Sources/AsyncObjects/ContinuationWrapper.swift index cd6dba04..c81147c6 100644 --- a/Sources/AsyncObjects/ContinuationWrapper.swift +++ b/Sources/AsyncObjects/ContinuationWrapper.swift @@ -1,7 +1,7 @@ -/// Suspends the current task, then calls the given closure with an unsafe throwing continuation for the current task. +/// Suspends the current task, then calls the given closure with a throwing continuation for the current task. /// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked. /// -/// This operation cooperatively checks for cancellation and reacting to it by cancelling the unsafe throwing continuation with an error +/// This operation cooperatively checks for cancellation and reacting to it by cancelling the throwing continuation with an error /// and the cancellation handler is always and immediately invoked after that. /// For example, even if the operation is running code that never checks for cancellation, /// a cancellation handler still runs and provides a chance to run some cleanup code. @@ -9,29 +9,27 @@ /// - Parameters: /// - handler: A closure that is called after cancelling continuation. /// You must not resume the continuation in closure. -/// - fn: A closure that takes an `UnsafeContinuation` parameter. +/// - fn: A closure that takes a throwing continuation parameter. /// You must resume the continuation exactly once. /// -/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. /// - Returns: The value passed to the continuation. +/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. /// /// - Important: The continuation provided in cancellation handler is already resumed with cancellation error. /// Trying to resume the continuation here will cause runtime error/unexpected behavior. -func withUnsafeThrowingContinuationCancellationHandler( - handler: @Sendable (UnsafeContinuation) -> Void, - _ fn: (UnsafeContinuation) -> Void -) async throws -> T { - typealias Continuation = UnsafeContinuation - let wrapper = ContinuationWrapper() +func withThrowingContinuationCancellationHandler( + handler: @Sendable (C) -> Void, + _ fn: (C) -> Void +) async throws -> C.Success where C.Failure == Error { + let wrapper = ContinuationWrapper() let value = try await withTaskCancellationHandler { guard let continuation = wrapper.value else { return } wrapper.cancel(withError: CancellationError()) handler(continuation) - } operation: { () -> T in - let value = try await withUnsafeThrowingContinuation { - (c: Continuation) in - wrapper.value = c - fn(c) + } operation: { () -> C.Success in + let value = try await C.with { continuation in + wrapper.value = continuation + fn(continuation) } return value } @@ -85,10 +83,80 @@ protocol Continuable: Sendable { func resume(with result: Result) } -extension UnsafeContinuation: Continuable {} - -extension UnsafeContinuation where E == Error { +extension Continuable where Failure == Error { /// Cancel continuation by resuming with cancellation error. @inlinable func cancel() { self.resume(throwing: CancellationError()) } } + +extension UnsafeContinuation: Continuable {} +extension CheckedContinuation: Continuable {} + +protocol ThrowingContinuable: Continuable { + /// The type of error to resume the continuation with in case of failure. + associatedtype Failure = Error + /// Suspends the current task, then calls the given closure + /// with a throwing continuation for the current task. + /// + /// The continuation can be resumed exactly once, + /// subsequent resumes have different behavior depending on type implemeting. + /// + /// - Parameter fn: A closure that takes the throwing continuation parameter. + /// You can resume the continuation exactly once. + /// + /// - Returns: The value passed to the continuation by the closure. + /// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. + @inlinable + static func with(_ fn: (Self) -> Void) async throws -> Success +} + +extension UnsafeContinuation: ThrowingContinuable where E == Error { + /// Suspends the current task, then calls the given closure + /// with an unsafe throwing continuation for the current task. + /// + /// The continuation must be resumed exactly once, subsequent resumes will cause runtime error. + /// Use `CheckedContinuation` to capture relevant data in case of runtime errors. + /// + /// - Parameter fn: A closure that takes an `UnsafeContinuation` parameter. + /// You must resume the continuation exactly once. + /// + /// - Returns: The value passed to the continuation by the closure. + /// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. + @inlinable + static func with(_ fn: (UnsafeContinuation) -> Void) async throws -> T + { + return try await withUnsafeThrowingContinuation(fn) + } +} + +extension CheckedContinuation: ThrowingContinuable where E == Error { + /// Suspends the current task, then calls the given closure + /// with a checked throwing continuation for the current task. + /// + /// The continuation must be resumed exactly once, subsequent resumes will cause runtime error. + /// `CheckedContinuation` logs messages proving additional info on these errors. + /// Once all errors resolved, use `UnsafeContinuation` in release mode to benefit improved performance + /// at the loss of additional runtime checks. + /// + /// - Parameter fn: A closure that takes a `CheckedContinuation` parameter. + /// You must resume the continuation exactly once. + /// + /// - Returns: The value passed to the continuation by the closure. + /// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. + @inlinable + static func with(_ body: (CheckedContinuation) -> Void) async throws + -> T + { + return try await withCheckedThrowingContinuation(body) + } +} + +#if DEBUG || ASYNCOBJECTS_USE_CHECKEDCONTINUATION +/// The continuation type used in package in `DEBUG` mode +/// or if `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag turned on. +typealias GlobalContinuation = CheckedContinuation +#else +/// The continuation type used in package in `RELEASE` mode +///and in absence of `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag. +typealias GlobalContinuation = UnsafeContinuation +#endif diff --git a/Sources/AsyncObjects/TaskOperation.swift b/Sources/AsyncObjects/TaskOperation.swift index 3b565d38..a4fb0380 100644 --- a/Sources/AsyncObjects/TaskOperation.swift +++ b/Sources/AsyncObjects/TaskOperation.swift @@ -164,7 +164,7 @@ public final class TaskOperation: Operation, AsyncObject, // MARK: AsyncObject Impl /// The suspended tasks continuation type. - private typealias Continuation = UnsafeContinuation + private typealias Continuation = GlobalContinuation /// The continuations stored with an associated key for all the suspended task that are waitig for opearation completion. private var continuations: [UUID: Continuation] = [:] @@ -205,7 +205,7 @@ public final class TaskOperation: Operation, AsyncObject, public func wait() async { guard !isFinished else { return } let key = UUID() - try? await withUnsafeThrowingContinuationCancellationHandler( + try? await withThrowingContinuationCancellationHandler( handler: { [weak self] (continuation: Continuation) in Task { [weak self] in self?.removeContinuation(withKey: key) diff --git a/Sources/AsyncObjects/TaskQueue.swift b/Sources/AsyncObjects/TaskQueue.swift index 409a6376..376e5645 100644 --- a/Sources/AsyncObjects/TaskQueue.swift +++ b/Sources/AsyncObjects/TaskQueue.swift @@ -147,7 +147,7 @@ public actor TaskQueue: AsyncObject { let key = UUID() do { - try await withOnceResumableThrowingContinuationCancellationHandler( + try await withThrowingContinuationCancellationHandler( handler: { [weak self] continuation in Task { [weak self] in await self?.dequeueContinuation(withKey: key) @@ -205,7 +205,7 @@ public actor TaskQueue: AsyncObject { guard barriered || !queue.isEmpty else { return await runTask() } let key = UUID() do { - try await withOnceResumableThrowingContinuationCancellationHandler( + try await withThrowingContinuationCancellationHandler( handler: { [weak self] continuation in Task { [weak self] in await self?.dequeueContinuation(withKey: key) @@ -237,49 +237,6 @@ public actor TaskQueue: AsyncObject { } } -/// Suspends the current task, then calls the given closure with a safe throwing continuation for the current task. -/// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked. -/// -/// This operation cooperatively checks for cancellation and reacting to it by cancelling the safe throwing continuation with an error -/// and the cancellation handler is always and immediately invoked after that. -/// For example, even if the operation is running code that never checks for cancellation, -/// a cancellation handler still runs and provides a chance to run some cleanup code. -/// -/// - Parameters: -/// - handler: A closure that is called after cancelling continuation. -/// Resuming the continuation in closure will not have any effect. -/// - fn: A closure that takes an `SafeContinuation` parameter. -/// Continuation can be resumed exactly once and subsequent resuming are ignored and has no effect.. -/// -/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. -/// - Returns: The value passed to the continuation. -/// -/// - Note: The continuation provided in cancellation handler is already resumed with cancellation error. -/// Trying to resume the continuation here will have no effect. -private func withOnceResumableThrowingContinuationCancellationHandler< - T: Sendable ->( - handler: @Sendable (SafeContinuation) -> Void, - _ fn: (SafeContinuation) -> Void -) async throws -> T { - typealias Continuation = SafeContinuation - let wrapper = ContinuationWrapper() - let value = try await withTaskCancellationHandler { - guard let continuation = wrapper.value else { return } - wrapper.cancel(withError: CancellationError()) - handler(continuation) - } operation: { () -> T in - let value = try await withUnsafeThrowingContinuation { - (c: UnsafeContinuation) in - let continuation = SafeContinuation(continuation: c) - wrapper.value = continuation - fn(continuation) - } - return value - } - return value -} - /// A mechanism to interface between synchronous and asynchronous code, /// ignoring correctness violations. /// @@ -302,7 +259,7 @@ struct SafeContinuation: Continuable { } /// The underlying continuation used. - private let wrappedValue: UnsafeContinuation + private let wrappedValue: GlobalContinuation /// Keeps track whether continuation can be resumed, /// to make sure continuation only resumes once. private let resumable: Flag = { @@ -318,9 +275,7 @@ struct SafeContinuation: Continuable { /// don’t use it outside of this object. /// /// - Returns: The newly created safe continuation. - init( - continuation: UnsafeContinuation - ) { + init(continuation: GlobalContinuation) { self.wrappedValue = continuation } @@ -381,3 +336,22 @@ struct SafeContinuation: Continuable { resumable.off() } } + +extension SafeContinuation: ThrowingContinuable where E == Error { + /// Suspends the current task, then calls the given closure + /// with a safe throwing continuation for the current task. + /// + /// The continuation can be resumed exactly once, subsequent resumes are ignored. + /// + /// - Parameter fn: A closure that takes the throwing continuation parameter. + /// You can resume the continuation exactly once. + /// + /// - Returns: The value passed to the continuation by the closure. + /// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error. + @inlinable + static func with(_ fn: (SafeContinuation) -> Void) async throws -> T { + return try await GlobalContinuation.with { continuation in + fn(.init(continuation: continuation)) + } + } +} diff --git a/Tests/AsyncObjectsTests/TaskQueueTests.swift b/Tests/AsyncObjectsTests/TaskQueueTests.swift index 4e8d5eb7..6a5b63ca 100644 --- a/Tests/AsyncObjectsTests/TaskQueueTests.swift +++ b/Tests/AsyncObjectsTests/TaskQueueTests.swift @@ -229,10 +229,10 @@ class TaskQueueTests: XCTestCase { } class SafeContinuationTests: XCTestCase { + typealias IntContinuation = GlobalContinuation - func testSafeContinuationMultipleResumeReturningValues() async { - let value = await withUnsafeContinuation { - (continuation: UnsafeContinuation) in + func testSafeContinuationMultipleResumeReturningValues() async throws { + let value = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation(continuation: continuation) safeContinuation.resume(returning: 5) safeContinuation.resume(returning: 10) @@ -243,8 +243,7 @@ class SafeContinuationTests: XCTestCase { func testSafeContinuationMultipleResumeReturningValueThrowingError() async throws { - let value = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let value = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation(continuation: continuation) safeContinuation.resume(returning: 5) safeContinuation.resume(throwing: CancellationError()) @@ -256,10 +255,10 @@ class SafeContinuationTests: XCTestCase { async throws { do { - let _ = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let _ = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation( - continuation: continuation) + continuation: continuation + ) safeContinuation.resume(throwing: CancellationError()) safeContinuation.resume(returning: 5) } @@ -271,10 +270,10 @@ class SafeContinuationTests: XCTestCase { func testSafeContinuationMultipleResumeThrowingErrors() async throws { do { - let _ = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let _ = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation( - continuation: continuation) + continuation: continuation + ) safeContinuation.resume(throwing: CancellationError()) safeContinuation.resume(throwing: URLError(.cancelled)) } @@ -284,9 +283,9 @@ class SafeContinuationTests: XCTestCase { } } - func testSafeContinuationMultipleResultsResumeReturningValues() async { - let value = await withUnsafeContinuation { - (continuation: UnsafeContinuation) in + func testSafeContinuationMultipleResultsResumeReturningValues() async throws + { + let value = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation(continuation: continuation) safeContinuation.resume(with: .success(5)) safeContinuation.resume(with: .success(10)) @@ -297,8 +296,7 @@ class SafeContinuationTests: XCTestCase { func testSafeContinuationMultipleResultsResumeReturningValueThrowingError() async throws { - let value = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let value = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation(continuation: continuation) safeContinuation.resume(with: .success(5)) safeContinuation.resume(with: .failure(CancellationError())) @@ -310,10 +308,10 @@ class SafeContinuationTests: XCTestCase { async throws { do { - let _ = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let _ = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation( - continuation: continuation) + continuation: continuation + ) safeContinuation.resume(with: .failure(CancellationError())) safeContinuation.resume(with: .success(5)) } @@ -326,10 +324,10 @@ class SafeContinuationTests: XCTestCase { func testSafeContinuationMultipleResultsResumeThrowingErrors() async throws { do { - let _ = try await withUnsafeThrowingContinuation { - (continuation: UnsafeContinuation) in + let _ = try await IntContinuation.with { continuation in let safeContinuation = SafeContinuation( - continuation: continuation) + continuation: continuation + ) safeContinuation.resume(with: .failure(CancellationError())) safeContinuation.resume(with: .failure(URLError(.cancelled))) }