Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Improve BehaviorSubject JavaDoc + related clarifications #5780

Merged
merged 2 commits into from
Jan 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,24 @@ public static Scheduler onComputationScheduler(@NonNull Scheduler defaultSchedul

/**
* Called when an undeliverable error occurs.
* <p>
* Undeliverable errors are those {@code Observer.onError()} invocations that are not allowed to happen on
* the given consumer type ({@code Observer}, {@code Subscriber}, etc.) due to protocol restrictions
* because the consumer has either disposed/cancelled its {@code Disposable}/{@code Subscription} or
* has already terminated with an {@code onError()} or {@code onComplete()} signal.
* <p>
* By default, this global error handler prints the stacktrace via {@link Throwable#printStackTrace()}
* and calls {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)}
* on the current thread.
* <p>
* Note that on some platforms, the platform runtime terminates the current application with an error if such
* uncaught exceptions happen. In this case, it is recommended the application installs a global error
* handler via the {@link #setErrorHandler(Consumer)} plugin method.
*
* @param error the error to report
* @see #getErrorHandler()
* @see #setErrorHandler(Consumer)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd
Do you think it makes sense to reference
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
here?
Is it acceptable to reference Wiki from javadoc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

* @see <a href="https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling">Error handling Wiki</a>
*/
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorProcessor.png" alt="">
* <p>
* This processor does not have a public constructor by design; a new empty instance of this
* {@code BehaviorSubject} can be created via the {@link #create()} method and
* {@code BehaviorProcessor} can be created via the {@link #create()} method and
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
* overload resolution conflict with {@code Flowable.create} that creates a Flowable, not a {@code BehaviorProcessor}).
* <p>
Expand Down Expand Up @@ -81,15 +81,15 @@
* <p>
* Even though {@code BehaviorProcessor} implements the {@code Subscriber} interface, calling
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
* if the processor is used as a standalone source. However, calling {@code onSubscribe} is
* called after the {@code BehaviorProcessor} reached its terminal state will result in the
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
* after the {@code BehaviorProcessor} reached its terminal state will result in the
* given {@code Subscription} being cancelled immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively.
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
* <p>
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
Expand All @@ -112,6 +112,16 @@
* <dt><b>Scheduler:</b></dt>
* <dd>{@code BehaviorProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>When the {@link #onError(Throwable)} is called, the {@code BehaviorProcessor} enters into a terminal state
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
* if one or more {@code Subscriber}s cancel their respective {@code Subscription}s, the
* {@code Throwable} is delivered to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
* cancel at once).
* If there were no {@code Subscriber}s subscribed to this {@code BehaviorProcessor} when the {@code onError()}
* was called, the global error handler is not invoked.
* </dd>
* </dl>
* <p>
* Example usage:
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,87 @@
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code BehaviorSubject} can be created via the {@link #create()} method and
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
* overload resolution conflict with {@code Observable.create} that creates an Observable, not a {@code BehaviorSubject}).
* <p>
* Since the {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
* default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and
* {@link #onError(Throwable)}.
* <p>
* Since a {@code BehaviorSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
* <p>
* When this {@code BehaviorSubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, the
* last observed item (if any) is cleared and late {@link io.reactivex.Observer}s only receive
* the respective terminal event.
* <p>
* The {@code BehaviorSubject} does not support clearing its cached value (to appear empty again), however, the
* effect can be achieved by using a special item and making sure {@code Observer}s subscribe through a
* filter whose predicate filters out this special item:
* <pre><code>
* BehaviorSubject&lt;Integer&gt; subject = BehaviorSubject.create();
*
* final Integer EMPTY = Integer.MIN_VALUE;
*
* Observable&lt;Integer&gt; observable = subject.filter(v -&gt; v != EMPTY);
*
* TestObserver&lt;Integer&gt; to1 = observable.test();
*
* observable.onNext(1);
* // this will "clear" the cache
* observable.onNext(EMPTY);
*
* TestObserver&lt;Integer&gt; to2 = observable.test();
*
* subject.onNext(2);
* subject.onComplete();
*
* // to1 received both non-empty items
* to1.assertResult(1, 2);
*
* // to2 received only 2 even though the current item was EMPTY
* // when it got subscribed
* to2.assertResult(2);
*
* // Observers coming after the subject was terminated receive
* // no items and only the onComplete event in this case.
* observable.test().assertResult();
* </code></pre>
* <p>
* Even though {@code BehaviorSubject} implements the {@code Observer} interface, calling
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
* after the {@code BehaviorSubjecct} reached its terminal state will result in the
* given {@code Disposable} being disposed immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
* consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
* <p>
* This {@code BehaviorSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the latest observed value
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
* {@link #getValues()} or {@link #getValues(Object[])}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code BehaviorSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
* the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>When the {@link #onError(Throwable)} is called, the {@code BehaviorSubject} enters into a terminal state
* and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission,
* if one or more {@code Observer}s dispose their respective {@code Disposable}s, the
* {@code Throwable} is delivered to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s
* cancel at once).
* If there were no {@code Observer}s subscribed to this {@code BehaviorSubject} when the {@code onError()}
* was called, the global error handler is not invoked.
* </dd>
* </dl>
* <p>
* Example usage:
* <pre> {@code

Expand Down