Lodash for events.
"Allows you to specify the dynamic behaviour of a value completely at the time of the declaration"
- Observable: emits items (as a sequence) and notifications (error, complete)
- Observer: a collection of callbacks that knows how to listen to items delivered by the Observable
- Subscription: represents the execution of an Observable; primarily used for cancelling the execution
- Operators: pure functions that enable a functional programming style of dealing with collections (eg: map, filter, concat, flatMap)
- Subject: both an Observable and an Observer. Equivalent to an EventEmitter. Maintains a list of observers and multicasts items to them.
- Schedulers: dispatchers that coordinate when computation happens (eg: setTimeout, requestAnimationFrame)
- Higher-order Observable: an Observable that emits Observables
const observable = Rx.Observable.create();
Usually created with 'creation operators', eg: of, from, interval
Observables are cheap and simple to clone. They do not contain any data.
Provides a callback where data will be delivered to.
It is not a pub/sub pattern. The observer is not registered as a listener in the observable. The observable does not know about its observers.
A subscribe call is simply a way to start an "Observable execution" and deliver items to the observer.
Three types of notification can be delivered:
- 'Next' notification: send data to an observer
- 'Error' notification: send a JavaScript Error or exception
- 'Complete' notification: does not send a value
If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
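const observable = Rx.Observable.create(observer => {
  observer.next(1);
  observer.complete();
  observer.error(new Error('oops')); // not delivered: already completed
  observer.next(2); // not delivered!
});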
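The delivery rule above can be sketched without RxJS. This is a plain JavaScript illustration, not RxJS's implementation: `safeObserver` is a hypothetical helper that wraps an observer and drops anything sent after a terminal notification.

```javascript
// Hypothetical helper: wraps an observer so nothing is delivered
// after an 'error' or 'complete' notification.
function safeObserver(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed && observer.next) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
    },
  };
}

const received = [];
const observer = safeObserver({
  next: x => received.push('next ' + x),
  error: e => received.push('error ' + e.message),
  complete: () => received.push('complete'),
});

observer.next(1);
observer.complete();
observer.error(new Error('too late')); // dropped: already completed
observer.next(2);                      // dropped: already completed

console.log(received); // [ 'next 1', 'complete' ]
```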
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Once unsubscribe is called, an observable with no remaining observers can stop generating new items to emit.
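A minimal sketch of how cancelling can stop the producer, written in plain JavaScript rather than with RxJS. `createObservable` and `timerRunning` are hypothetical names used only for this illustration; the assumption is that the producer returns a teardown function.

```javascript
// Hypothetical mini-Observable: `subscribe` runs the producer function and
// returns a subscription whose `unsubscribe` calls the producer's teardown.
function createObservable(producer) {
  return {
    subscribe(next) {
      const teardown = producer(next);
      return { unsubscribe: () => teardown() };
    },
  };
}

let timerRunning = false;
const ticks = createObservable(next => {
  timerRunning = true;
  const id = setInterval(() => next(Date.now()), 1000);
  // Teardown: stop generating items once the execution is cancelled.
  return () => {
    clearInterval(id);
    timerRunning = false;
  };
});

const subscription = ticks.subscribe(x => console.log(x));
const runningWhileSubscribed = timerRunning; // true
subscription.unsubscribe();
const runningAfterUnsubscribe = timerRunning; // false
```

Because the subscription is cancelled synchronously, the timer never fires and nothing is logged.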
observable.subscribe({
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
});
All three callbacks above are optional.
A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution.
More info: http://reactivex.io/rxjs/manual/overview.html#multicasted-observables
BehaviorSubject: stores its current value, and emits it immediately when an Observer subscribes.
ReplaySubject: similar to BehaviorSubject, but can also record a part of the Observable execution and replay it to new subscribers.
const subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
Or, with two params: bufferSize and windowTime in milliseconds.
const subject = new Rx.ReplaySubject(100, 500); // buffer up to 100 items, replaying only those from the last 500 ms
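The replay behaviour can be sketched in plain JavaScript. `TinyReplaySubject` is a hypothetical class for illustration only; it models bufferSize and leaves windowTime out.

```javascript
// Hypothetical sketch of a replay buffer; only bufferSize is modelled,
// windowTime is left out.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.forEach(fn => fn(value));
  }
  subscribe(fn) {
    this.buffer.forEach(value => fn(value)); // replay to the newcomer
    this.observers.push(fn);
  }
}

const replay = new TinyReplaySubject(3);
[1, 2, 3, 4, 5].forEach(v => replay.next(v));

const late = [];
replay.subscribe(v => late.push(v)); // subscribes after five values
replay.next(6);

console.log(late); // [ 3, 4, 5, 6 ]
```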
AsyncSubject: only the last value of the Observable execution is sent to its Observers, and only when the execution completes.
Observables are either hot or cold. A plain observable with no subscribers is never triggered: it does not generate new values while no subscriptions exist.
A "cold" observable waits until an observer subscribes to it before emitting items. The observer is guaranteed to see the beginning of the sequence, because a cold observable runs a separate execution for each new observer.
A "hot" observable may begin emitting items as soon as it's created. Any observer who subscribes later may start observing items from the middle of the sequence. A cold observable is typically made hot with share() or publish().
Observables are not shared by default.
var obs = Rx.Observable.interval(500).take(5)
.do(i => console.log("obs value "+ i) )
.share();
obs.subscribe(value => console.log("observer 1 received " + value));
obs.subscribe(value => console.log("observer 2 received " + value));
With share, the stream runs once and both subscriptions receive each value.
Without share, the stream runs separately for each subscription.
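The difference can be sketched in plain JavaScript by counting how often the producer runs. `cold` and `shareOnce` are hypothetical stand-ins, not RxJS APIs; the explicit `connect()` call makes this closer to publish() followed by connect() than to share(), which connects on the first subscription.

```javascript
// Hypothetical cold source: every subscribe call runs the producer again.
let executions = 0;
const cold = {
  subscribe(next) {
    executions += 1;
    [0, 1, 2].forEach(v => next(v));
  },
};

// Hypothetical `shareOnce`: subscribes to the source a single time and
// fans each value out to the observers registered before `connect()`.
function shareOnce(source) {
  const observers = [];
  return {
    subscribe: next => observers.push(next),
    connect: () => source.subscribe(v => observers.forEach(fn => fn(v))),
  };
}

// Without sharing: two subscriptions, two executions.
cold.subscribe(() => {});
cold.subscribe(() => {});
const unsharedExecutions = executions; // 2

// With sharing: two observers, one execution.
executions = 0;
const seen = [];
const shared = shareOnce(cold);
shared.subscribe(v => seen.push('observer 1 received ' + v));
shared.subscribe(v => seen.push('observer 2 received ' + v));
shared.connect();
const sharedExecutions = executions; // 1
```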
An operator is a pure function that creates a new Observable based on the current Observable.
Choose an operator wizard: http://reactivex.io/rxjs/manual/overview.html#choose-an-operator
Decision tree: http://reactivex.io/documentation/operators.html#tree
Here are most operators (yes, there are more!)
- from: Convert almost anything into an Observable
- fromEvent: Create an Observable from DOM events or Node EventEmitter events
    const clicks = Rx.Observable.fromEvent(document, 'click');
- fromPromise: Convert a Promise into an Observable
- interval: Create an Observable that emits integers spaced by a time interval
    const numbers = Rx.Observable.interval(1000);
- of: Emit a set of values sequentially, and then a complete notification
- defer: Create a fresh Observable for each observer
- delay(delay: number | Date): Delay emission of items by a given timeout or until a given Date
- delayWhen(delayDurationSelector: function(value: T): Observable): Like delay, but the delay duration is determined by a second Observable
  - Pass in a function that takes the source value as its argument and returns an Observable (the 'duration' Observable)
- defaultIfEmpty(defaultValue: any): If the source Observable is empty, emit a default value
- every(predicate: function(): boolean): Observable<boolean>: Returns an Observable that emits whether every item satisfies the condition
- find(predicate: function(value: T, index: number, source: Observable<T>): boolean): Find the first value that passes a test and emit it
- findIndex(): Same as find, but emits the index of the found value, not the value itself
- isEmpty(): If the source is empty, returns an Observable that emits true; otherwise it emits false
- count(predicate: function(value: T, index: number, source: Observable<T>): boolean): Counts the number of emissions and emits that number when the source completes
- max(): When the source completes, emit the item with the largest value
- min(): When the source completes, emit the item with the smallest value
- reduce(accumulator: function(acc: R, value: T): R, seed: R): Joins each new source value into the accumulation from the past, and emits the result when the source completes
- merge(...other: Observable): Merges multiple Observables into one by emitting all of their values concurrently
- mergeAll(): Flattens an Observable-of-Observables
- startWith(...values): Emits the given items before it begins to emit items from the source Observable
- withLatestFrom(...other: Observable, project?: function): Whenever the source Observable emits a value, combines that value with the latest values from the other input Observables, then emits the result
- combineLatest(...other: Observable, project?: function): Whenever any input Observable emits a value, run the optional 'project' function and emit its output
  - Subscribes to each input Observable and collects an array of the most recent value from each whenever one of them emits
  - This array is run through the optional project function, and the result is emitted
    const weight = Rx.Observable.of(70, 72, 76, 79, 75);
    const height = Rx.Observable.of(1.76, 1.77, 1.78);
    const bmi = weight.combineLatest(height, (w, h) => w / (h * h));
- combineAll(project?: function): Flattens an Observable-of-Observables by applying combineLatest once the outer Observable completes
  - Takes an Observable of Observables and collects all Observables from it
  - Once the outer Observable completes, all collected Observables are combined using combineLatest
- concat(...other: Observable): Concatenates multiple Observables by sequentially emitting their values, one Observable after the other
- concatAll(): Flattens an Observable-of-Observables by putting one inner Observable after the other
- exhaust(): Same as mergeAll(), but ignores every new inner Observable while the previous inner Observable has not yet completed
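A minimal sketch of the combineLatest bookkeeping described above, in plain JavaScript rather than RxJS. `combineLatestSketch` is a hypothetical helper, and the event list and its arrival order are made up for illustration. Note that nothing is emitted until every input has produced at least one value.

```javascript
// Hypothetical helper: `events` is a list of { source, value } records in
// arrival order; `sourceCount` is the number of input streams.
function combineLatestSketch(events, sourceCount, project) {
  const latest = new Array(sourceCount);
  const seen = new Set();
  const output = [];
  for (const { source, value } of events) {
    latest[source] = value;
    seen.add(source);
    // Nothing is emitted until every input has produced at least one value.
    if (seen.size === sourceCount) output.push(project(...latest));
  }
  return output;
}

// Source 0 is weight, source 1 is height (made-up values and arrival order).
const events = [
  { source: 0, value: 70 },
  { source: 1, value: 2 },
  { source: 0, value: 80 },
  { source: 1, value: 1 },
];
const bmi = combineLatestSketch(events, 2, (w, h) => w / (h * h));
console.log(bmi); // [ 17.5, 20, 80 ]
```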
- debounceTime(dueTime: number): Like delay, but passes only the most recent value from each burst of emissions
- debounce(durationSelector: function(value: T): Observable | Promise): Like debounceTime, but the time span of emission silence is determined by a second Observable
- distinct(): Emits the items from the source Observable that are distinct from all previous items
  - Compares with an equality check by default, but a custom comparison function can be passed in
- distinctKey(key: string): Same as distinct, but uses a key to check whether two items are distinct
- distinctUntilChanged(): Same as distinct, but does not keep a registry of the past; it only compares with the previous item
- distinctUntilKeyChanged(key: string): Same as distinctUntilChanged, but uses a key to check whether two items are distinct
- elementAt(indexNumber: number, defaultValue?: T): Emits only the n-th value, then completes
- filter(predicate: function(value: T, index: number): boolean): Like Array.prototype.filter()
- first(predicate?: function): Emits the first value, or the first value that passes some test
- ignoreElements(): Only passes complete or error notifications; does not emit any items
- throttleTime(duration: number): Lets a value pass, then ignores source values for the next duration milliseconds
- throttle(durationSelector: function(value: T): Observable): Like throttleTime, but the silencing duration is determined by a second Observable
- auditTime(duration: number): When it sees a source value, it ignores that value and the following ones for duration milliseconds, then emits the most recent value from the source
  - Similar to throttleTime, but emits the last value from the silenced window instead of the first
- audit(durationSelector: function(value: T): Observable): Like auditTime, but the silencing duration is determined by a second Observable
- last(): Emits only the last item, optionally the last one that matches a predicate function
- sampleTime(period: number): Samples the source at periodic intervals and emits what it samples
- sample(notifier: Observable): Like sampleTime, but samples whenever the notifier Observable emits something
- single(predicate: function): Emits the single item that matches the predicate function. If the source emits more than one such item, or none, an error is raised.
- skip(n: number): Skips the first n items emitted
- skipUntil(notifier: Observable): Skips items emitted by the source Observable until a second Observable emits an item
- skipWhile(predicate: function): Skips items as long as the condition holds true, then emits all further items as soon as the condition becomes false
- take(count: number): Takes the first count values from the source, then completes
- takeLast(count: number): Remembers the latest count values, then emits those when the source completes
- takeUntil(notifier: Observable): Emits values until a second Observable emits something, then completes
- takeWhile(predicate: function(value: T, index: number): boolean): Takes values from the source while the condition passes. Completes as soon as a value does not satisfy it.
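The three rate-limiting operators are easy to confuse, so here is a rough sketch of their selection rules in plain JavaScript. It works on a made-up list of `{ t, value }` records (t in milliseconds) instead of real timers; the `*Sketch` functions are hypothetical, and the source is assumed to stay open long enough for the last window to close.

```javascript
// throttleTime: let a value pass, then ignore the source for `duration` ms.
function throttleTimeSketch(events, duration) {
  const out = [];
  let silentUntil = -Infinity;
  for (const e of events) {
    if (e.t >= silentUntil) {
      out.push(e.value);
      silentUntil = e.t + duration;
    }
  }
  return out;
}

// auditTime: open a window on the first value, emit the latest value
// seen once the window closes.
function auditTimeSketch(events, duration) {
  const out = [];
  let windowEnd = null;
  let latest;
  for (const e of events) {
    if (windowEnd !== null && e.t >= windowEnd) {
      out.push(latest); // the window closed before this event arrived
      windowEnd = null;
    }
    if (windowEnd === null) windowEnd = e.t + duration;
    latest = e.value;
  }
  if (windowEnd !== null) out.push(latest); // the final window closes
  return out;
}

// debounceTime: emit a value only if the source then stays silent
// for at least `dueTime` ms.
function debounceTimeSketch(events, dueTime) {
  return events
    .filter((e, i) => {
      const next = events[i + 1];
      return !next || next.t - e.t >= dueTime;
    })
    .map(e => e.value);
}

const timeline = [
  { t: 0, value: 'a' },
  { t: 200, value: 'b' },
  { t: 400, value: 'c' },
  { t: 600, value: 'd' },
  { t: 1500, value: 'e' },
];

const throttled = throttleTimeSketch(timeline, 500); // ['a', 'd', 'e']
const audited = auditTimeSketch(timeline, 500);      // ['c', 'd', 'e']
const debounced = debounceTimeSketch(timeline, 500); // ['d', 'e']
```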
- buffer(notifier: Observable): Collect values as an array and emit them when another Observable emits
  ```javascript
  const clicks = Rx.Observable.fromEvent(document, 'click');
  const interval = Rx.Observable.interval(1000);
  const buffered = interval.buffer(clicks);
  ```
- bufferCount(size: number): Collect values as an array and emit it when its length reaches size
- bufferTime(timeSpan: number): Collect values as an array and emit it periodically, every timeSpan milliseconds
- concatMap(project: function(value: T): Observable): Maps each value to an Observable, then flattens the inner Observables using concatAll
- concatMapTo(innerObservable: Observable): Like concatMap, but maps each value to the same inner Observable
- exhaustMap: Maps each value to an Observable, then flattens the inner Observables with exhaust
- map: Like Array.prototype.map()
- mapTo(value: any): Like map, but maps every source value to the same output every time
- mergeMap(project: function(value: T): Observable): Maps each value to an Observable, then flattens the inner Observables using mergeAll
- mergeMapTo(innerObservable: Observable): Like mergeMap, but always maps each value to the same inner Observable
- pairwise(): Puts the current value and the previous value together as an array, and emits that
- partition(predicate: function(v: T, i: number): boolean): Outputs an array with two Observables, determined by the predicate function
- pluck(...properties: string): Like map, but picks one of the nested properties from each emitted object
- scan(accumulator: function(acc: R, v: T, i: number): R): Like reduce, but emits the accumulation whenever the source emits a value
- switchMap(project: function(v: T, i: number): Observable): Maps each value to an Observable, then flattens using switch
- switchMapTo(innerObservable: Observable): Like switchMap, but always maps each value to the same inner Observable
- window(boundaries: Observable): Like buffer, but emits a nested Observable instead of an array
- windowCount(size: number): Like bufferCount, but emits a nested Observable instead of an array
- windowTime(span: number): Like bufferTime, but emits a nested Observable instead of an array
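A few of these transformations have direct array analogues, which can help when reading the descriptions above. The sketch below uses plain arrays in place of Observables; `scanSketch` and `bufferCountSketch` are hypothetical helpers, and each output array lists what the corresponding operator would emit over time.

```javascript
const source = [1, 2, 3, 4];
const add = (acc, v) => acc + v;

// reduce: a single result once the whole sequence has been seen.
const reduced = source.reduce(add, 0); // 10

// scan: every intermediate accumulation.
function scanSketch(values, accumulator, seed) {
  let acc = seed;
  return values.map(v => (acc = accumulator(acc, v)));
}
const scanned = scanSketch(source, add, 0); // [1, 3, 6, 10]

// pairwise: [previous, current] pairs, starting with the second value.
const pairs = source.slice(1).map((v, i) => [source[i], v]); // [[1, 2], [2, 3], [3, 4]]

// bufferCount(2): consecutive chunks of two values.
function bufferCountSketch(values, size) {
  const chunks = [];
  for (let i = 0; i < values.length; i += size) {
    chunks.push(values.slice(i, i + size));
  }
  return chunks;
}
const buffered = bufferCountSketch(source, 2); // [[1, 2], [3, 4]]
```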