Skip to content

Commit

Permalink
refactor: add identifier metadata for cancellation source registered …
Browse files Browse the repository at this point in the history
…tasks (#18)

* feat: add cooperative cancellation to cancellation source registered tasks completion wait

* wip: remove generics to fix Swift 5.6 build

* wip: use for loop instead of iterator

* wip: revert concurrent async stream iteration calls

* wip: fix cancellation handling

* wip: make cancellation task detached
  • Loading branch information
soumyamahunt authored Mar 4, 2023
1 parent 5ce6b1a commit 0ab431e
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 39 deletions.
38 changes: 38 additions & 0 deletions Sources/AsyncObjects/CancellationSource/Cancellable.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Foundation

/// A type representing a unit of work or task that supports cancellation.
///
/// Cancellation should be initiated on invoking ``cancel(file:function:line:)``
Expand Down Expand Up @@ -92,3 +94,39 @@ extension Task: Cancellable {
let _ = try await self.value
}
}

/// Waits asynchronously for the work or task to complete
/// handling cooperative cancellation initiation.
///
/// - Parameters:
/// - work: The work for which completion to wait and handle cooperative cancellation.
/// - id: The identifier associated with work.
/// - file: The file wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function wait request originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
///
/// - Throws: If waiting for the work completes with an error.
@inlinable
func waitHandlingCancelation(
for work: Cancellable,
associatedId id: UUID,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) async throws {
try await withTaskCancellationHandler {
defer {
log("Finished", id: id, file: file, function: function, line: line)
}
try await work.wait(file: file, function: function, line: line)
} onCancel: {
work.cancel(file: file, function: function, line: line)
log(
"Cancellation initiated", id: id,
file: file, function: function, line: line
)
}
}
40 changes: 16 additions & 24 deletions Sources/AsyncObjects/CancellationSource/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ import Foundation
/// tasks in that case.
public struct CancellationSource: AsyncObject, Cancellable, Loggable {
/// The continuation type controlling task group lifetime.
@usableFromInline
internal typealias Continuation = GlobalContinuation<Void, Error>
/// The cancellable work with invocation context.
@usableFromInline
internal typealias WorkItem = (
Cancellable, file: String, function: String, line: UInt
Cancellable, id: UUID, file: String, function: String, line: UInt
)

/// The lifetime task that is cancelled when
Expand All @@ -47,7 +45,6 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
var lifetime: Task<Void, Error>!
/// The stream continuation used to register work items
/// for cooperative cancellation.
@usableFromInline
var pipe: AsyncStream<WorkItem>.Continuation!

/// A Boolean value that indicates whether cancellation is already
Expand All @@ -57,30 +54,24 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
/// There is no way to uncancel on this source. Create a new
/// `CancellationSource` to manage cancellation of newly spawned
/// tasks in that case.
@inlinable
public var isCancelled: Bool { lifetime.isCancelled }

/// Creates a new cancellation source object.
///
/// - Returns: The newly created cancellation source.
public init() {
let stream = AsyncStream<WorkItem> { self.pipe = $0 }
self.lifetime = Task {
self.lifetime = Task.detached {
try await withThrowingTaskGroup(of: Void.self) { group in
for await item in stream {
group.addTask {
try? await withTaskCancellationHandler {
try await item.0.wait(
file: item.file,
function: item.function,
line: item.line
)
} onCancel: {
item.0.cancel(
file: item.file,
function: item.function,
line: item.line
)
}
try? await waitHandlingCancelation(
for: item.0, associatedId: item.id,
file: item.file,
function: item.function,
line: item.line
)
}
}

Expand Down Expand Up @@ -110,18 +101,19 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
function: String = #function,
line: UInt = #line
) {
let result = pipe.yield((task, file, function, line))
let id = UUID()
let result = pipe.yield((task, id, file, function, line))
switch result {
case .enqueued:
log(
"Registered \(task)",
"Registered \(task)", id: id,
file: file, function: function, line: line
)
case .dropped, .terminated: fallthrough
@unknown default:
task.cancel(file: file, function: function, line: line)
log(
"Cancelled \(task) due to result: \(result)",
"Cancelled \(task) due to result: \(result)", id: id,
file: file, function: function, line: line
)
}
Expand Down Expand Up @@ -172,10 +164,10 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
function: String = #function,
line: UInt = #line
) async {
let key = UUID()
log("Waiting", id: key, file: file, function: function, line: line)
let id = UUID()
log("Waiting", id: id, file: file, function: function, line: line)
let _ = await lifetime.result
log("Completed", id: key, file: file, function: function, line: line)
log("Completed", id: id, file: file, function: function, line: line)
}
}

Expand Down
54 changes: 54 additions & 0 deletions Sources/AsyncObjects/Logging/Loggable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,36 @@ let level: Logger.Level = .debug
let level: Logger.Level = .info
#endif

/// Log a message attaching an optional identifier.
///
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`.
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`.
/// Otherwise log level is set to `info`.
///
/// - Parameters:
/// - message: The message to be logged.
/// - id: Optional identifier associated with message.
/// - file: The file this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#fileID`).
/// - function: The function this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#function`).
/// - line: The line this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#line`).
@inlinable
func log(
_ message: @autoclosure () -> Logger.Message,
id: UUID? = nil,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
let metadata: Logger.Metadata = (id != nil) ? ["id": "\(id!)"] : [:]
logger.log(
level: level, message(), metadata: metadata,
file: file, function: function, line: line
)
}

extension Loggable {
/// Log a message attaching the default type specific metadata
/// and optional identifier.
Expand Down Expand Up @@ -102,6 +132,30 @@ extension LoggableActor {
}
}
#else
/// Log a message attaching an optional identifier.
///
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`.
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`.
/// Otherwise log level is set to `info`.
///
/// - Parameters:
/// - message: The message to be logged.
/// - id: Optional identifier associated with message.
/// - file: The file this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#fileID`).
/// - function: The function this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#function`).
/// - line: The line this log message originates from (there's usually
/// no need to pass it explicitly as it defaults to `#line`).
@inlinable
func log(
_ message: @autoclosure () -> String,
id: UUID? = nil,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) { /* Do nothing */ }

/// A type that emits log messages with specific metadata.
@usableFromInline
protocol Loggable {}
Expand Down
56 changes: 41 additions & 15 deletions Tests/AsyncObjectsTests/CancellationSourceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ class CancellationSourceTests: XCTestCase {
let task = Task { try await Task.sleep(seconds: 10) }
source.register(task: task)
source.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testTaskCancellationWithTimeout() async throws {
let task = Task { try await Task.sleep(seconds: 10) }
let source = CancellationSource(cancelAfterNanoseconds: UInt64(1E9))
source.register(task: task)
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

#if swift(>=5.7)
Expand All @@ -33,7 +37,9 @@ class CancellationSourceTests: XCTestCase {
)
let task = Task { try await Task.sleep(seconds: 10, clock: clock) }
source.register(task: task)
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5, clock: clock)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}
#endif

Expand All @@ -43,7 +49,9 @@ class CancellationSourceTests: XCTestCase {
let task = Task { try await Task.sleep(seconds: 10) }
source.register(task: task)
pSource.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testTaskCancellationWithMultipleLinkedSources() async throws {
Expand All @@ -53,7 +61,9 @@ class CancellationSourceTests: XCTestCase {
let task = Task { try await Task.sleep(seconds: 10) }
source.register(task: task)
pSource1.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testAlreadyCancelledTask() async throws {
Expand All @@ -76,7 +86,9 @@ class CancellationSourceTests: XCTestCase {
source.register(task: task)
try await task.value
source.cancel()
XCTAssertFalse(task.isCancelled)
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
try await task.value
}

func testConcurrentCancellation() async throws {
Expand All @@ -87,15 +99,19 @@ class CancellationSourceTests: XCTestCase {
for _ in 0..<10 { group.addTask { source.cancel() } }
await group.waitForAll()
}
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testRegistrationAfterCancellation() async throws {
let source = CancellationSource()
let task = Task { try await Task.sleep(seconds: 10) }
source.cancel()
source.register(task: task)
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testMultipleTaskCancellation() async throws {
Expand All @@ -107,9 +123,11 @@ class CancellationSourceTests: XCTestCase {
source.register(task: task2)
source.register(task: task3)
source.cancel()
try await waitUntil(task1, timeout: 5) { $0.isCancelled }
try await waitUntil(task2, timeout: 5) { $0.isCancelled }
try await waitUntil(task3, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task1.isCancelled)
XCTAssertTrue(task2.isCancelled)
XCTAssertTrue(task3.isCancelled)
}
}

Expand All @@ -125,7 +143,9 @@ class CancellationSourceInitializationTests: XCTestCase {
} catch {}
}
source.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testDetachedTaskCancellation() async throws {
Expand All @@ -137,7 +157,9 @@ class CancellationSourceInitializationTests: XCTestCase {
} catch {}
}
source.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testThrowingTaskCancellation() async throws {
Expand All @@ -146,7 +168,9 @@ class CancellationSourceInitializationTests: XCTestCase {
try await Task.sleep(seconds: 10)
}
source.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}

func testThrowingDetachedTaskCancellation() async throws {
Expand All @@ -155,6 +179,8 @@ class CancellationSourceInitializationTests: XCTestCase {
try await Task.sleep(seconds: 10)
}
source.cancel()
try await waitUntil(task, timeout: 5) { $0.isCancelled }
try await source.wait(forSeconds: 5)
XCTAssertTrue(source.isCancelled)
XCTAssertTrue(task.isCancelled)
}
}

0 comments on commit 0ab431e

Please sign in to comment.