Skip to content

Commit

Permalink
2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError …
Browse files Browse the repository at this point in the history
…to Observable and Flowable (#4589)

* 2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable

* Address issues
  • Loading branch information
vanniktech authored and akarnokd committed Sep 23, 2016
1 parent d923f31 commit 603f6c6
Show file tree
Hide file tree
Showing 16 changed files with 584 additions and 21 deletions.
100 changes: 100 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7679,6 +7679,37 @@ public final Single<T> elementAt(long index, T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, defaultItem));
}

/**
* Returns a Flowable that emits the item found at a specified index in a sequence of emissions from a
* source Publisher.
* If the source Publisher does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param index
* the zero-based index of the item to retrieve
* @return a Flowable that emits the item at the specified position in the sequence emitted by the source
* Publisher, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAtOrError(long index) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, null));
}

/**
* Filters items emitted by a Publisher by only emitting those that satisfy a specified predicate.
* <p>
Expand Down Expand Up @@ -7753,6 +7784,29 @@ public final Single<T> first(T defaultItem) {
return elementAt(0, defaultItem);
}

/**
* Returns a Single that emits only the very first item emitted by the source Publisher, or a default
* item if the source Publisher completes without emitting anything.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> firstOrError() {
return elementAtOrError(0);
}

/**
* Returns a Flowable that emits items based on applying a function that you supply to each item emitted
* by the source Publisher, where that function returns a Publisher, and then merging those resulting
Expand Down Expand Up @@ -8904,6 +8958,28 @@ public final Single<T> last(T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits only the last item emitted by the source Publisher.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> lastOrError() {
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, null));
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* other standard composition methods first;</strong>
Expand Down Expand Up @@ -11211,6 +11287,30 @@ public final Single<T> single(T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits the single item emitted by the source Publisher, if that Publisher
* emits only a single item.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* If the source Publisher emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> singleOrError() {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, null));
}

/**
* Returns a Flowable that skips the first {@code count} items emitted by the source Publisher and emits
* the remainder.
Expand Down
98 changes: 92 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6657,6 +6657,32 @@ public final Single<T> elementAt(long index, T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, defaultItem));
}

/**
* Returns a Single that emits the item found at a specified index in a sequence of emissions from a source ObservableSource.
* If the source ObservableSource does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param index
* the zero-based index of the item to retrieve
* @return a Single that emits the item at the specified position in the sequence emitted by the source
* ObservableSource, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAtOrError(long index) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, null));
}

/**
* Filters items emitted by an ObservableSource by only emitting those that satisfy a specified predicate.
* <p>
Expand Down Expand Up @@ -6698,8 +6724,8 @@ public final Maybe<T> firstElement() {
}

/**
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default
* item if the source ObservableSource completes without emitting anything.
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default item
* if the source ObservableSource completes without emitting any items.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="">
* <dl>
Expand All @@ -6717,6 +6743,24 @@ public final Single<T> first(T defaultItem) {
return elementAt(0L, defaultItem);
}

/**
* Returns a Single that emits only the very first item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> firstOrError() {
return elementAtOrError(0L);
}

/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
Expand Down Expand Up @@ -7660,7 +7704,7 @@ public final Maybe<T> lastElement() {
}

/**
* Returns an Observable that emits only the last item emitted by the source ObservableSource, or a default item
* Returns a Single that emits only the last item emitted by the source ObservableSource, or a default item
* if the source ObservableSource completes without emitting any items.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrDefault.png" alt="">
Expand All @@ -7681,6 +7725,25 @@ public final Single<T> last(T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits only the last item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a Single that emits only the last item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> lastOrError() {
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, null));
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* other standard composition methods first;</strong>
Expand Down Expand Up @@ -9333,7 +9396,7 @@ public final Maybe<T> singleElement() {
}

/**
* Returns an Observable that emits the single item emitted by the source ObservableSource, if that ObservableSource
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item, or a default item if the source ObservableSource emits no items. If the source
* ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
Expand All @@ -9345,8 +9408,7 @@ public final Maybe<T> singleElement() {
*
* @param defaultItem
* a default value to emit if the source ObservableSource emits no item
* @return an Observable that emits the single item emitted by the source ObservableSource, or a default item if
* the source ObservableSource is empty
* @return the new Single instance
* @throws IllegalArgumentException
* if the source ObservableSource emits more than one item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
Expand All @@ -9357,6 +9419,30 @@ public final Single<T> single(T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* If the source ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrDefault.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @throws IllegalArgumentException
* if the source ObservableSource emits more than one item
* @throws NoSuchElementException
* if the source ObservableSource completes without emitting any items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> singleOrError() {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, null));
}

/**
* Returns an Observable that skips the first {@code count} items emitted by the source ObservableSource and emits
* the remainder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -104,7 +105,14 @@ public void onComplete() {
s = SubscriptionHelper.CANCELLED;
if (index <= count && !done) {
done = true;
actual.onSuccess(defaultValue);

T v = defaultValue;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -100,7 +101,13 @@ public void onComplete() {
item = null;
actual.onSuccess(v);
} else {
actual.onSuccess(defaultItem);
v = defaultItem;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -107,7 +108,12 @@ public void onComplete() {
if (v == null) {
v = defaultValue;
}
actual.onSuccess(v);

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import java.util.NoSuchElementException;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableElementAtSingle<T> extends Single<T> {
final ObservableSource<T> source;
final long index;
final T defaultValue;

public ObservableElementAtSingle(ObservableSource<T> source, long index, T defaultValue) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
}

@Override
public void subscribeActual(SingleObserver<? super T> t) {
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
Expand Down Expand Up @@ -98,7 +101,14 @@ public void onError(Throwable t) {
public void onComplete() {
if (index <= count && !done) {
done = true;
actual.onSuccess(defaultValue);

T v = defaultValue;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}
}
Expand Down
Loading

0 comments on commit 603f6c6

Please sign in to comment.