From d7ce36f14bd7f1b0549fac9508fb9b157950e543 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Tue, 14 Apr 2020 17:30:40 +0100 Subject: [PATCH] Fix a potential race condition. (#58) --- ReactiveFeedback/Floodgate.swift | 41 +++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/ReactiveFeedback/Floodgate.swift b/ReactiveFeedback/Floodgate.swift index 54a8caa..558e2da 100644 --- a/ReactiveFeedback/Floodgate.swift +++ b/ReactiveFeedback/Floodgate.swift @@ -5,6 +5,10 @@ final class Floodgate: FeedbackEventConsumer { struct QueueState { var events: [(Event, Token)] = [] var isOuterLifetimeEnded = false + + var hasEvents: Bool { + events.isEmpty == false && isOuterLifetimeEnded == false + } } let (stateDidChange, changeObserver) = Signal.pipe() @@ -26,6 +30,7 @@ final class Floodgate: FeedbackEventConsumer { defer { reducerLock.unlock() } guard !hasStarted else { return } + hasStarted = true changeObserver.send(value: state) @@ -33,15 +38,33 @@ final class Floodgate: FeedbackEventConsumer { } override func process(_ event: Event, for token: Token) { + enqueue(event, for: token) + if reducerLock.try() { - // Fast path: No running effect. - defer { reducerLock.unlock() } - - consume(event) - drainEvents() - } else { - // Slow path: Enqueue the event for the running effect to drain it on behalf of us. - enqueue(event, for: token) + repeat { + drainEvents() + reducerLock.unlock() + } while queue.withValue({ $0.hasEvents }) && reducerLock.try() + // ^^^ + // Restart the event draining after we unlock the reducer lock, iff: + // + // 1. the queue still has unprocessed events; and + // 2. no concurrent actor has taken the reducer lock, which implies no event draining would be started + // unless we take active action. + // + // This eliminates a race condition in the following sequence of operations: + // + // | Thread A | Thread B | + // |------------------------------------|------------------------------------| + // | concurrent dequeue: no item | | + // | | concurrent enqueue | + // | | trylock lock: BUSY | + // | unlock lock | | + // | | | + // | <<< The enqueued event is left unprocessed. >>> | + // + // The trylock-unlock duo has a synchronize-with relationship, which ensures that Thread A must see any + // concurrent enqueue that *happens before* the trylock. } } @@ -65,7 +88,7 @@ final class Floodgate: FeedbackEventConsumer { private func dequeue() -> Event? { queue.modify { - guard !$0.isOuterLifetimeEnded, !$0.events.isEmpty else { return nil } + guard $0.hasEvents else { return nil } return $0.events.removeFirst().0 } }