Skip to content

Commit

Permalink
feat: add barrier and block flags for TaskQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 15, 2022
1 parent 8e07add commit d3e566a
Show file tree
Hide file tree
Showing 21 changed files with 1,313 additions and 990 deletions.
848 changes: 426 additions & 422 deletions AsyncObjects.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

30 changes: 16 additions & 14 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ import OrderedCollections
/// only one low priority usage allowed at one time.
public actor AsyncCountdownEvent: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = GlobalContinuation<Void, Error>
@usableFromInline
typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
/// The lower limit for the countdown event to trigger.
@usableFromInline
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
/// The limit up to which the countdown counts and triggers event.
///
/// By default this is set to zero and can be changed during initialization.
public let limit: UInt
/// Current count of the countdown.
///
/// If the current count becomes less or equal to limit, queued tasks
/// are resumed from suspension until current count exceeds limit.
public private(set) var currentCount: UInt
public var currentCount: UInt
/// Initial count of the countdown when count started.
///
/// Can be changed after initialization
Expand All @@ -44,8 +46,8 @@ public actor AsyncCountdownEvent: AsyncObject {
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inline(__always)
private func addContinuation(
@inlinable
func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -56,24 +58,24 @@ public actor AsyncCountdownEvent: AsyncObject {
/// from `continuations` map and resumes with `CancellationError`.
///
/// - Parameter key: The key in the map.
@inline(__always)
private func removeContinuation(withKey key: UUID) {
@inlinable
func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}

/// Decrements countdown count by the provided number.
///
/// - Parameter number: The number to decrement count by.
@inline(__always)
private func decrementCount(by number: UInt = 1) {
@inlinable
func decrementCount(by number: UInt = 1) {
guard currentCount > 0 else { return }
currentCount -= number
}

/// Resume previously waiting continuations for countdown event.
@inline(__always)
private func resumeContinuations() {
@inlinable
func resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
Expand All @@ -89,8 +91,8 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inline(__always)
private func withPromisedContinuation() async throws {
@inlinable
func withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
Expand Down
18 changes: 10 additions & 8 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import Foundation
/// Wait for event signal by calling ``wait()`` method or its timeout variation ``wait(forNanoseconds:)``.
public actor AsyncEvent: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = GlobalContinuation<Void, Error>
@usableFromInline
typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waiting for event signal.
private var continuations: [UUID: Continuation] = [:]
@usableFromInline
private(set) var continuations: [UUID: Continuation] = [:]
/// Indicates whether current state of event is signalled.
private var signalled: Bool

Expand All @@ -19,8 +21,8 @@ public actor AsyncEvent: AsyncObject {
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inline(__always)
private func addContinuation(
@inlinable
func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -31,8 +33,8 @@ public actor AsyncEvent: AsyncObject {
/// from `continuations` map and resumes with `CancellationError`.
///
/// - Parameter key: The key in the map.
@inline(__always)
private func removeContinuation(withKey key: UUID) {
@inlinable
func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}
Expand All @@ -45,8 +47,8 @@ public actor AsyncEvent: AsyncObject {
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inline(__always)
private func withPromisedContinuation() async throws {
@inlinable
func withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
Expand Down
28 changes: 16 additions & 12 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@ import OrderedCollections
/// or its timeout variation ``wait(forNanoseconds:)``.
public actor AsyncSemaphore: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = GlobalContinuation<Void, Error>
@usableFromInline
typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waiting for access to resource.
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
@usableFromInline
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
/// Pool size for concurrent resource access.
/// Has value provided during initialization incremented by one.
private var limit: UInt
@usableFromInline
private(set) var limit: UInt
/// Current count of semaphore.
/// Can have maximum value up to `limit`.
private var count: Int
@usableFromInline
private(set) var count: Int

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inline(__always)
private func addContinuation(
@inlinable
func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -38,16 +42,16 @@ public actor AsyncSemaphore: AsyncObject {
/// from `continuations` map and resumes with `CancellationError`.
///
/// - Parameter key: The key in the map.
@inline(__always)
private func removeContinuation(withKey key: UUID) {
@inlinable
func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
incrementCount()
}

/// Increments semaphore count within limit provided.
@inline(__always)
private func incrementCount() {
@inlinable
func incrementCount() {
guard count < limit else { return }
count += 1
}
Expand All @@ -60,8 +64,8 @@ public actor AsyncSemaphore: AsyncObject {
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inline(__always)
private func withPromisedContinuation() async throws {
@inlinable
func withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
Expand Down
18 changes: 10 additions & 8 deletions Sources/AsyncObjects/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,39 @@ import Foundation
/// In case of circular dependency between cancellation sources, app will go into infinite recursion.
public actor CancellationSource {
/// All the registered tasks for cooperative cancellation.
private var registeredTasks: [AnyHashable: () -> Void] = [:]
@usableFromInline
private(set) var registeredTasks: [AnyHashable: () -> Void] = [:]
/// All the linked cancellation sources that cancellation event will be propagated.
///
/// - TODO: Store weak reference for cancellation sources.
/// ```swift
/// private var linkedSources: NSHashTable<CancellationSource> = .weakObjects()
/// ```
private var linkedSources: [CancellationSource] = []
@usableFromInline
private(set) var linkedSources: [CancellationSource] = []

/// Add task to registered cooperative cancellation tasks list.
///
/// - Parameter task: The task to register.
@inline(__always)
private func add<Success, Failure>(task: Task<Success, Failure>) {
@inlinable
func add<Success, Failure>(task: Task<Success, Failure>) {
guard !task.isCancelled else { return }
registeredTasks[task] = { task.cancel() }
}

/// Remove task from registered cooperative cancellation tasks list.
///
/// - Parameter task: The task to remove.
@inline(__always)
private func remove<Success, Failure>(task: Task<Success, Failure>) {
@inlinable
func remove<Success, Failure>(task: Task<Success, Failure>) {
registeredTasks.removeValue(forKey: task)
}

/// Add cancellation source to linked cancellation sources list to propagate cancellation event.
///
/// - Parameter task: The source to link.
@inline(__always)
private func addSource(_ source: CancellationSource) {
@inlinable
func addSource(_ source: CancellationSource) {
linkedSources.append(source)
}

Expand Down
5 changes: 5 additions & 0 deletions Sources/AsyncObjects/Continuable.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// A type that allows to interface between synchronous and asynchronous code,
/// by representing task state and allowing task resuming with some value or error.
@usableFromInline
protocol Continuable: Sendable {
/// The type of value to resume the continuation with in case of success.
associatedtype Success
Expand All @@ -26,6 +27,7 @@ extension Continuable where Failure == Error {
func cancel() { self.resume(throwing: CancellationError()) }
}

@usableFromInline
protocol ThrowingContinuable: Continuable {
/// The type of error to resume the continuation with in case of failure.
associatedtype Failure = Error
Expand All @@ -44,6 +46,7 @@ protocol ThrowingContinuable: Continuable {
static func with(_ fn: (Self) -> Void) async throws -> Success
}

@usableFromInline
protocol NonThrowingContinuable: Continuable {
/// The type of error to resume the continuation with in case of failure.
associatedtype Failure = Never
Expand All @@ -64,6 +67,7 @@ protocol NonThrowingContinuable: Continuable {
#if DEBUG || ASYNCOBJECTS_USE_CHECKEDCONTINUATION
/// The continuation type used in package in `DEBUG` mode
/// or if `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag turned on.
@usableFromInline
typealias GlobalContinuation<T, E: Error> = CheckedContinuation<T, E>

extension CheckedContinuation: Continuable {}
Expand Down Expand Up @@ -111,6 +115,7 @@ extension CheckedContinuation: NonThrowingContinuable where E == Never {
#else
/// The continuation type used in package in `RELEASE` mode
///and in absence of `ASYNCOBJECTS_USE_CHECKEDCONTINUATION` flag.
@usableFromInline
typealias GlobalContinuation<T, E: Error> = UnsafeContinuation<T, E>

extension UnsafeContinuation: Continuable {}
Expand Down
18 changes: 10 additions & 8 deletions Sources/AsyncObjects/Future.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ public actor Future<Output, Failure: Error> {
/// A type that represents the result in the future, when an element or error is available.
public typealias FutureResult = Result<Output, Failure>
/// The suspended tasks continuation type.
private typealias Continuation = GlobalContinuation<Output, Failure>
@usableFromInline
typealias Continuation = GlobalContinuation<Output, Failure>
/// The continuations stored with an associated key for all the suspended task
/// that are waiting for future to be fulfilled.
private var continuations: [UUID: Continuation] = [:]
@usableFromInline
private(set) var continuations: [UUID: Continuation] = [:]
/// The underlying `Result` that indicates either future fulfilled or rejected.
///
/// If future isn't fulfilled or rejected, the value is `nil`.
Expand All @@ -33,8 +35,8 @@ public actor Future<Output, Failure: Error> {
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inline(__always)
private func addContinuation(
@inlinable
func addContinuation(
_ continuation: Continuation,
withKey key: UUID = .init()
) {
Expand Down Expand Up @@ -297,8 +299,8 @@ extension Future where Failure == Error {
/// from `continuations` map and resumes with `CancellationError`.
///
/// - Parameter key: The key in the map.
@inline(__always)
private func removeContinuation(withKey key: UUID) {
@inlinable
func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}
Expand All @@ -313,8 +315,8 @@ extension Future where Failure == Error {
/// - Returns: The value continuation is resumed with.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inline(__always)
private func withPromisedContinuation() async throws -> Output {
@inlinable
func withPromisedContinuation() async throws -> Output {
let key = UUID()
let value = try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
Expand Down
49 changes: 49 additions & 0 deletions Sources/AsyncObjects/TaskGroup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
public extension TaskGroup {
/// Adds a child task to the group and starts the task.
///
/// This method adds child task to the group and returns only after the child task is started.
///
/// - Parameters:
/// - priority: The priority of the operation task. Omit this parameter or
/// pass `nil` to set the child task’s priority to the priority of the group.
/// - operation: The operation to execute as part of the task group.
@inlinable
mutating func addTaskAndStart(
priority: TaskPriority? = nil,
operation: @escaping @Sendable () async -> ChildTaskResult
) async {
typealias C = UnsafeContinuation<Void, Never>
await withUnsafeContinuation { (continuation: C) in
self.addTask {
continuation.resume()
return await operation()
}
}
}
}

public extension ThrowingTaskGroup {
/// Adds a child task to the group and starts the task.
///
/// This method adds child task to the group and returns only after the child task is started.
/// This method doesn’t throw an error, even if the child task does. Instead,
/// the corresponding call to `ThrowingTaskGroup.next()` rethrows that error.
///
/// - Parameters:
/// - priority: The priority of the operation task. Omit this parameter or
/// pass `nil` to set the child task’s priority to the priority of the group.
/// - operation: The operation to execute as part of the task group.
@inlinable
mutating func addTaskAndStart(
priority: TaskPriority? = nil,
operation: @escaping @Sendable () async throws -> ChildTaskResult
) async {
typealias C = UnsafeContinuation<Void, Never>
await withUnsafeContinuation { (continuation: C) in
self.addTask {
continuation.resume()
return try await operation()
}
}
}
}
Loading

0 comments on commit d3e566a

Please sign in to comment.