Skip to content

Commit

Permalink
refactor: reduce code repeatitions
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 31, 2022
1 parent e185079 commit 364e16c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
15 changes: 11 additions & 4 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public actor AsyncCountdownEvent: AsyncObject {

// MARK: Internal

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
@inlinable
func _resumeContinuation(_ continuation: Continuation) {
currentCount += 1
continuation.resume()
}

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
Expand All @@ -58,8 +67,7 @@ public actor AsyncCountdownEvent: AsyncObject {
withKey key: UUID
) {
guard !isSet, continuations.isEmpty else {
currentCount += 1
continuation.resume()
_resumeContinuation(continuation)
return
}
continuations[key] = continuation
Expand Down Expand Up @@ -90,8 +98,7 @@ public actor AsyncCountdownEvent: AsyncObject {
func _resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
self.currentCount += 1
_resumeContinuation(continuation)
}
}

Expand Down
42 changes: 25 additions & 17 deletions Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,37 @@ public actor TaskQueue: AsyncObject {
|| flags.wait(forCurrent: currentRunning)
}

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
/// - Returns: Whether queue is free to proceed scheduling other tasks.
@inlinable
@discardableResult
func _resumeQueuedContinuation(
_ continuation: QueuedContinuation
) -> Bool {
currentRunning += 1
continuation.value.resume()
guard continuation.flags.isBlockEnabled else { return true }
blocked = true
return false
}

/// Add continuation with the provided key and associated flags to queue.
///
/// - Parameters:
/// - flags: The flags associated with continuation operation.
/// - key: The key in the continuation queue.
/// - continuation: The continuation to add to queue.
/// - continuation: The continuation and flags to add to queue.
@inlinable
func _queueContinuation(
withFlags flags: Flags = [],
atKey key: UUID = .init(),
_ continuation: Continuation
_ continuation: QueuedContinuation
) {
guard _wait(whenFlags: flags) else {
currentRunning += 1
continuation.resume()
guard _wait(whenFlags: continuation.flags) else {
_resumeQueuedContinuation(continuation)
return
}
queue[key] = (value: continuation, flags: flags)
queue[key] = continuation
}

/// Remove continuation associated with provided key from queue.
Expand Down Expand Up @@ -218,16 +231,12 @@ public actor TaskQueue: AsyncObject {
/// and operation flags preconditions satisfied.
@inlinable
func _resumeQueuedTasks() {
while let (_, (continuation, flags)) = queue.elements.first,
while let (_, continuation) = queue.elements.first,
!blocked,
!flags.wait(forCurrent: currentRunning)
!continuation.flags.wait(forCurrent: currentRunning)
{
queue.removeFirst()
currentRunning += 1
continuation.resume()
guard flags.isBlockEnabled else { continue }
blocked = true
break
guard _resumeQueuedContinuation(continuation) else { break }
}
}

Expand All @@ -252,9 +261,8 @@ public actor TaskQueue: AsyncObject {
try await Continuation.with { continuation in
Task { [weak self] in
await self?._queueContinuation(
withFlags: flags,
atKey: key,
continuation
(value: continuation, flags: flags)
)
}
}
Expand Down

0 comments on commit 364e16c

Please sign in to comment.