From 364e16caa01c77888d67b3b4f1cd729009724cee Mon Sep 17 00:00:00 2001 From: Soumya Ranjan Mahunt Date: Wed, 31 Aug 2022 16:03:55 +0530 Subject: [PATCH] refactor: reduce code repeatitions --- .../AsyncObjects/AsyncCountdownEvent.swift | 15 +++++-- Sources/AsyncObjects/TaskQueue.swift | 42 +++++++++++-------- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/Sources/AsyncObjects/AsyncCountdownEvent.swift b/Sources/AsyncObjects/AsyncCountdownEvent.swift index 2c799655..cf0294bf 100644 --- a/Sources/AsyncObjects/AsyncCountdownEvent.swift +++ b/Sources/AsyncObjects/AsyncCountdownEvent.swift @@ -47,6 +47,15 @@ public actor AsyncCountdownEvent: AsyncObject { // MARK: Internal + /// Resume provided continuation with additional changes based on the associated flags. + /// + /// - Parameter continuation: The queued continuation to resume. + @inlinable + func _resumeContinuation(_ continuation: Continuation) { + currentCount += 1 + continuation.resume() + } + /// Add continuation with the provided key in `continuations` map. /// /// - Parameters: @@ -58,8 +67,7 @@ public actor AsyncCountdownEvent: AsyncObject { withKey key: UUID ) { guard !isSet, continuations.isEmpty else { - currentCount += 1 - continuation.resume() + _resumeContinuation(continuation) return } continuations[key] = continuation @@ -90,8 +98,7 @@ public actor AsyncCountdownEvent: AsyncObject { func _resumeContinuations() { while !continuations.isEmpty && isSet { let (_, continuation) = continuations.removeFirst() - continuation.resume() - self.currentCount += 1 + _resumeContinuation(continuation) } } diff --git a/Sources/AsyncObjects/TaskQueue.swift b/Sources/AsyncObjects/TaskQueue.swift index 8d3f9587..6d1230fa 100644 --- a/Sources/AsyncObjects/TaskQueue.swift +++ b/Sources/AsyncObjects/TaskQueue.swift @@ -162,24 +162,37 @@ public actor TaskQueue: AsyncObject { || flags.wait(forCurrent: currentRunning) } + /// Resume provided continuation with additional changes based on the associated flags. + /// + /// - Parameter continuation: The queued continuation to resume. + /// - Returns: Whether queue is free to proceed scheduling other tasks. + @inlinable + @discardableResult + func _resumeQueuedContinuation( + _ continuation: QueuedContinuation + ) -> Bool { + currentRunning += 1 + continuation.value.resume() + guard continuation.flags.isBlockEnabled else { return true } + blocked = true + return false + } + /// Add continuation with the provided key and associated flags to queue. /// /// - Parameters: - /// - flags: The flags associated with continuation operation. /// - key: The key in the continuation queue. - /// - continuation: The continuation to add to queue. + /// - continuation: The continuation and flags to add to queue. @inlinable func _queueContinuation( - withFlags flags: Flags = [], atKey key: UUID = .init(), - _ continuation: Continuation + _ continuation: QueuedContinuation ) { - guard _wait(whenFlags: flags) else { - currentRunning += 1 - continuation.resume() + guard _wait(whenFlags: continuation.flags) else { + _resumeQueuedContinuation(continuation) return } - queue[key] = (value: continuation, flags: flags) + queue[key] = continuation } /// Remove continuation associated with provided key from queue. @@ -218,16 +231,12 @@ public actor TaskQueue: AsyncObject { /// and operation flags preconditions satisfied. @inlinable func _resumeQueuedTasks() { - while let (_, (continuation, flags)) = queue.elements.first, + while let (_, continuation) = queue.elements.first, !blocked, - !flags.wait(forCurrent: currentRunning) + !continuation.flags.wait(forCurrent: currentRunning) { queue.removeFirst() - currentRunning += 1 - continuation.resume() - guard flags.isBlockEnabled else { continue } - blocked = true - break + guard _resumeQueuedContinuation(continuation) else { break } } } @@ -252,9 +261,8 @@ public actor TaskQueue: AsyncObject { try await Continuation.with { continuation in Task { [weak self] in await self?._queueContinuation( - withFlags: flags, atKey: key, - continuation + (value: continuation, flags: flags) ) } }