From 603f6c690a49a0eb73a7b9cec247ba3987e96772 Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Fri, 23 Sep 2016 23:19:00 +0200 Subject: [PATCH] 2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable (#4589) * 2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable * Address issues --- src/main/java/io/reactivex/Flowable.java | 100 ++++++++++++++++++ src/main/java/io/reactivex/Observable.java | 98 +++++++++++++++-- .../flowable/FlowableElementAtSingle.java | 10 +- .../flowable/FlowableLastSingle.java | 9 +- .../flowable/FlowableSingleSingle.java | 8 +- .../observable/ObservableElementAtSingle.java | 12 ++- .../observable/ObservableLastSingle.java | 6 +- .../observable/ObservableSingleSingle.java | 9 +- .../flowable/FlowableElementAtTest.java | 55 +++++++++- .../operators/flowable/FlowableFirstTest.java | 41 ++++++- .../operators/flowable/FlowableLastTest.java | 41 ++++++- .../flowable/FlowableSingleTest.java | 40 ++++++- .../observable/ObservableElementAtTest.java | 56 +++++++++- .../observable/ObservableFirstTest.java | 41 ++++++- .../observable/ObservableLastTest.java | 41 ++++++- .../observable/ObservableSingleTest.java | 38 +++++++ 16 files changed, 584 insertions(+), 21 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 95e0b0ee45..ce0a00d214 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7679,6 +7679,37 @@ public final Single elementAt(long index, T defaultItem) { return RxJavaPlugins.onAssembly(new FlowableElementAtSingle(this, index, defaultItem)); } + /** + * Returns a Flowable that emits the item found at a specified index in a sequence of emissions from a + * source Publisher. + * If the source Publisher does not contain the item at the specified index a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner + * (i.e., no backpressure applied to it).
+ *
Scheduler:
+ *
{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param index + * the zero-based index of the item to retrieve + * @return a Flowable that emits the item at the specified position in the sequence emitted by the source + * Publisher, or the default item if that index is outside the bounds of the source sequence + * @throws IndexOutOfBoundsException + * if {@code index} is less than 0 + * @see ReactiveX operators documentation: ElementAt + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Single elementAtOrError(long index) { + if (index < 0) { + throw new IndexOutOfBoundsException("index >= 0 required but it was " + index); + } + return RxJavaPlugins.onAssembly(new FlowableElementAtSingle(this, index, null)); + } + /** * Filters items emitted by a Publisher by only emitting those that satisfy a specified predicate. *

@@ -7753,6 +7784,29 @@ public final Single first(T defaultItem) { return elementAt(0, defaultItem); } + /** + * Returns a Single that emits only the very first item emitted by the source Publisher, or a default + * item if the source Publisher completes without emitting anything. + * If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an + * unbounded manner (i.e., without applying backpressure).
+ *
Scheduler:
+ *
{@code firstOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Single instance + * @see ReactiveX operators documentation: First + */ + @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @SchedulerSupport(SchedulerSupport.NONE) + public final Single firstOrError() { + return elementAtOrError(0); + } + /** * Returns a Flowable that emits items based on applying a function that you supply to each item emitted * by the source Publisher, where that function returns a Publisher, and then merging those resulting @@ -8904,6 +8958,28 @@ public final Single last(T defaultItem) { return RxJavaPlugins.onAssembly(new FlowableLastSingle(this, defaultItem)); } + /** + * Returns a Single that emits only the last item emitted by the source Publisher. + * If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an + * unbounded manner (i.e., without applying backpressure).
+ *
Scheduler:
+ *
{@code lastOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Single instance + * @see ReactiveX operators documentation: Last + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Single lastOrError() { + return RxJavaPlugins.onAssembly(new FlowableLastSingle(this, null)); + } + /** * This method requires advanced knowledge about building operators; please consider * other standard composition methods first; @@ -11211,6 +11287,30 @@ public final Single single(T defaultItem) { return RxJavaPlugins.onAssembly(new FlowableSingleSingle(this, defaultItem)); } + /** + * Returns a Single that emits the single item emitted by the source Publisher, if that Publisher + * emits only a single item. + * If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown. + * If the source Publisher emits more than one item, throw an {@code IllegalArgumentException}. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an + * unbounded manner (i.e., without applying backpressure).
+ *
Scheduler:
+ *
{@code singleOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Single instance + * @see ReactiveX operators documentation: First + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Single singleOrError() { + return RxJavaPlugins.onAssembly(new FlowableSingleSingle(this, null)); + } + /** * Returns a Flowable that skips the first {@code count} items emitted by the source Publisher and emits * the remainder. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 33c2defff8..a9cf4f2c97 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6657,6 +6657,32 @@ public final Single elementAt(long index, T defaultItem) { return RxJavaPlugins.onAssembly(new ObservableElementAtSingle(this, index, defaultItem)); } + /** + * Returns a Single that emits the item found at a specified index in a sequence of emissions from a source ObservableSource. + * If the source ObservableSource does not contain the item at the specified index a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Scheduler:
+ *
{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param index + * the zero-based index of the item to retrieve + * @return a Single that emits the item at the specified position in the sequence emitted by the source + * ObservableSource, or the default item if that index is outside the bounds of the source sequence + * @throws IndexOutOfBoundsException + * if {@code index} is less than 0 + * @see ReactiveX operators documentation: ElementAt + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Single elementAtOrError(long index) { + if (index < 0) { + throw new IndexOutOfBoundsException("index >= 0 required but it was " + index); + } + return RxJavaPlugins.onAssembly(new ObservableElementAtSingle(this, index, null)); + } + /** * Filters items emitted by an ObservableSource by only emitting those that satisfy a specified predicate. *

@@ -6698,8 +6724,8 @@ public final Maybe firstElement() { } /** - * Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default - * item if the source ObservableSource completes without emitting anything. + * Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default item + * if the source ObservableSource completes without emitting any items. *

* *

@@ -6717,6 +6743,24 @@ public final Single first(T defaultItem) { return elementAt(0L, defaultItem); } + /** + * Returns a Single that emits only the very first item emitted by the source ObservableSource. + * If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Scheduler:
+ *
{@code firstOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Single instance + * @see ReactiveX operators documentation: First + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Single firstOrError() { + return elementAtOrError(0L); + } + /** * Returns an Observable that emits items based on applying a function that you supply to each item emitted * by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting @@ -7660,7 +7704,7 @@ public final Maybe lastElement() { } /** - * Returns an Observable that emits only the last item emitted by the source ObservableSource, or a default item + * Returns a Single that emits only the last item emitted by the source ObservableSource, or a default item * if the source ObservableSource completes without emitting any items. *

* @@ -7681,6 +7725,25 @@ public final Single last(T defaultItem) { return RxJavaPlugins.onAssembly(new ObservableLastSingle(this, defaultItem)); } + /** + * Returns a Single that emits only the last item emitted by the source ObservableSource. + * If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown. + *

+ * + *

+ *
Scheduler:
+ *
{@code lastOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a Single that emits only the last item emitted by the source ObservableSource. + * If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown. + * @see ReactiveX operators documentation: Last + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Single lastOrError() { + return RxJavaPlugins.onAssembly(new ObservableLastSingle(this, null)); + } + /** * This method requires advanced knowledge about building operators; please consider * other standard composition methods first; @@ -9333,7 +9396,7 @@ public final Maybe singleElement() { } /** - * Returns an Observable that emits the single item emitted by the source ObservableSource, if that ObservableSource + * Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource * emits only a single item, or a default item if the source ObservableSource emits no items. If the source * ObservableSource emits more than one item, throw an {@code IllegalArgumentException}. *

@@ -9345,8 +9408,7 @@ public final Maybe singleElement() { * * @param defaultItem * a default value to emit if the source ObservableSource emits no item - * @return an Observable that emits the single item emitted by the source ObservableSource, or a default item if - * the source ObservableSource is empty + * @return the new Single instance * @throws IllegalArgumentException * if the source ObservableSource emits more than one item * @see ReactiveX operators documentation: First @@ -9357,6 +9419,30 @@ public final Single single(T defaultItem) { return RxJavaPlugins.onAssembly(new ObservableSingleSingle(this, defaultItem)); } + /** + * Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource + * emits only a single item. + * If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown. + * If the source ObservableSource emits more than one item, throw an {@code IllegalArgumentException}. + *

+ * + *

+ *
Scheduler:
+ *
{@code singleOrError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Single instance + * @throws IllegalArgumentException + * if the source ObservableSource emits more than one item + * @throws NoSuchElementException + * if the source ObservableSource completes without emitting any items + * @see ReactiveX operators documentation: First + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Single singleOrError() { + return RxJavaPlugins.onAssembly(new ObservableSingleSingle(this, null)); + } + /** * Returns an Observable that skips the first {@code count} items emitted by the source ObservableSource and emits * the remainder. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java index 4f259f28bd..4816d67a82 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import java.util.NoSuchElementException; import org.reactivestreams.*; import io.reactivex.*; @@ -104,7 +105,14 @@ public void onComplete() { s = SubscriptionHelper.CANCELLED; if (index <= count && !done) { done = true; - actual.onSuccess(defaultValue); + + T v = defaultValue; + + if (v != null) { + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java index 1a8c34978d..e5598e8b58 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import java.util.NoSuchElementException; import org.reactivestreams.*; import io.reactivex.*; @@ -100,7 +101,13 @@ public void onComplete() { item = null; actual.onSuccess(v); } else { - actual.onSuccess(defaultItem); + v = defaultItem; + + if (v != null) { + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java index 6a61e17ae0..1f114b04ae 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import java.util.NoSuchElementException; import org.reactivestreams.*; import io.reactivex.*; @@ -107,7 +108,12 @@ public void onComplete() { if (v == null) { v = defaultValue; } - actual.onSuccess(v); + + if (v != null) { + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); + } } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java index cf7770cc1c..dcc61de06a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java @@ -16,17 +16,20 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import java.util.NoSuchElementException; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableElementAtSingle extends Single { final ObservableSource source; final long index; final T defaultValue; + public ObservableElementAtSingle(ObservableSource source, long index, T defaultValue) { this.source = source; this.index = index; this.defaultValue = defaultValue; } + @Override public void subscribeActual(SingleObserver t) { source.subscribe(new ElementAtObserver(t, index, defaultValue)); @@ -98,7 +101,14 @@ public void onError(Throwable t) { public void onComplete() { if (index <= count && !done) { done = true; - actual.onSuccess(defaultValue); + + T v = defaultValue; + + if (v != null) { + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableLastSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableLastSingle.java index 499e3d5b82..5b91211f19 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableLastSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableLastSingle.java @@ -99,10 +99,10 @@ public void onComplete() { actual.onSuccess(v); } else { v = defaultItem; - if (v == null) { - actual.onError(new NoSuchElementException()); - } else { + if (v != null) { actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java index c05b8354d6..e4704d333a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java @@ -17,6 +17,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; +import java.util.NoSuchElementException; public final class ObservableSingleSingle extends Single { @@ -28,6 +29,7 @@ public ObservableSingleSingle(ObservableSource source, T defaultValue) { this.source = source; this.defaultValue = defaultValue; } + @Override public void subscribeActual(SingleObserver t) { source.subscribe(new SingleElementObserver(t, defaultValue)); @@ -104,7 +106,12 @@ public void onComplete() { if (v == null) { v = defaultValue; } - actual.onSuccess(v); + + if (v != null) { + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException()); + } } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index 885ae65147..aa9b72465d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.*; +import java.util.NoSuchElementException; import org.junit.Test; import io.reactivex.Flowable; @@ -82,4 +83,56 @@ public void testElementAtOrDefaultWithIndexOutOfBounds() { public void testElementAtOrDefaultWithMinusIndex() { Flowable.fromArray(1, 2).elementAt(-1, 0); } -} \ No newline at end of file + + @Test(expected = IndexOutOfBoundsException.class) + public void elementAtOrErrorNegativeIndex() { + Flowable.empty() + .elementAtOrError(-1); + } + + @Test + public void elementAtOrErrorNoElement() { + Flowable.empty() + .elementAtOrError(0) + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void elementAtOrErrorOneElement() { + Flowable.just(1) + .elementAtOrError(0) + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void elementAtOrErrorMultipleElements() { + Flowable.just(1, 2, 3) + .elementAtOrError(1) + .test() + .assertNoErrors() + .assertValue(2); + } + + @Test + public void elementAtOrErrorInvalidIndex() { + Flowable.just(1, 2, 3) + .elementAtOrError(3) + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void elementAtOrErrorError() { + Flowable.error(new RuntimeException("error")) + .elementAtOrError(0) + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java index 9f41a00f20..5ffe61fc21 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java @@ -20,6 +20,8 @@ import org.mockito.InOrder; import org.reactivestreams.Subscriber; +import java.util.NoSuchElementException; + import io.reactivex.*; import io.reactivex.functions.Predicate; @@ -506,4 +508,41 @@ public boolean test(Integer t1) { inOrder.verify(wo, times(1)).onSuccess(2); inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void firstOrErrorNoElement() { + Flowable.empty() + .firstOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void firstOrErrorOneElement() { + Flowable.just(1) + .firstOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorMultipleElements() { + Flowable.just(1, 2, 3) + .firstOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorError() { + Flowable.error(new RuntimeException("error")) + .firstOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java index a990427481..449a4a23b2 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java @@ -17,6 +17,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import java.util.NoSuchElementException; + import org.junit.Test; import org.mockito.InOrder; @@ -256,4 +258,41 @@ public boolean test(Integer t1) { // inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void lastOrErrorNoElement() { + Flowable.empty() + .lastOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void lastOrErrorOneElement() { + Flowable.just(1) + .lastOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void lastOrErrorMultipleElements() { + Flowable.just(1, 2, 3) + .lastOrError() + .test() + .assertNoErrors() + .assertValue(3); + } + + @Test + public void lastOrErrorError() { + Flowable.error(new RuntimeException("error")) + .lastOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java index dd0cabe869..51b013718e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.NoSuchElementException; import org.junit.Test; import org.mockito.InOrder; @@ -647,4 +648,41 @@ public Integer apply(Integer i1, Integer i2) { Integer r = reduced.blockingGet(); assertEquals(21, r.intValue()); } -} \ No newline at end of file + + @Test + public void singleOrErrorNoElement() { + Flowable.empty() + .singleOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void singleOrErrorOneElement() { + Flowable.just(1) + .singleOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void singleOrErrorMultipleElements() { + Flowable.just(1, 2, 3) + .singleOrError() + .test() + .assertNoValues() + .assertError(IllegalArgumentException.class); + } + + @Test + public void singleOrErrorError() { + Flowable.error(new RuntimeException("error")) + .singleOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java index 57ea0edd1a..c37d48b31d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java @@ -17,6 +17,8 @@ import org.junit.Test; +import java.util.NoSuchElementException; + import io.reactivex.Observable; public class ObservableElementAtTest { @@ -72,4 +74,56 @@ public void testElementAtOrDefaultWithIndexOutOfBounds() { public void testElementAtOrDefaultWithMinusIndex() { Observable.fromArray(1, 2).elementAt(-1, 0); } -} \ No newline at end of file + + @Test(expected = IndexOutOfBoundsException.class) + public void elementAtOrErrorNegativeIndex() { + Observable.empty() + .elementAtOrError(-1); + } + + @Test + public void elementAtOrErrorNoElement() { + Observable.empty() + .elementAtOrError(0) + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void elementAtOrErrorOneElement() { + Observable.just(1) + .elementAtOrError(0) + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void elementAtOrErrorMultipleElements() { + Observable.just(1, 2, 3) + .elementAtOrError(1) + .test() + .assertNoErrors() + .assertValue(2); + } + + @Test + public void elementAtOrErrorInvalidIndex() { + Observable.just(1, 2, 3) + .elementAtOrError(3) + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void elementAtOrErrorError() { + Observable.error(new RuntimeException("error")) + .elementAtOrError(0) + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java index 40bd9941fc..a32268a55c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java @@ -19,6 +19,8 @@ import org.junit.*; import org.mockito.InOrder; +import java.util.NoSuchElementException; + import io.reactivex.*; import io.reactivex.functions.Predicate; @@ -503,4 +505,41 @@ public boolean test(Integer t1) { inOrder.verify(wo, times(1)).onSuccess(2); inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void firstOrErrorNoElement() { + Observable.empty() + .firstOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void firstOrErrorOneElement() { + Observable.just(1) + .firstOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorMultipleElements() { + Observable.just(1, 2, 3) + .firstOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorError() { + Observable.error(new RuntimeException("error")) + .firstOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java index e284799ded..245d4f7d27 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java @@ -20,6 +20,8 @@ import org.junit.Test; import org.mockito.InOrder; +import java.util.NoSuchElementException; + import io.reactivex.*; import io.reactivex.functions.Predicate; @@ -256,4 +258,41 @@ public boolean test(Integer t1) { // inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); } -} \ No newline at end of file + + @Test + public void lastOrErrorNoElement() { + Observable.empty() + .lastOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void lastOrErrorOneElement() { + Observable.just(1) + .lastOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void lastOrErrorMultipleElements() { + Observable.just(1, 2, 3) + .lastOrError() + .test() + .assertNoErrors() + .assertValue(3); + } + + @Test + public void lastOrErrorError() { + Observable.error(new RuntimeException("error")) + .lastOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java index ef4d561868..5660ba8973 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import org.mockito.InOrder; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; @@ -484,4 +485,41 @@ public void singleElementOperatorDoNotSwallowExceptionWhenDone() { RxJavaPlugins.reset(); } } + + @Test + public void singleOrErrorNoElement() { + Observable.empty() + .singleOrError() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void singleOrErrorOneElement() { + Observable.just(1) + .singleOrError() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void singleOrErrorMultipleElements() { + Observable.just(1, 2, 3) + .singleOrError() + .test() + .assertNoValues() + .assertError(IllegalArgumentException.class); + } + + @Test + public void singleOrErrorError() { + Observable.error(new RuntimeException("error")) + .singleOrError() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } }