Skip to content

Commit

Permalink
Fix a potential race condition. (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio authored Apr 14, 2020
1 parent 7052e6a commit d7ce36f
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions ReactiveFeedback/Floodgate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
struct QueueState {
var events: [(Event, Token)] = []
var isOuterLifetimeEnded = false

var hasEvents: Bool {
events.isEmpty == false && isOuterLifetimeEnded == false
}
}

let (stateDidChange, changeObserver) = Signal<State, Never>.pipe()
Expand All @@ -26,22 +30,41 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
defer { reducerLock.unlock() }

guard !hasStarted else { return }

hasStarted = true

changeObserver.send(value: state)
drainEvents()
}

override func process(_ event: Event, for token: Token) {
enqueue(event, for: token)

if reducerLock.try() {
// Fast path: No running effect.
defer { reducerLock.unlock() }

consume(event)
drainEvents()
} else {
// Slow path: Enqueue the event for the running effect to drain it on behalf of us.
enqueue(event, for: token)
repeat {
drainEvents()
reducerLock.unlock()
} while queue.withValue({ $0.hasEvents }) && reducerLock.try()
// ^^^
// Restart the event draining after we unlock the reducer lock, iff:
//
// 1. the queue still has unprocessed events; and
// 2. no concurrent actor has taken the reducer lock, which implies no event draining would be started
// unless we take active action.
//
// This eliminates a race condition in the following sequence of operations:
//
// | Thread A | Thread B |
// |------------------------------------|------------------------------------|
// | concurrent dequeue: no item | |
// | | concurrent enqueue |
// | | trylock lock: BUSY |
// | unlock lock | |
// | | |
// | <<< The enqueued event is left unprocessed. >>> |
//
// The trylock-unlock duo has a synchronize-with relationship, which ensures that Thread A must see any
// concurrent enqueue that *happens before* the trylock.
}
}

Expand All @@ -65,7 +88,7 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {

private func dequeue() -> Event? {
queue.modify {
guard !$0.isOuterLifetimeEnded, !$0.events.isEmpty else { return nil }
guard $0.hasEvents else { return nil }
return $0.events.removeFirst().0
}
}
Expand Down

0 comments on commit d7ce36f

Please sign in to comment.