Skip to content

Commit

Permalink
feat: add async countdown event
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 13, 2022
1 parent aaa633d commit f138abc
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 59 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
"include": [
{
"os": "macos-12",
"xcode": "latest-stable",
"swift": "5.6"
},
{
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ Several synchronization primitives and task synchronization mechanisms introduce
While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:

- Easier task cancellation with ``CancellationSource``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore``, ``AsyncEvent`` and ``AsyncCountdownEvent``.
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.
- Transferring data between multiple task boundaries with ``Future``.
187 changes: 187 additions & 0 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import Foundation
import OrderedCollections

/// An event object that controls access to a resource between high and low priority tasks
/// and signals when count is within limit.
///
/// An async countdown event is an inverse of ``AsyncSemaphore``,
/// in the sense that instead of restricting access to a resource,
/// it notifies when the resource usage is idle or inefficient.
///
/// You can indicate high priority usage of resource by using ``increment(by:)`` method,
/// and indicate free of resource by calling ``signal(repeat:)`` or ``signal()`` methods.
/// For low priority resource usage or detect resource idling use ``wait()`` method
/// or its timeout variation ``wait(forNanoseconds:)``.
///
/// Use the ``limit`` parameter to indicate concurrent low priority usage, i.e. if limit set to zero,
/// only one low priority usage allowed at one time.
public actor AsyncCountdownEvent: AsyncObject {
/// The suspended tasks continuation type.
private typealias Continuation = GlobalContinuation<Void, Error>
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
private var continuations: OrderedDictionary<UUID, Continuation> = [:]
/// The lower limit for the countdown event to trigger.
///
/// By default this is set to zero and can be changed during initialization.
public let limit: UInt
/// Current count of the countdown.
///
/// If the current count becomes less or equal to limit, queued tasks
/// are resumed from suspension until current count exceeds limit.
public private(set) var currentCount: UInt
/// Initial count of the countdown when count started.
///
/// Can be changed after initialization
/// by using ``reset(to:)`` method.
public private(set) var initialCount: UInt
/// Indicates whether countdown event current count is within ``limit``.
///
/// Queued tasks are resumed from suspension when event is set and until current count exceeds limit.
public var isSet: Bool { currentCount >= 0 && currentCount <= limit }

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inline(__always)
private func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
continuations[key] = continuation
}

/// Remove continuation associated with provided key
/// from `continuations` map and resumes with `CancellationError`.
///
/// - Parameter key: The key in the map.
@inline(__always)
private func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}

/// Decrements countdown count by the provided number.
///
/// - Parameter number: The number to decrement count by.
@inline(__always)
private func decrementCount(by number: UInt = 1) {
guard currentCount > 0 else { return }
currentCount -= number
}

/// Resume previously waiting continuations for countdown event.
@inline(__always)
private func resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
self.currentCount += 1
}
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `removeContinuation`.
///
/// Spins up a new continuation and requests to track it with key by invoking `addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inline(__always)
private func withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self.addContinuation(continuation, withKey: key)
}
}
}

/// Creates new countdown event with the limit count down up to and an initial count.
/// By default, both limit and initial count are zero.
///
/// Passing zero for the limit value is useful for when one low priority access should be given
/// in absence of high priority resource usages. Passing a value greater than zero for the limit is useful
/// for managing a finite limit of access to low priority tasks, in absence of high priority resource usages.
///
/// - Parameters:
/// - limit: The value to count down up to.
/// - initial: The initial count.
///
/// - Returns: The newly created countdown event .
public init(until limit: UInt = 0, initial: UInt = 0) {
self.limit = limit
self.initialCount = initial
self.currentCount = initial
}

deinit { self.continuations.forEach { $0.value.cancel() } }

/// Increments the countdown event current count by the specified value.
///
/// Unlike the ``wait()`` method count is reflected immediately.
/// Use this to indicate usage of resource from high priority tasks.
///
/// - Parameter count: The value by which to increase ``currentCount``.
public func increment(by count: UInt = 1) {
self.currentCount += count
}

/// Resets current count to initial count.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
public func reset() {
self.currentCount = initialCount
resumeContinuations()
}

/// Resets initial count and current count to specified value.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The new initial count.
public func reset(to count: UInt) {
initialCount = count
self.currentCount = count
resumeContinuations()
}

/// Registers a signal (decrements) with the countdown event.
///
/// Decrement the countdown. If the current count becomes less or equal to limit,
/// one queued task is resumed from suspension.
public func signal() {
signal(repeat: 1)
}

/// Registers multiple signals (decrements by provided count) with the countdown event.
///
/// Decrement the countdown by the provided count. If the current count becomes less or equal to limit,
/// multiple queued tasks are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The number of signals to register.
public func signal(repeat count: UInt) {
decrementCount(by: count)
resumeContinuations()
}

/// Waits for, or increments, a countdown event.
///
/// Increment the countdown if the current count is less or equal to limit.
/// Otherwise, current task is suspended until either a signal occurs or event is reset.
///
/// Use this to wait for high priority tasks completion to start low priority ones.
@Sendable
public func wait() async {
if isSet { currentCount += 1; return }
try? await withPromisedContinuation()
}
}
4 changes: 1 addition & 3 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public actor AsyncEvent: AsyncObject {
@inline(__always)
private func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.resume(throwing: CancellationError())
continuation?.cancel()
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
Expand Down Expand Up @@ -74,15 +74,13 @@ public actor AsyncEvent: AsyncObject {
/// Resets signal of event.
///
/// After reset, tasks have to wait for event signal to complete.
@Sendable
public func reset() {
signalled = false
}

/// Signals the event.
///
/// Resumes all the tasks suspended and waiting for signal.
@Sendable
public func signal() {
continuations.forEach { $0.value.resume() }
continuations = [:]
Expand Down
11 changes: 10 additions & 1 deletion Sources/AsyncObjects/AsyncObject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public extension AsyncObject where Self: AnyObject {
///
/// - Parameter objects: The objects to wait for.
@inlinable
@Sendable
public func waitForAll(_ objects: [any AsyncObject]) async {
await withTaskGroup(of: Void.self) { group in
objects.forEach { group.addTask(operation: $0.wait) }
Expand All @@ -87,6 +88,7 @@ public func waitForAll(_ objects: [any AsyncObject]) async {
///
/// - Parameter objects: The objects to wait for.
@inlinable
@Sendable
public func waitForAll(_ objects: any AsyncObject...) async {
await waitForAll(objects)
}
Expand All @@ -103,6 +105,7 @@ public func waitForAll(_ objects: any AsyncObject...) async {
/// - duration: The duration in nano seconds to wait until.
/// - Returns: The result indicating whether wait completed or timed out.
@inlinable
@Sendable
public func waitForAll(
_ objects: [any AsyncObject],
forNanoseconds duration: UInt64
Expand All @@ -124,6 +127,7 @@ public func waitForAll(
/// - duration: The duration in nano seconds to wait until.
/// - Returns: The result indicating whether wait completed or timed out.
@inlinable
@Sendable
public func waitForAll(
_ objects: any AsyncObject...,
forNanoseconds duration: UInt64
Expand All @@ -141,6 +145,7 @@ public func waitForAll(
/// - objects: The objects to wait for.
/// - count: The number of objects to wait for.
@inlinable
@Sendable
public func waitForAny(_ objects: [any AsyncObject], count: Int = 1) async {
await withTaskGroup(of: Void.self) { group in
objects.forEach { group.addTask(operation: $0.wait) }
Expand All @@ -159,6 +164,7 @@ public func waitForAny(_ objects: [any AsyncObject], count: Int = 1) async {
/// - objects: The objects to wait for.
/// - count: The number of objects to wait for.
@inlinable
@Sendable
public func waitForAny(_ objects: any AsyncObject..., count: Int = 1) async {
await waitForAny(objects, count: count)
}
Expand All @@ -176,6 +182,7 @@ public func waitForAny(_ objects: any AsyncObject..., count: Int = 1) async {
/// - duration: The duration in nano seconds to wait until.
/// - Returns: The result indicating whether wait completed or timed out.
@inlinable
@Sendable
public func waitForAny(
_ objects: [any AsyncObject],
count: Int = 1,
Expand All @@ -199,6 +206,7 @@ public func waitForAny(
/// - duration: The duration in nano seconds to wait until.
/// - Returns: The result indicating whether wait completed or timed out.
@inlinable
@Sendable
public func waitForAny(
_ objects: any AsyncObject...,
count: Int = 1,
Expand All @@ -217,6 +225,7 @@ public func waitForAny(
/// - timeout: The duration in nano seconds to wait until.
/// - Returns: The result indicating whether task execution completed
/// or timed out.
@Sendable
public func waitForTaskCompletion(
withTimeoutInNanoseconds timeout: UInt64,
_ task: @escaping @Sendable () async -> Void
Expand All @@ -231,7 +240,7 @@ public func waitForTaskCompletion(
}
}
group.addTask {
(try? await Task.sleep(nanoseconds: timeout)) == nil
(try? await Task.sleep(nanoseconds: timeout + 1_000)) == nil
}
if let result = await group.next() {
timedOut = !result
Expand Down
3 changes: 2 additions & 1 deletion Sources/AsyncObjects/AsyncObjects.docc/AsyncObjects.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Several synchronization primitives and task synchronization mechanisms introduce
While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:

- Easier task cancellation with ``CancellationSource``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore``, ``AsyncEvent`` and ``AsyncCountdownEvent``.
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.
- Transferring data between multiple task boundaries with ``Future``.

Expand All @@ -17,6 +17,7 @@ While Swift's modern structured concurrency provides safer way of managing concu

- ``AsyncSemaphore``
- ``AsyncEvent``
- ``AsyncCountdownEvent``

### Tasks Synchronization

Expand Down
3 changes: 1 addition & 2 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public actor AsyncSemaphore: AsyncObject {
@inline(__always)
private func removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.resume(throwing: CancellationError())
continuation?.cancel()
incrementCount()
}

Expand Down Expand Up @@ -95,7 +95,6 @@ public actor AsyncSemaphore: AsyncObject {
/// Increment the counting semaphore.
/// If the previous value was less than zero,
/// current task is resumed from suspension.
@Sendable
public func signal() {
incrementCount()
guard !continuations.isEmpty else { return }
Expand Down
1 change: 0 additions & 1 deletion Sources/AsyncObjects/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public actor CancellationSource {
/// If task completes before cancellation event is triggered, it is automatically unregistered.
///
/// - Parameter task: The task to register.
@Sendable
public func register<Success, Failure>(task: Task<Success, Failure>) {
add(task: task)
Task { [weak self] in
Expand Down
Loading

0 comments on commit f138abc

Please sign in to comment.