Skip to content

Commit

Permalink
Refactor to give Interrupted special treatment
Browse files Browse the repository at this point in the history
  • Loading branch information
jspahrsummers committed May 23, 2015
1 parent 414b402 commit b902dc3
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 42 deletions.
91 changes: 68 additions & 23 deletions ReactiveCocoa/Swift/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,47 @@ import Result
public final class Signal<T, E: ErrorType> {
public typealias Observer = SinkOf<Event<T, E>>

private let lock = NSLock()
private let atomicObservers: Atomic<Bag<Observer>?> = Atomic(Bag())

/// Used to ensure that events are serialized during delivery to observers.
private let sendLock = NSLock()

/// Disposes of the generator provided at initialization time.
private let generatorDisposable = SerialDisposable()

/// When disposed of, the Signal should interrupt as soon as possible.
private let interruptionDisposable = SimpleDisposable()

/// Initializes a Signal that will immediately invoke the given generator,
/// then forward events sent to the given observer.
///
/// The Signal will remain alive until a terminating event is sent to the
/// observer, at which point the disposable returned from the closure will
/// be disposed as well.
public init(_ generator: Observer -> Disposable?) {
lock.name = "org.reactivecocoa.ReactiveCocoa.Signal"
sendLock.name = "org.reactivecocoa.ReactiveCocoa.Signal"

let generatorDisposable = SerialDisposable()
let sink = Observer { event in
let observers = self.atomicObservers.value

if let observers = observers {
self.lock.lock()

if event.isTerminating {
// Disallow any further events (e.g., any triggered
// recursively).
self.atomicObservers.value = nil
}

for sink in observers {
sink.put(event)
}

if event.isTerminating {
// Dispose only after notifying observers, so disposal logic
// is consistently the last thing to run.
generatorDisposable.dispose()
switch event {
case .Interrupted:
// Normally we disallow recursive events, but
// Interrupted is kind of a special snowflake, since it
// can inadvertently be sent by downstream consumers.
//
// So we'll flag Interrupted events specially, and if it
// happened to occur while we're sending something else,
// we'll wait to deliver it.
self.interruptionDisposable.dispose()

if self.sendLock.tryLock() {
self.interrupt()
self.sendLock.unlock()

self.generatorDisposable.dispose()
}

self.lock.unlock()
default:
self.send(event)
}
}

Expand All @@ -79,6 +84,46 @@ public final class Signal<T, E: ErrorType> {
return (signal, sink)
}

/// Forwards the given event to all observers, terminating if necessary.
private func send(event: Event<T, E>) {
let observers = self.atomicObservers.modify { observers in
if event.isTerminating {
return nil
} else {
return observers
}
}

if let observers = observers {
self.sendLock.lock()

for sink in observers {
sink.put(event)
}

if interruptionDisposable.disposed {
interrupt()
}

self.sendLock.unlock()

if event.isTerminating || interruptionDisposable.disposed {
// Dispose only after notifying observers, so disposal logic
// is consistently the last thing to run.
self.generatorDisposable.dispose()
}
}
}

/// Interrupts all observers and terminates the stream.
private func interrupt() {
if let observers = self.atomicObservers.swap(nil) {
for sink in observers {
sink.put(.Interrupted)
}
}
}

/// Observes the Signal by sending any future events to the given sink. If
/// the Signal has already terminated, the sink will immediately receive an
/// `Interrupted` event.
Expand Down
23 changes: 4 additions & 19 deletions ReactiveCocoa/Swift/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,33 +207,18 @@ public struct SignalProducer<T, E: ErrorType> {
let compositeDisposable = CompositeDisposable()
setUp(signal, compositeDisposable)

if compositeDisposable.disposed {
compositeDisposable.addDisposable {
// Interrupt the observer and all dependents when disposed.
sendInterrupted(observer)
return
}

let sendingLock = NSLock()
sendingLock.name = "org.reactivecocoa.ReactiveCocoa.SignalProducer.startWithSignal"

compositeDisposable.addDisposable {
// Interrupt the observer and all dependents when disposed, unless
// disposed as part of invoking the observer.
if sendingLock.tryLock() {
sendInterrupted(observer)
sendingLock.unlock()
}
if compositeDisposable.disposed {
return
}

let wrapperObserver = Signal<T, E>.Observer { event in
sendingLock.lock()
observer.put(event)

if compositeDisposable.disposed {
sendInterrupted(observer)
}

sendingLock.unlock()

if event.isTerminating {
// Dispose only after notifying the Signal, so disposal
// logic is consistently the last thing to run.
Expand Down

0 comments on commit b902dc3

Please sign in to comment.