Skip to content

Commit

Permalink
refactor: use CheckedContinuation for debug mode or for `ASYNCOBJEC…
Browse files Browse the repository at this point in the history
…TS_USE_CHECKEDCONTINUATION` flag
  • Loading branch information
soumyamahunt committed Aug 2, 2022
1 parent bbb8188 commit 3899792
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 95 deletions.
4 changes: 2 additions & 2 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Foundation
/// Wait for event signal by calling ``wait()`` method or its timeout variation ``wait(forNanoseconds:)``.
public actor AsyncEvent: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = UnsafeContinuation<Void, Error>
private typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waitig for event signal.
private var continuations: [UUID: Continuation] = [:]
/// Indicates whether current stateof event is signaled.
Expand Down Expand Up @@ -74,7 +74,7 @@ public actor AsyncEvent: AsyncObject {
public func wait() async {
guard !signaled else { return }
let key = UUID()
try? await withUnsafeThrowingContinuationCancellationHandler(
try? await withThrowingContinuationCancellationHandler(
handler: { [weak self] continuation in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
Expand Down
4 changes: 2 additions & 2 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import OrderedCollections
/// or its timeout variation ``wait(forNanoseconds:)``.
public actor AsyncSemaphore: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = UnsafeContinuation<Void, Error>
private typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waitig for access to resource.
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
/// Pool size for concurrent resource access.
Expand Down Expand Up @@ -89,7 +89,7 @@ public actor AsyncSemaphore: AsyncObject {
count -= 1
if count > 0 { return }
let key = UUID()
try? await withUnsafeThrowingContinuationCancellationHandler(
try? await withThrowingContinuationCancellationHandler(
handler: { [weak self] continuation in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
Expand Down
104 changes: 86 additions & 18 deletions Sources/AsyncObjects/ContinuationWrapper.swift
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
/// Suspends the current task, then calls the given closure with an unsafe throwing continuation for the current task.
/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked.
///
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the unsafe throwing continuation with an error
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the throwing continuation with an error
/// and the cancellation handler is always and immediately invoked after that.
/// For example, even if the operation is running code that never checks for cancellation,
/// a cancellation handler still runs and provides a chance to run some cleanup code.
///
/// - Parameters:
/// - handler: A closure that is called after cancelling continuation.
/// You must not resume the continuation in closure.
/// - fn: A closure that takes an `UnsafeContinuation` parameter.
/// - fn: A closure that takes a throwing continuation parameter.
/// You must resume the continuation exactly once.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
/// - Returns: The value passed to the continuation.
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
///
/// - Important: The continuation provided in cancellation handler is already resumed with cancellation error.
/// Trying to resume the continuation here will cause runtime error/unexpected behavior.
func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
handler: @Sendable (UnsafeContinuation<T, Error>) -> Void,
_ fn: (UnsafeContinuation<T, Error>) -> Void
) async throws -> T {
typealias Continuation = UnsafeContinuation<T, Error>
let wrapper = ContinuationWrapper<Continuation>()
func withThrowingContinuationCancellationHandler<C: ThrowingContinuable>(
handler: @Sendable (C) -> Void,
_ fn: (C) -> Void
) async throws -> C.Success where C.Failure == Error {
let wrapper = ContinuationWrapper<C>()
let value = try await withTaskCancellationHandler {
guard let continuation = wrapper.value else { return }
wrapper.cancel(withError: CancellationError())
handler(continuation)
} operation: { () -> T in
let value = try await withUnsafeThrowingContinuation {
(c: Continuation) in
wrapper.value = c
fn(c)
} operation: { () -> C.Success in
let value = try await C.with { continuation in
wrapper.value = continuation
fn(continuation)
}
return value
}
Expand Down Expand Up @@ -85,10 +83,80 @@ protocol Continuable: Sendable {
func resume(with result: Result<Success, Failure>)
}

extension UnsafeContinuation: Continuable {}

extension UnsafeContinuation where E == Error {
extension Continuable where Failure == Error {
/// Cancel continuation by resuming with cancellation error.
@inlinable
func cancel() { self.resume(throwing: CancellationError()) }
}

extension UnsafeContinuation: Continuable {}
extension CheckedContinuation: Continuable {}

protocol ThrowingContinuable: Continuable {
/// The type of error to resume the continuation with in case of failure.
associatedtype Failure = Error
/// Suspends the current task, then calls the given closure
/// with a throwing continuation for the current task.
///
/// The continuation can be resumed exactly once,
/// subsequent resumes have different behavior depending on type implemeting.
///
/// - Parameter fn: A closure that takes the throwing continuation parameter.
/// You can resume the continuation exactly once.
///
/// - Returns: The value passed to the continuation by the closure.
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
static func with(_ fn: (Self) -> Void) async throws -> Success
}

extension UnsafeContinuation: ThrowingContinuable where E == Error {
/// Suspends the current task, then calls the given closure
/// with an unsafe throwing continuation for the current task.
///
/// The continuation must be resumed exactly once, subsequent resumes will cause runtime error.
/// Use `CheckedContinuation` to capture relevant data in case of runtime errors.
///
/// - Parameter fn: A closure that takes an `UnsafeContinuation` parameter.
/// You must resume the continuation exactly once.
///
/// - Returns: The value passed to the continuation by the closure.
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
static func with(_ fn: (UnsafeContinuation<T, E>) -> Void) async throws -> T
{
return try await withUnsafeThrowingContinuation(fn)
}
}

extension CheckedContinuation: ThrowingContinuable where E == Error {
/// Suspends the current task, then calls the given closure
/// with a checked throwing continuation for the current task.
///
/// The continuation must be resumed exactly once, subsequent resumes will cause runtime error.
/// `CheckedContinuation` logs messages proving additional info on these errors.
/// Once all errors resolved, use `UnsafeContinuation` in release mode to benefit improved performance
/// at the loss of additional runtime checks.
///
/// - Parameter fn: A closure that takes a `CheckedContinuation` parameter.
/// You must resume the continuation exactly once.
///
/// - Returns: The value passed to the continuation by the closure.
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
static func with(_ body: (CheckedContinuation<T, E>) -> Void) async throws
-> T
{
return try await withCheckedThrowingContinuation(body)
}
}

#if DEBUG || ASYNCOBJECTS_USE_CHECKEDCONTINUATION
/// The continuation type used in package in `DEBUG` mode
/// or if `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag turned on.
typealias GlobalContinuation<T, E: Error> = CheckedContinuation<T, E>
#else
/// The continuation type used in package in `RELEASE` mode
///and in absence of `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag.
typealias GlobalContinuation<T, E: Error> = UnsafeContinuation<T, E>
#endif
4 changes: 2 additions & 2 deletions Sources/AsyncObjects/TaskOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,

// MARK: AsyncObject Impl
/// The suspended tasks continuation type.
private typealias Continuation = UnsafeContinuation<Void, Error>
private typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waitig for opearation completion.
private var continuations: [UUID: Continuation] = [:]

Expand Down Expand Up @@ -205,7 +205,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
public func wait() async {
guard !isFinished else { return }
let key = UUID()
try? await withUnsafeThrowingContinuationCancellationHandler(
try? await withThrowingContinuationCancellationHandler(
handler: { [weak self] (continuation: Continuation) in
Task { [weak self] in
self?.removeContinuation(withKey: key)
Expand Down
72 changes: 23 additions & 49 deletions Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public actor TaskQueue: AsyncObject {

let key = UUID()
do {
try await withOnceResumableThrowingContinuationCancellationHandler(
try await withThrowingContinuationCancellationHandler(
handler: { [weak self] continuation in
Task { [weak self] in
await self?.dequeueContinuation(withKey: key)
Expand Down Expand Up @@ -205,7 +205,7 @@ public actor TaskQueue: AsyncObject {
guard barriered || !queue.isEmpty else { return await runTask() }
let key = UUID()
do {
try await withOnceResumableThrowingContinuationCancellationHandler(
try await withThrowingContinuationCancellationHandler(
handler: { [weak self] continuation in
Task { [weak self] in
await self?.dequeueContinuation(withKey: key)
Expand Down Expand Up @@ -237,49 +237,6 @@ public actor TaskQueue: AsyncObject {
}
}

/// Suspends the current task, then calls the given closure with a safe throwing continuation for the current task.
/// Continuation is cancelled with error if current task is cancelled and cancellation handler is immediately invoked.
///
/// This operation cooperatively checks for cancellation and reacting to it by cancelling the safe throwing continuation with an error
/// and the cancellation handler is always and immediately invoked after that.
/// For example, even if the operation is running code that never checks for cancellation,
/// a cancellation handler still runs and provides a chance to run some cleanup code.
///
/// - Parameters:
/// - handler: A closure that is called after cancelling continuation.
/// Resuming the continuation in closure will not have any effect.
/// - fn: A closure that takes an `SafeContinuation` parameter.
/// Continuation can be resumed exactly once and subsequent resuming are ignored and has no effect..
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
/// - Returns: The value passed to the continuation.
///
/// - Note: The continuation provided in cancellation handler is already resumed with cancellation error.
/// Trying to resume the continuation here will have no effect.
private func withOnceResumableThrowingContinuationCancellationHandler<
T: Sendable
>(
handler: @Sendable (SafeContinuation<T, Error>) -> Void,
_ fn: (SafeContinuation<T, Error>) -> Void
) async throws -> T {
typealias Continuation = SafeContinuation<T, Error>
let wrapper = ContinuationWrapper<Continuation>()
let value = try await withTaskCancellationHandler {
guard let continuation = wrapper.value else { return }
wrapper.cancel(withError: CancellationError())
handler(continuation)
} operation: { () -> T in
let value = try await withUnsafeThrowingContinuation {
(c: UnsafeContinuation<T, Error>) in
let continuation = SafeContinuation(continuation: c)
wrapper.value = continuation
fn(continuation)
}
return value
}
return value
}

/// A mechanism to interface between synchronous and asynchronous code,
/// ignoring correctness violations.
///
Expand All @@ -302,7 +259,7 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
}

/// The underlying continuation used.
private let wrappedValue: UnsafeContinuation<T, E>
private let wrappedValue: GlobalContinuation<T, E>
/// Keeps track whether continuation can be resumed,
/// to make sure continuation only resumes once.
private let resumable: Flag = {
Expand All @@ -318,9 +275,7 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
/// don’t use it outside of this object.
///
/// - Returns: The newly created safe continuation.
init(
continuation: UnsafeContinuation<T, E>
) {
init(continuation: GlobalContinuation<T, E>) {
self.wrappedValue = continuation
}

Expand Down Expand Up @@ -381,3 +336,22 @@ struct SafeContinuation<T: Sendable, E: Error>: Continuable {
resumable.off()
}
}

extension SafeContinuation: ThrowingContinuable where E == Error {
/// Suspends the current task, then calls the given closure
/// with a safe throwing continuation for the current task.
///
/// The continuation can be resumed exactly once, subsequent resumes are ignored.
///
/// - Parameter fn: A closure that takes the throwing continuation parameter.
/// You can resume the continuation exactly once.
///
/// - Returns: The value passed to the continuation by the closure.
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
static func with(_ fn: (SafeContinuation<T, E>) -> Void) async throws -> T {
return try await GlobalContinuation<T, E>.with { continuation in
fn(.init(continuation: continuation))
}
}
}
Loading

0 comments on commit 3899792

Please sign in to comment.