Skip to content

Commit

Permalink
feat: add task queue to run concurrent tasks and barrier tasks simila…
Browse files Browse the repository at this point in the history
…r to DispatchQueue
  • Loading branch information
soumyamahunt committed Aug 1, 2022
1 parent c752519 commit 84e4d29
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 33 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
[![Swift](https://img.shields.io/badge/Swift-5.6+-orange)](https://img.shields.io/badge/Swift-5-DE5D43)
[![Platforms](https://img.shields.io/badge/Platforms-all-sucess)](https://img.shields.io/badge/Platforms-all-sucess)
[![CI/CD](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/main.yml)
[![CodeQL](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml/badge.svg?event=schedule)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml)
[![Maintainability](https://api.codeclimate.com/v1/badges/37183c809818826c1bcf/maintainability)](https://codeclimate.com/github/SwiftyLab/AsyncObjects/maintainability)
[![codecov](https://codecov.io/gh/SwiftyLab/AsyncObjects/branch/main/graph/badge.svg?token=jKxMv5oFeA)](https://codecov.io/gh/SwiftyLab/AsyncObjects)
<!-- [![CocoaPods Compatible](https://img.shields.io/cocoapods/v/AsyncObjects.svg?label=CocoaPods&color=C90005)](https://badge.fury.io/co/AsyncObjects) -->
<!-- [![CodeQL](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml/badge.svg?event=schedule)](https://github.com/SwiftyLab/AsyncObjects/actions/workflows/codeql-analysis.yml) -->

Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
Several synchronization primitives and task synchronization mechanisms introduced to aid in modern swift concurrency.

## Overview

While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:

- Easier task cancellation with ``CancellationSource``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.
4 changes: 2 additions & 2 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public actor AsyncEvent: AsyncObject {
guard !signaled else { return }
let key = UUID()
try? await withUnsafeThrowingContinuationCancellationHandler(
handler: { [weak self] (continuation: Continuation) in
handler: { [weak self] continuation in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
}
},
{ [weak self] (continuation: Continuation) in
{ [weak self] continuation in
Task { [weak self] in
await self?.addContinuation(continuation, withKey: key)
}
Expand Down
11 changes: 8 additions & 3 deletions Sources/AsyncObjects/AsyncObjects.docc/AsyncObjects.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# ``AsyncObjects``

Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
Several synchronization primitives and task synchronization mechanisms introduced to aid in modern swift concurrency.

## Overview

Several synchronization primitives introduced to aid in modern swift concurrency. The primitives are very similar to those used in other operating systems including mutexes, condition variables, shared/exclusive locks, and semaphores.
While Swift's modern structured concurrency provides safer way of managing concurrency, it lacks many synchronization and task management features in its current state. **AsyncObjects** aims to close the functionality gap by providing following features:

- Easier task cancellation with ``CancellationSource``.
- Introducing traditional synchronization primitives that work in non-blocking way with ``AsyncSemaphore`` and ``AsyncEvent``.
- Bridging with Grand Central Dispatch and allowing usage of GCD specific patterns with ``TaskOperation`` and ``TaskQueue``.

## Topics

Expand All @@ -13,7 +17,8 @@ Several synchronization primitives introduced to aid in modern swift concurrency
- ``AsyncSemaphore``
- ``AsyncEvent``

### Tasks Control
### Tasks Synchronization

- ``CancellationSource``
- ``TaskOperation``
- ``TaskQueue``
4 changes: 2 additions & 2 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ public actor AsyncSemaphore: AsyncObject {
if count > 0 { return }
let key = UUID()
try? await withUnsafeThrowingContinuationCancellationHandler(
handler: { [weak self] (continuation: Continuation) in
handler: { [weak self] continuation in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
}
},
{ [weak self] (continuation: Continuation) in
{ [weak self] continuation in
Task { [weak self] in
await self?.addContinuation(continuation, withKey: key)
}
Expand Down
20 changes: 20 additions & 0 deletions Sources/AsyncObjects/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public actor CancellationSource {
}

/// Creates a new cancellation source object.
///
/// - Returns: The newly created cancellation source.
public init() { }

/// Creates a new cancellation source object linking to all the provided cancellation sources.
Expand All @@ -57,6 +59,8 @@ public actor CancellationSource {
/// will ensure newly created cancellation source recieve cancellation event.
///
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
///
/// - Returns: The newly created cancellation source.
public init(linkedWith sources: [CancellationSource]) async {
await withTaskGroup(of: Void.self) { group in
sources.forEach { source in
Expand Down Expand Up @@ -141,6 +145,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -165,6 +171,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -189,6 +197,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand All @@ -213,6 +223,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand All @@ -235,6 +247,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -254,6 +268,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -273,6 +289,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand All @@ -293,6 +311,8 @@ public extension Task {
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand Down
69 changes: 47 additions & 22 deletions Sources/AsyncObjects/ContinuationWrapper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
/// You must not resume the continuation in closure.
/// - fn: A closure that takes an `UnsafeContinuation` parameter.
/// You must resume the continuation exactly once.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
/// - Returns: The value passed to the continuation.
///
/// - Important: The continuation provided in cancellation handler is already resumed with cancellation error.
/// Trying to resume the continuation here will cause runtime error/unexpected behavior.
func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
handler: @Sendable (UnsafeContinuation<T, Error>) -> Void,
_ fn: (UnsafeContinuation<T, Error>) -> Void
) async throws -> T {
typealias Continuation = UnsafeContinuation<T, Error>
let wrapper = Continuation.Wrapper()
let wrapper = ContinuationWrapper<Continuation>()
let value = try await withTaskCancellationHandler {
guard let continuation = wrapper.value else { return }
wrapper.cancel(withError: CancellationError())
Expand All @@ -36,32 +38,55 @@ func withUnsafeThrowingContinuationCancellationHandler<T: Sendable>(
return value
}

extension UnsafeContinuation {
/// Wrapper type used to store `continuation` and
/// provide cancellation mechanism.
class Wrapper {
/// The underlying continuation referenced.
var value: UnsafeContinuation?
/// Wrapper type used to store `continuation` and
/// provide cancellation mechanism.
final class ContinuationWrapper<Wrapped: Continuable> {
/// The underlying continuation referenced.
var value: Wrapped?

/// Creates a new instance with a continuation reference passed.
/// By default no continuation is stored.
///
/// - Parameter value: A continuation reference to store.
/// - Returns: The newly created continuation wrapper.
init(value: UnsafeContinuation? = nil) {
self.value = value
}
/// Creates a new instance with a continuation reference passed.
/// By default no continuation is stored.
///
/// - Parameter value: A continuation reference to store.
///
/// - Returns: The newly created continuation wrapper.
init(value: Wrapped? = nil) {
self.value = value
}

/// Resume continuation with passed error,
/// without checking if continuation already resumed.
///
/// - Parameter error: Error passed to continuation.
func cancel(withError error: E) {
value?.resume(throwing: error)
}
/// Resume continuation with passed error,
/// without checking if continuation already resumed.
///
/// - Parameter error: Error passed to continuation.
func cancel(withError error: Wrapped.Failure) {
value?.resume(throwing: error)
}
}

/// A type that allows to interface between synchronous and asynchronous code,
/// by representing task state and allowing task resuming with some value or error.
protocol Continuable: Sendable {
/// The type of value to resume the continuation with in case of success.
associatedtype Success
/// The type of error to resume the continuation with in case of failure.
associatedtype Failure: Error
/// Resume the task awaiting the continuation by having it return normally from its suspension point.
///
/// - Parameter value: The value to return from the continuation.
func resume(returning value: Success)
/// Resume the task awaiting the continuation by having it throw an error from its suspension point.
///
/// - Parameter error: The error to throw from the continuation.
func resume(throwing error: Failure)
/// Resume the task awaiting the continuation by having it either return normally
/// or throw an error based on the state of the given `Result` value.
///
/// - Parameter result: A value to either return or throw from the continuation.
func resume(with result: Result<Success, Failure>)
}

extension UnsafeContinuation: Continuable {}

extension UnsafeContinuation where E == Error {
/// Cancel continuation by resuming with cancellation error.
@inlinable
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncObjects/TaskOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
@unchecked Sendable
{
/// The dispatch queue used to synchronize data access and modifications.
private weak var propQueue: DispatchQueue!
private unowned let propQueue: DispatchQueue
/// The asynchronous action to perform as part of the operation..
private let underlyingAction: @Sendable () async throws -> R
/// The top-level task that executes asynchronous action provided
Expand Down
Loading

0 comments on commit 84e4d29

Please sign in to comment.