diff --git a/Sources/AsyncObjects/CancellationSource/Cancellable.swift b/Sources/AsyncObjects/CancellationSource/Cancellable.swift index 38231584..f93d3781 100644 --- a/Sources/AsyncObjects/CancellationSource/Cancellable.swift +++ b/Sources/AsyncObjects/CancellationSource/Cancellable.swift @@ -1,3 +1,5 @@ +import Foundation + /// A type representing a unit of work or task that supports cancellation. /// /// Cancellation should be initiated on invoking ``cancel(file:function:line:)`` @@ -92,3 +94,39 @@ extension Task: Cancellable { let _ = try await self.value } } + +/// Waits asynchronously for the work or task to complete +/// handling cooperative cancellation initiation. +/// +/// - Parameters: +/// - work: The work for which completion to wait and handle cooperative cancellation. +/// - id: The identifier associated with work. +/// - file: The file wait request originates from (there's usually no need to pass it +/// explicitly as it defaults to `#fileID`). +/// - function: The function wait request originates from (there's usually no need to +/// pass it explicitly as it defaults to `#function`). +/// - line: The line wait request originates from (there's usually no need to pass it +/// explicitly as it defaults to `#line`). +/// +/// - Throws: If waiting for the work completes with an error. +@inlinable +func waitHandlingCancelation( + for work: Cancellable, + associatedId id: UUID, + file: String = #fileID, + function: String = #function, + line: UInt = #line +) async throws { + try await withTaskCancellationHandler { + defer { + log("Finished", id: id, file: file, function: function, line: line) + } + try await work.wait(file: file, function: function, line: line) + } onCancel: { + work.cancel(file: file, function: function, line: line) + log( + "Cancellation initiated", id: id, + file: file, function: function, line: line + ) + } +} diff --git a/Sources/AsyncObjects/CancellationSource/CancellationSource.swift b/Sources/AsyncObjects/CancellationSource/CancellationSource.swift index d77ac5a5..b5382961 100644 --- a/Sources/AsyncObjects/CancellationSource/CancellationSource.swift +++ b/Sources/AsyncObjects/CancellationSource/CancellationSource.swift @@ -33,12 +33,10 @@ import Foundation /// tasks in that case. public struct CancellationSource: AsyncObject, Cancellable, Loggable { /// The continuation type controlling task group lifetime. - @usableFromInline internal typealias Continuation = GlobalContinuation /// The cancellable work with invocation context. - @usableFromInline internal typealias WorkItem = ( - Cancellable, file: String, function: String, line: UInt + Cancellable, id: UUID, file: String, function: String, line: UInt ) /// The lifetime task that is cancelled when @@ -47,7 +45,6 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable { var lifetime: Task! /// The stream continuation used to register work items /// for cooperative cancellation. - @usableFromInline var pipe: AsyncStream.Continuation! /// A Boolean value that indicates whether cancellation is already @@ -57,6 +54,7 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable { /// There is no way to uncancel on this source. Create a new /// `CancellationSource` to manage cancellation of newly spawned /// tasks in that case. + @inlinable public var isCancelled: Bool { lifetime.isCancelled } /// Creates a new cancellation source object. @@ -64,23 +62,16 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable { /// - Returns: The newly created cancellation source. public init() { let stream = AsyncStream { self.pipe = $0 } - self.lifetime = Task { + self.lifetime = Task.detached { try await withThrowingTaskGroup(of: Void.self) { group in for await item in stream { group.addTask { - try? await withTaskCancellationHandler { - try await item.0.wait( - file: item.file, - function: item.function, - line: item.line - ) - } onCancel: { - item.0.cancel( - file: item.file, - function: item.function, - line: item.line - ) - } + try? await waitHandlingCancelation( + for: item.0, associatedId: item.id, + file: item.file, + function: item.function, + line: item.line + ) } } @@ -110,18 +101,19 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable { function: String = #function, line: UInt = #line ) { - let result = pipe.yield((task, file, function, line)) + let id = UUID() + let result = pipe.yield((task, id, file, function, line)) switch result { case .enqueued: log( - "Registered \(task)", + "Registered \(task)", id: id, file: file, function: function, line: line ) case .dropped, .terminated: fallthrough @unknown default: task.cancel(file: file, function: function, line: line) log( - "Cancelled \(task) due to result: \(result)", + "Cancelled \(task) due to result: \(result)", id: id, file: file, function: function, line: line ) } @@ -172,10 +164,10 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable { function: String = #function, line: UInt = #line ) async { - let key = UUID() - log("Waiting", id: key, file: file, function: function, line: line) + let id = UUID() + log("Waiting", id: id, file: file, function: function, line: line) let _ = await lifetime.result - log("Completed", id: key, file: file, function: function, line: line) + log("Completed", id: id, file: file, function: function, line: line) } } diff --git a/Sources/AsyncObjects/Logging/Loggable.swift b/Sources/AsyncObjects/Logging/Loggable.swift index 9bd04f4d..6879ce56 100644 --- a/Sources/AsyncObjects/Logging/Loggable.swift +++ b/Sources/AsyncObjects/Logging/Loggable.swift @@ -34,6 +34,36 @@ let level: Logger.Level = .debug let level: Logger.Level = .info #endif +/// Log a message attaching an optional identifier. +/// +/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`. +/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`. +/// Otherwise log level is set to `info`. +/// +/// - Parameters: +/// - message: The message to be logged. +/// - id: Optional identifier associated with message. +/// - file: The file this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#fileID`). +/// - function: The function this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#function`). +/// - line: The line this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#line`). +@inlinable +func log( + _ message: @autoclosure () -> Logger.Message, + id: UUID? = nil, + file: String = #fileID, + function: String = #function, + line: UInt = #line +) { + let metadata: Logger.Metadata = (id != nil) ? ["id": "\(id!)"] : [:] + logger.log( + level: level, message(), metadata: metadata, + file: file, function: function, line: line + ) +} + extension Loggable { /// Log a message attaching the default type specific metadata /// and optional identifier. @@ -102,6 +132,30 @@ extension LoggableActor { } } #else +/// Log a message attaching an optional identifier. +/// +/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`. +/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`. +/// Otherwise log level is set to `info`. +/// +/// - Parameters: +/// - message: The message to be logged. +/// - id: Optional identifier associated with message. +/// - file: The file this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#fileID`). +/// - function: The function this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#function`). +/// - line: The line this log message originates from (there's usually +/// no need to pass it explicitly as it defaults to `#line`). +@inlinable +func log( + _ message: @autoclosure () -> String, + id: UUID? = nil, + file: String = #fileID, + function: String = #function, + line: UInt = #line +) { /* Do nothing */ } + /// A type that emits log messages with specific metadata. @usableFromInline protocol Loggable {} diff --git a/Tests/AsyncObjectsTests/CancellationSourceTests.swift b/Tests/AsyncObjectsTests/CancellationSourceTests.swift index cf9277cf..5d678b37 100644 --- a/Tests/AsyncObjectsTests/CancellationSourceTests.swift +++ b/Tests/AsyncObjectsTests/CancellationSourceTests.swift @@ -9,14 +9,18 @@ class CancellationSourceTests: XCTestCase { let task = Task { try await Task.sleep(seconds: 10) } source.register(task: task) source.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testTaskCancellationWithTimeout() async throws { let task = Task { try await Task.sleep(seconds: 10) } let source = CancellationSource(cancelAfterNanoseconds: UInt64(1E9)) source.register(task: task) - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } #if swift(>=5.7) @@ -33,7 +37,9 @@ class CancellationSourceTests: XCTestCase { ) let task = Task { try await Task.sleep(seconds: 10, clock: clock) } source.register(task: task) - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5, clock: clock) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } #endif @@ -43,7 +49,9 @@ class CancellationSourceTests: XCTestCase { let task = Task { try await Task.sleep(seconds: 10) } source.register(task: task) pSource.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testTaskCancellationWithMultipleLinkedSources() async throws { @@ -53,7 +61,9 @@ class CancellationSourceTests: XCTestCase { let task = Task { try await Task.sleep(seconds: 10) } source.register(task: task) pSource1.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testAlreadyCancelledTask() async throws { @@ -76,7 +86,9 @@ class CancellationSourceTests: XCTestCase { source.register(task: task) try await task.value source.cancel() - XCTAssertFalse(task.isCancelled) + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + try await task.value } func testConcurrentCancellation() async throws { @@ -87,7 +99,9 @@ class CancellationSourceTests: XCTestCase { for _ in 0..<10 { group.addTask { source.cancel() } } await group.waitForAll() } - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testRegistrationAfterCancellation() async throws { @@ -95,7 +109,9 @@ class CancellationSourceTests: XCTestCase { let task = Task { try await Task.sleep(seconds: 10) } source.cancel() source.register(task: task) - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testMultipleTaskCancellation() async throws { @@ -107,9 +123,11 @@ class CancellationSourceTests: XCTestCase { source.register(task: task2) source.register(task: task3) source.cancel() - try await waitUntil(task1, timeout: 5) { $0.isCancelled } - try await waitUntil(task2, timeout: 5) { $0.isCancelled } - try await waitUntil(task3, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task1.isCancelled) + XCTAssertTrue(task2.isCancelled) + XCTAssertTrue(task3.isCancelled) } } @@ -125,7 +143,9 @@ class CancellationSourceInitializationTests: XCTestCase { } catch {} } source.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testDetachedTaskCancellation() async throws { @@ -137,7 +157,9 @@ class CancellationSourceInitializationTests: XCTestCase { } catch {} } source.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testThrowingTaskCancellation() async throws { @@ -146,7 +168,9 @@ class CancellationSourceInitializationTests: XCTestCase { try await Task.sleep(seconds: 10) } source.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } func testThrowingDetachedTaskCancellation() async throws { @@ -155,6 +179,8 @@ class CancellationSourceInitializationTests: XCTestCase { try await Task.sleep(seconds: 10) } source.cancel() - try await waitUntil(task, timeout: 5) { $0.isCancelled } + try await source.wait(forSeconds: 5) + XCTAssertTrue(source.isCancelled) + XCTAssertTrue(task.isCancelled) } }