From b902dc31173cd1192ee2e2764c0dcb03286225e4 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 23 May 2015 02:20:47 -0700 Subject: [PATCH] Refactor to give Interrupted special treatment --- ReactiveCocoa/Swift/Signal.swift | 91 ++++++++++++++++++------ ReactiveCocoa/Swift/SignalProducer.swift | 23 ++---- 2 files changed, 72 insertions(+), 42 deletions(-) diff --git a/ReactiveCocoa/Swift/Signal.swift b/ReactiveCocoa/Swift/Signal.swift index 85fc3c28c4..1f92dcc682 100644 --- a/ReactiveCocoa/Swift/Signal.swift +++ b/ReactiveCocoa/Swift/Signal.swift @@ -17,9 +17,17 @@ import Result public final class Signal { public typealias Observer = SinkOf> - private let lock = NSLock() private let atomicObservers: Atomic?> = Atomic(Bag()) + /// Used to ensure that events are serialized during delivery to observers. + private let sendLock = NSLock() + + /// Disposes of the generator provided at initialization time. + private let generatorDisposable = SerialDisposable() + + /// When disposed of, the Signal should interrupt as soon as possible. + private let interruptionDisposable = SimpleDisposable() + /// Initializes a Signal that will immediately invoke the given generator, /// then forward events sent to the given observer. /// @@ -27,32 +35,29 @@ public final class Signal { /// observer, at which point the disposable returned from the closure will /// be disposed as well. public init(_ generator: Observer -> Disposable?) { - lock.name = "org.reactivecocoa.ReactiveCocoa.Signal" + sendLock.name = "org.reactivecocoa.ReactiveCocoa.Signal" - let generatorDisposable = SerialDisposable() let sink = Observer { event in - let observers = self.atomicObservers.value - - if let observers = observers { - self.lock.lock() - - if event.isTerminating { - // Disallow any further events (e.g., any triggered - // recursively). - self.atomicObservers.value = nil - } - - for sink in observers { - sink.put(event) - } - - if event.isTerminating { - // Dispose only after notifying observers, so disposal logic - // is consistently the last thing to run. - generatorDisposable.dispose() + switch event { + case .Interrupted: + // Normally we disallow recursive events, but + // Interrupted is kind of a special snowflake, since it + // can inadvertently be sent by downstream consumers. + // + // So we'll flag Interrupted events specially, and if it + // happened to occur while we're sending something else, + // we'll wait to deliver it. + self.interruptionDisposable.dispose() + + if self.sendLock.tryLock() { + self.interrupt() + self.sendLock.unlock() + + self.generatorDisposable.dispose() } - self.lock.unlock() + default: + self.send(event) } } @@ -79,6 +84,46 @@ public final class Signal { return (signal, sink) } + /// Forwards the given event to all observers, terminating if necessary. + private func send(event: Event) { + let observers = self.atomicObservers.modify { observers in + if event.isTerminating { + return nil + } else { + return observers + } + } + + if let observers = observers { + self.sendLock.lock() + + for sink in observers { + sink.put(event) + } + + if interruptionDisposable.disposed { + interrupt() + } + + self.sendLock.unlock() + + if event.isTerminating || interruptionDisposable.disposed { + // Dispose only after notifying observers, so disposal logic + // is consistently the last thing to run. + self.generatorDisposable.dispose() + } + } + } + + /// Interrupts all observers and terminates the stream. + private func interrupt() { + if let observers = self.atomicObservers.swap(nil) { + for sink in observers { + sink.put(.Interrupted) + } + } + } + /// Observes the Signal by sending any future events to the given sink. If /// the Signal has already terminated, the sink will immediately receive an /// `Interrupted` event. diff --git a/ReactiveCocoa/Swift/SignalProducer.swift b/ReactiveCocoa/Swift/SignalProducer.swift index 7b465df8c2..f8bec8b8f1 100644 --- a/ReactiveCocoa/Swift/SignalProducer.swift +++ b/ReactiveCocoa/Swift/SignalProducer.swift @@ -207,33 +207,18 @@ public struct SignalProducer { let compositeDisposable = CompositeDisposable() setUp(signal, compositeDisposable) - if compositeDisposable.disposed { + compositeDisposable.addDisposable { + // Interrupt the observer and all dependents when disposed. sendInterrupted(observer) - return } - let sendingLock = NSLock() - sendingLock.name = "org.reactivecocoa.ReactiveCocoa.SignalProducer.startWithSignal" - - compositeDisposable.addDisposable { - // Interrupt the observer and all dependents when disposed, unless - // disposed as part of invoking the observer. - if sendingLock.tryLock() { - sendInterrupted(observer) - sendingLock.unlock() - } + if compositeDisposable.disposed { + return } let wrapperObserver = Signal.Observer { event in - sendingLock.lock() observer.put(event) - if compositeDisposable.disposed { - sendInterrupted(observer) - } - - sendingLock.unlock() - if event.isTerminating { // Dispose only after notifying the Signal, so disposal // logic is consistently the last thing to run.