From e3dcfeb8487b3d85fd7d68c7a56a382082125919 Mon Sep 17 00:00:00 2001 From: Soumya Ranjan Mahunt Date: Wed, 31 Aug 2022 16:21:10 +0530 Subject: [PATCH] feat(`TaskOperation`): allow executing as detached task (#7) --- AsyncObjects.xcodeproj/project.pbxproj | 16 +-- .../AsyncObjects/AsyncCountdownEvent.swift | 15 +- Sources/AsyncObjects/TaskOperation.swift | 128 ++++++++++++++---- Sources/AsyncObjects/TaskQueue.swift | 42 +++--- .../StandardLibraryTests.swift | 1 - .../TaskOperationTests.swift | 2 +- Tests/AsyncObjectsTests/XCTestCase.swift | 56 ++++---- 7 files changed, 171 insertions(+), 89 deletions(-) diff --git a/AsyncObjects.xcodeproj/project.pbxproj b/AsyncObjects.xcodeproj/project.pbxproj index 88f3788b..cdceab42 100644 --- a/AsyncObjects.xcodeproj/project.pbxproj +++ b/AsyncObjects.xcodeproj/project.pbxproj @@ -21,7 +21,7 @@ /* End PBXAggregateTarget section */ /* Begin PBXBuildFile section */ - 64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */; }; + 15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */; }; OBJ_106 /* AsyncCountdownEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AsyncCountdownEvent.swift */; }; OBJ_107 /* AsyncEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_12 /* AsyncEvent.swift */; }; OBJ_108 /* AsyncObject.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_13 /* AsyncObject.swift */; }; @@ -30,7 +30,7 @@ OBJ_111 /* Continuable.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_16 /* Continuable.swift */; }; OBJ_112 /* Future.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_17 /* Future.swift */; }; OBJ_113 /* Locker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_18 /* Locker.swift */; }; - OBJ_114 /* TaskGroup.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* TaskGroup.swift */; }; + OBJ_114 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* Task.swift */; }; OBJ_115 /* TaskOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_20 /* TaskOperation.swift */; }; OBJ_116 /* TaskQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_21 /* TaskQueue.swift */; }; OBJ_117 /* TaskTracker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_22 /* TaskTracker.swift */; }; @@ -104,7 +104,7 @@ /* End PBXBuildFile section */ /* Begin PBXFileReference section */ - 2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = ""; }; + 92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = ""; }; OBJ_11 /* AsyncCountdownEvent.swift */ = {isa = PBXFileReference; path = AsyncCountdownEvent.swift; sourceTree = ""; }; OBJ_12 /* AsyncEvent.swift */ = {isa = PBXFileReference; path = AsyncEvent.swift; sourceTree = ""; }; OBJ_13 /* AsyncObject.swift */ = {isa = PBXFileReference; path = AsyncObject.swift; sourceTree = ""; }; @@ -113,7 +113,7 @@ OBJ_16 /* Continuable.swift */ = {isa = PBXFileReference; path = Continuable.swift; sourceTree = ""; }; OBJ_17 /* Future.swift */ = {isa = PBXFileReference; path = Future.swift; sourceTree = ""; }; OBJ_18 /* Locker.swift */ = {isa = PBXFileReference; path = Locker.swift; sourceTree = ""; }; - OBJ_19 /* TaskGroup.swift */ = {isa = PBXFileReference; path = TaskGroup.swift; sourceTree = ""; }; + OBJ_19 /* Task.swift */ = {isa = PBXFileReference; path = Task.swift; sourceTree = ""; }; OBJ_20 /* TaskOperation.swift */ = {isa = PBXFileReference; path = TaskOperation.swift; sourceTree = ""; }; OBJ_21 /* TaskQueue.swift */ = {isa = PBXFileReference; path = TaskQueue.swift; sourceTree = ""; }; OBJ_22 /* TaskTracker.swift */ = {isa = PBXFileReference; path = TaskTracker.swift; sourceTree = ""; }; @@ -220,11 +220,11 @@ OBJ_16 /* Continuable.swift */, OBJ_17 /* Future.swift */, OBJ_18 /* Locker.swift */, - OBJ_19 /* TaskGroup.swift */, + OBJ_19 /* Task.swift */, OBJ_20 /* TaskOperation.swift */, OBJ_21 /* TaskQueue.swift */, OBJ_22 /* TaskTracker.swift */, - 2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */, + 92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */, ); name = AsyncObjects; path = Sources/AsyncObjects; @@ -556,11 +556,11 @@ OBJ_111 /* Continuable.swift in Sources */, OBJ_112 /* Future.swift in Sources */, OBJ_113 /* Locker.swift in Sources */, - OBJ_114 /* TaskGroup.swift in Sources */, + OBJ_114 /* Task.swift in Sources */, OBJ_115 /* TaskOperation.swift in Sources */, OBJ_116 /* TaskQueue.swift in Sources */, OBJ_117 /* TaskTracker.swift in Sources */, - 64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */, + 15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */, ); }; OBJ_126 /* Sources */ = { diff --git a/Sources/AsyncObjects/AsyncCountdownEvent.swift b/Sources/AsyncObjects/AsyncCountdownEvent.swift index 2c799655..cf0294bf 100644 --- a/Sources/AsyncObjects/AsyncCountdownEvent.swift +++ b/Sources/AsyncObjects/AsyncCountdownEvent.swift @@ -47,6 +47,15 @@ public actor AsyncCountdownEvent: AsyncObject { // MARK: Internal + /// Resume provided continuation with additional changes based on the associated flags. + /// + /// - Parameter continuation: The queued continuation to resume. + @inlinable + func _resumeContinuation(_ continuation: Continuation) { + currentCount += 1 + continuation.resume() + } + /// Add continuation with the provided key in `continuations` map. /// /// - Parameters: @@ -58,8 +67,7 @@ public actor AsyncCountdownEvent: AsyncObject { withKey key: UUID ) { guard !isSet, continuations.isEmpty else { - currentCount += 1 - continuation.resume() + _resumeContinuation(continuation) return } continuations[key] = continuation @@ -90,8 +98,7 @@ public actor AsyncCountdownEvent: AsyncObject { func _resumeContinuations() { while !continuations.isEmpty && isSet { let (_, continuation) = continuations.removeFirst() - continuation.resume() - self.currentCount += 1 + _resumeContinuation(continuation) } } diff --git a/Sources/AsyncObjects/TaskOperation.swift b/Sources/AsyncObjects/TaskOperation.swift index 2d24c40d..0df6b048 100644 --- a/Sources/AsyncObjects/TaskOperation.swift +++ b/Sources/AsyncObjects/TaskOperation.swift @@ -18,8 +18,6 @@ import Dispatch public final class TaskOperation: Operation, AsyncObject, @unchecked Sendable { - /// The type used to track completion of provided operation and unstructured tasks created in it. - private typealias Tracker = TaskTracker /// The asynchronous action to perform as part of the operation.. private let underlyingAction: @Sendable () async throws -> R /// The top-level task that executes asynchronous action provided @@ -29,18 +27,23 @@ public final class TaskOperation: Operation, AsyncObject, /// synchronize data access and modifications. @usableFromInline let locker: Locker + + /// A type representing a set of behaviors for the executed + /// task type and task completion behavior. + /// + /// ``TaskOperation`` determines the execution behavior of + /// provided action as task based on the provided flags. + public typealias Flags = TaskOperationFlags /// The priority of top-level task executed. /// /// In case of `nil` priority from `Task.currentPriority` /// of task that starts the operation used. public let priority: TaskPriority? - /// If completion of unstructured tasks created as part of provided task - /// should be tracked. + /// A set of behaviors for the executed task type and task completion behavior. /// - /// If true, operation only completes if the provided asynchronous action - /// and all of its created unstructured task completes. - /// Otherwise, operation completes if the provided action itself completes. - public let shouldTrackUnstructuredTasks: Bool + /// Provided flags determine the execution behavior of + /// the action as task. + public let flags: Flags /// A Boolean value indicating whether the operation executes its task asynchronously. /// @@ -119,14 +122,14 @@ public final class TaskOperation: Operation, AsyncObject, /// /// - Returns: The newly created asynchronous operation. public init( - trackUnstructuredTasks shouldTrackUnstructuredTasks: Bool = false, synchronizedWith locker: Locker = .init(), priority: TaskPriority? = nil, + flags: Flags = [], operation: @escaping @Sendable () async throws -> R ) { - self.shouldTrackUnstructuredTasks = shouldTrackUnstructuredTasks self.locker = locker self.priority = priority + self.flags = flags self.underlyingAction = operation super.init() } @@ -154,22 +157,12 @@ public final class TaskOperation: Operation, AsyncObject, /// as part of a new top-level task on behalf of the current actor. public override func main() { guard isExecuting, execTask == nil else { return } - execTask = Task(priority: priority) { [weak self] in - guard - let action = self?.underlyingAction, - let trackUnstructuredTasks = self?.shouldTrackUnstructuredTasks - else { throw CancellationError() } - let final = { @Sendable[weak self] in self?._finish(); return } - return trackUnstructuredTasks - ? try await Tracker.$current.withValue( - .init(onComplete: final), - operation: action - ) - : try await { - defer { final() } - return try await action() - }() - } + let final = { @Sendable[weak self] in self?._finish(); return } + execTask = flags.createTask( + priority: priority, + operation: underlyingAction, + onComplete: final + ) } /// Advises the operation object that it should stop executing its task. @@ -275,3 +268,86 @@ public final class TaskOperation: Operation, AsyncObject, /// if the operation hasn't been started yet with either /// ``TaskOperation/start()`` or ``TaskOperation/signal()``. public struct EarlyInvokeError: Error, Sendable {} + +/// A set of behaviors for ``TaskOperation``s, +/// such as the task type and task completion behavior. +/// +/// ``TaskOperation`` determines the execution behavior of +/// provided action as task based on the provided flags. +public struct TaskOperationFlags: OptionSet, Sendable { + /// Indicates to ``TaskOperation``, completion of unstructured tasks + /// created as part of provided operation should be tracked. + /// + /// If provided, GCD operation only completes if the provided asynchronous action + /// and all of its created unstructured task completes. + /// Otherwise, operation completes if the provided action itself completes. + public static let trackUnstructuredTasks = Self.init(rawValue: 1 << 0) + /// Indicates to ``TaskOperation`` to disassociate action from the current execution context + /// by running as a new detached task. + /// + /// Provided action is executed asynchronously as part of a new top-level task, + /// with the provided task priority and without inheriting actor context that started + /// the GCD operation. + public static let detached = Self.init(rawValue: 1 << 1) + + /// The type used to track completion of provided operation and unstructured tasks created in it. + private typealias Tracker = TaskTracker + + /// Runs the given throwing operation asynchronously as part of a new top-level task + /// based on the current flags indicating whether to on behalf of the current actor + /// and whether to track unstructured tasks created in provided operation. + /// + /// - Parameters: + /// - priority: The priority of the task that operation executes. + /// Pass `nil` to use the priority from `Task.currentPriority` + /// of task that starts the operation. + /// - operation: The asynchronous operation to execute. + /// - completion: The action to invoke when task completes. + /// + /// - Returns: A reference to the task. + fileprivate func createTask( + priority: TaskPriority? = nil, + operation: @escaping @Sendable () async throws -> R, + onComplete completion: @escaping @Sendable () -> Void + ) -> Task { + typealias LocalTask = Task + typealias ThrowingAction = @Sendable () async throws -> R + typealias TaskInitializer = (TaskPriority?, ThrowingAction) -> LocalTask + + let initializer = + self.contains(.detached) + ? LocalTask.detached + : LocalTask.init + return initializer(priority) { + return self.contains(.trackUnstructuredTasks) + ? try await Tracker.$current.withValue( + .init(onComplete: completion), + operation: operation + ) + : try await { + defer { completion() } + return try await operation() + }() + } + } + + /// The corresponding value of the raw type. + /// + /// A new instance initialized with rawValue will be equivalent to this instance. + /// For example: + /// ```swift + /// print(TaskOperationFlags(rawValue: 1 << 1) == TaskOperationFlags.detached) + /// // Prints "true" + /// ``` + public let rawValue: UInt8 + /// Creates a new flag from the given raw value. + /// + /// - Parameter rawValue: The raw value of the flag set to create. + /// - Returns: The newly created flag set. + /// + /// - Note: Do not use this method to create flag, + /// use the default flags provided instead. + public init(rawValue: UInt8) { + self.rawValue = rawValue + } +} diff --git a/Sources/AsyncObjects/TaskQueue.swift b/Sources/AsyncObjects/TaskQueue.swift index 8d3f9587..6d1230fa 100644 --- a/Sources/AsyncObjects/TaskQueue.swift +++ b/Sources/AsyncObjects/TaskQueue.swift @@ -162,24 +162,37 @@ public actor TaskQueue: AsyncObject { || flags.wait(forCurrent: currentRunning) } + /// Resume provided continuation with additional changes based on the associated flags. + /// + /// - Parameter continuation: The queued continuation to resume. + /// - Returns: Whether queue is free to proceed scheduling other tasks. + @inlinable + @discardableResult + func _resumeQueuedContinuation( + _ continuation: QueuedContinuation + ) -> Bool { + currentRunning += 1 + continuation.value.resume() + guard continuation.flags.isBlockEnabled else { return true } + blocked = true + return false + } + /// Add continuation with the provided key and associated flags to queue. /// /// - Parameters: - /// - flags: The flags associated with continuation operation. /// - key: The key in the continuation queue. - /// - continuation: The continuation to add to queue. + /// - continuation: The continuation and flags to add to queue. @inlinable func _queueContinuation( - withFlags flags: Flags = [], atKey key: UUID = .init(), - _ continuation: Continuation + _ continuation: QueuedContinuation ) { - guard _wait(whenFlags: flags) else { - currentRunning += 1 - continuation.resume() + guard _wait(whenFlags: continuation.flags) else { + _resumeQueuedContinuation(continuation) return } - queue[key] = (value: continuation, flags: flags) + queue[key] = continuation } /// Remove continuation associated with provided key from queue. @@ -218,16 +231,12 @@ public actor TaskQueue: AsyncObject { /// and operation flags preconditions satisfied. @inlinable func _resumeQueuedTasks() { - while let (_, (continuation, flags)) = queue.elements.first, + while let (_, continuation) = queue.elements.first, !blocked, - !flags.wait(forCurrent: currentRunning) + !continuation.flags.wait(forCurrent: currentRunning) { queue.removeFirst() - currentRunning += 1 - continuation.resume() - guard flags.isBlockEnabled else { continue } - blocked = true - break + guard _resumeQueuedContinuation(continuation) else { break } } } @@ -252,9 +261,8 @@ public actor TaskQueue: AsyncObject { try await Continuation.with { continuation in Task { [weak self] in await self?._queueContinuation( - withFlags: flags, atKey: key, - continuation + (value: continuation, flags: flags) ) } } diff --git a/Tests/AsyncObjectsTests/StandardLibraryTests.swift b/Tests/AsyncObjectsTests/StandardLibraryTests.swift index ed19ebf6..ac657f23 100644 --- a/Tests/AsyncObjectsTests/StandardLibraryTests.swift +++ b/Tests/AsyncObjectsTests/StandardLibraryTests.swift @@ -1,7 +1,6 @@ import XCTest /// Tests inner workings of structured concurrency -@MainActor class StandardLibraryTests: XCTestCase { func testTaskValueFetchingCancelation() async throws { diff --git a/Tests/AsyncObjectsTests/TaskOperationTests.swift b/Tests/AsyncObjectsTests/TaskOperationTests.swift index e5d3ab70..76b02b63 100644 --- a/Tests/AsyncObjectsTests/TaskOperationTests.swift +++ b/Tests/AsyncObjectsTests/TaskOperationTests.swift @@ -184,7 +184,7 @@ class TaskOperationTests: XCTestCase { func createOperationWithChildTasks( track: Bool = false ) -> TaskOperation { - return TaskOperation(trackUnstructuredTasks: track) { + return TaskOperation(flags: track ? .trackUnstructuredTasks : []) { Task { try await Self.sleep(seconds: 1) } diff --git a/Tests/AsyncObjectsTests/XCTestCase.swift b/Tests/AsyncObjectsTests/XCTestCase.swift index 67ae3c7d..467e004f 100644 --- a/Tests/AsyncObjectsTests/XCTestCase.swift +++ b/Tests/AsyncObjectsTests/XCTestCase.swift @@ -4,6 +4,26 @@ import Dispatch @MainActor extension XCTestCase { + private static var activitySupported = ProcessInfo.processInfo.environment + .keys.contains("__XCODE_BUILT_PRODUCTS_DIR_PATHS") + + private static func runAssertions( + with name: String?, + _ assertions: () -> Void + ) { + #if canImport(Darwin) + if let name = name, activitySupported { + XCTContext.runActivity(named: name) { activity in + assertions() + } + } else { + assertions() + } + #else + assertions() + #endif + } + static func checkExecInterval( name: String? = nil, durationInSeconds seconds: Int = 0, @@ -22,14 +42,7 @@ extension XCTestCase { ) ) } - - if let name = name { - XCTContext.runActivity(named: name) { activity in - assertions() - } - } else { - assertions() - } + runAssertions(with: name, assertions) } static func checkExecInterval( @@ -51,14 +64,7 @@ extension XCTestCase { (duration / 1E9).rounded() / order ) } - - if let name = name { - XCTContext.runActivity(named: name) { activity in - assertions() - } - } else { - assertions() - } + runAssertions(with: name, assertions) } static func checkExecInterval( @@ -79,14 +85,7 @@ extension XCTestCase { "\(duration) not present in \(range)" ) } - - if let name = name { - XCTContext.runActivity(named: name) { activity in - assertions() - } - } else { - assertions() - } + runAssertions(with: name, assertions) } static func checkExecInterval( @@ -106,14 +105,7 @@ extension XCTestCase { "\(duration) not present in \(range)" ) } - - if let name = name { - XCTContext.runActivity(named: name) { activity in - assertions() - } - } else { - assertions() - } + runAssertions(with: name, assertions) } static func sleep(seconds: UInt64) async throws {