diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 2ea8137592..b4ebd30747 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -49,7 +49,13 @@ protected void subscribeActual(Subscriber s) { if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) { return; } - source.subscribe(new MergeSubscriber(s, mapper, delayErrors, maxConcurrency, bufferSize)); + source.subscribe(subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize)); + } + + public static Subscriber subscribe(Subscriber s, + Function> mapper, + boolean delayErrors, int maxConcurrency, int bufferSize) { + return new MergeSubscriber(s, mapper, delayErrors, maxConcurrency, bufferSize); } static final class MergeSubscriber extends AtomicInteger implements Subscription, Subscriber { diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java index 7635fb57cd..1593d76690 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java @@ -19,6 +19,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiConsumer; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscribers.DeferredScalarSubscriber; import io.reactivex.internal.subscriptions.*; import io.reactivex.parallel.ParallelFlowable; @@ -34,12 +35,12 @@ public final class ParallelCollect extends ParallelFlowable { final ParallelFlowable source; - final Callable initialCollection; + final Callable initialCollection; - final BiConsumer collector; + final BiConsumer collector; public ParallelCollect(ParallelFlowable source, - Callable initialCollection, BiConsumer collector) { + Callable initialCollection, BiConsumer collector) { this.source = source; this.initialCollection = initialCollection; this.collector = collector; @@ -60,18 +61,13 @@ public void subscribe(Subscriber[] subscribers) { C initialValue; try { - initialValue = initialCollection.call(); + initialValue = ObjectHelper.requireNonNull(initialCollection.call(), "The initialSupplier returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); reportError(subscribers, ex); return; } - if (initialValue == null) { - reportError(subscribers, new NullPointerException("The initialSupplier returned a null value")); - return; - } - parents[i] = new ParallelCollectSubscriber(subscribers[i], initialValue, collector); } @@ -94,14 +90,14 @@ static final class ParallelCollectSubscriber extends DeferredScalarSubscri private static final long serialVersionUID = -4767392946044436228L; - final BiConsumer collector; + final BiConsumer collector; C collection; boolean done; ParallelCollectSubscriber(Subscriber subscriber, - C initialValue, BiConsumer collector) { + C initialValue, BiConsumer collector) { super(subscriber); this.collection = initialValue; this.collector = collector; diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java index 4f552ff3ab..7b75bfd81a 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java @@ -63,24 +63,8 @@ public void subscribe(Subscriber[] subscribers) { @SuppressWarnings("unchecked") final Subscriber[] parents = new Subscriber[n]; - // FIXME cheat until we have support from RxJava2 internals - Publisher p = new Publisher() { - int i; - - @SuppressWarnings("unchecked") - @Override - public void subscribe(Subscriber s) { - parents[i++] = (Subscriber)s; - } - }; - - FlowableConcatMap op = new FlowableConcatMap(p, mapper, prefetch, errorMode); - for (int i = 0; i < n; i++) { - - op.subscribe(subscribers[i]); -// FIXME needs a FlatMap subscriber -// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode); + parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode); } source.subscribe(parents); diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java index 6393deaeba..78b945417c 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java @@ -17,6 +17,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Predicate; +import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.plugins.RxJavaPlugins; @@ -48,7 +49,12 @@ public void subscribe(Subscriber[] subscribers) { Subscriber[] parents = new Subscriber[n]; for (int i = 0; i < n; i++) { - parents[i] = new ParallelFilterSubscriber(subscribers[i], predicate); + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelFilterConditionalSubscriber((ConditionalSubscriber)a, predicate); + } else { + parents[i] = new ParallelFilterSubscriber(a, predicate); + } } source.subscribe(parents); @@ -59,31 +65,44 @@ public int parallelism() { return source.parallelism(); } - static final class ParallelFilterSubscriber implements Subscriber, Subscription { - - final Subscriber actual; - + abstract static class BaseFilterSubscriber implements ConditionalSubscriber, Subscription { final Predicate predicate; Subscription s; boolean done; - ParallelFilterSubscriber(Subscriber actual, Predicate predicate) { - this.actual = actual; + BaseFilterSubscriber(Predicate predicate) { this.predicate = predicate; } @Override - public void request(long n) { + public final void request(long n) { s.request(n); } @Override - public void cancel() { + public final void cancel() { s.cancel(); } + @Override + public final void onNext(T t) { + if (!tryOnNext(t)) { + s.request(1); + } + } + } + + static final class ParallelFilterSubscriber extends BaseFilterSubscriber { + + final Subscriber actual; + + ParallelFilterSubscriber(Subscriber actual, Predicate predicate) { + super(predicate); + this.actual = actual; + } + @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { @@ -94,26 +113,83 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(T t) { + public boolean tryOnNext(T t) { + if (!done) { + boolean b; + + try { + b = predicate.test(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return false; + } + + if (b) { + actual.onNext(t); + return true; + } + } + return false; + } + + @Override + public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } - boolean b; - - try { - b = predicate.test(t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancel(); - onError(ex); - return; + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (!done) { + done = true; + actual.onComplete(); } + } + } - if (b) { - actual.onNext(t); - } else { - s.request(1); + static final class ParallelFilterConditionalSubscriber extends BaseFilterSubscriber { + + final ConditionalSubscriber actual; + + ParallelFilterConditionalSubscriber(ConditionalSubscriber actual, Predicate predicate) { + super(predicate); + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public boolean tryOnNext(T t) { + if (!done) { + boolean b; + + try { + b = predicate.test(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return false; + } + + if (b) { + return actual.tryOnNext(t); + } } + return false; } @Override @@ -128,12 +204,9 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; + if (!done) { + done = true; + actual.onComplete(); } - done = true; - actual.onComplete(); } - - } -} + }} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java index e11789fcab..a52aca0ea8 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java @@ -67,24 +67,8 @@ public void subscribe(Subscriber[] subscribers) { @SuppressWarnings("unchecked") final Subscriber[] parents = new Subscriber[n]; - // FIXME cheat until we have support from RxJava2 internals - Publisher p = new Publisher() { - int i; - - @SuppressWarnings("unchecked") - @Override - public void subscribe(Subscriber s) { - parents[i++] = (Subscriber)s; - } - }; - - FlowableFlatMap op = new FlowableFlatMap(p, mapper, delayError, maxConcurrency, prefetch); - for (int i = 0; i < n; i++) { - - op.subscribe(subscribers[i]); -// FIXME needs a FlatMap subscriber -// parents[i] = FlowableFlatMap.createSubscriber(s, mapper, delayError, maxConcurrency, prefetch); + parents[i] = FlowableFlatMap.subscribe(subscribers[i], mapper, delayError, maxConcurrency, prefetch); } source.subscribe(parents); diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java index 855bf4ac53..fb624f40e0 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java @@ -17,7 +17,7 @@ import org.reactivestreams.*; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -187,8 +187,8 @@ public void cancel() { public void onNext(T t) { if (sourceMode == QueueSubscription.NONE) { if (!queue.offer(t)) { - cancel(); - onError(new IllegalStateException("Queue is full?")); + s.cancel(); + onError(new MissingBackpressureException("Queue is full?")); return; } } @@ -344,18 +344,7 @@ void drainSync() { return; } - boolean empty; - - try { - empty = q.isEmpty(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - for (Subscriber s : a) { - s.onError(ex); - } - return; - } + boolean empty = q.isEmpty(); if (empty) { for (Subscriber s : a) { diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java index 29c5e6fd73..f94636e3c7 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java @@ -28,7 +28,7 @@ /** * Merges the individual 'rails' of the source ParallelFlowable, unordered, - * into a single regular Publisher sequence (exposed as Px). + * into a single regular Publisher sequence (exposed as Flowable). * * @param the value type */ @@ -123,7 +123,7 @@ void onNext(JoinInnerSubscriber inner, T value) { } inner.request(1); } else { - SimpleQueue q = inner.getQueue(); + SimplePlainQueue q = inner.getQueue(); if (!q.offer(value)) { cancelAll(); @@ -140,10 +140,13 @@ void onNext(JoinInnerSubscriber inner, T value) { return; } } else { - SimpleQueue q = inner.getQueue(); + SimplePlainQueue q = inner.getQueue(); - // FIXME overflow handling - q.offer(value); + if (!q.offer(value)) { + cancelAll(); + onError(new MissingBackpressureException("Queue full?!")); + return; + } if (getAndIncrement() != 0) { return; diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java index 4d35001c52..bfaabd484a 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java @@ -18,6 +18,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.plugins.RxJavaPlugins; @@ -50,7 +51,12 @@ public void subscribe(Subscriber[] subscribers) { Subscriber[] parents = new Subscriber[n]; for (int i = 0; i < n; i++) { - parents[i] = new ParallelMapSubscriber(subscribers[i], mapper); + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelMapConditionalSubscriber((ConditionalSubscriber)a, mapper); + } else { + parents[i] = new ParallelMapSubscriber(a, mapper); + } } source.subscribe(parents); @@ -134,4 +140,96 @@ public void onComplete() { } } + static final class ParallelMapConditionalSubscriber implements ConditionalSubscriber, Subscription { + + final ConditionalSubscriber actual; + + final Function mapper; + + Subscription s; + + boolean done; + + ParallelMapConditionalSubscriber(ConditionalSubscriber actual, Function mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + R v; + + try { + v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + + actual.onNext(v); + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + R v; + + try { + v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return false; + } + + return actual.tryOnNext(v); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } } diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java index c1e8b881c7..541e5b7345 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java @@ -52,14 +52,14 @@ public ParallelPeek(ParallelFlowable source, ) { this.source = source; - this.onNext = ObjectHelper.requireNonNull(onNext, "onNext"); - this.onAfterNext = ObjectHelper.requireNonNull(onAfterNext, "onAfterNext"); - this.onError = ObjectHelper.requireNonNull(onError, "onError"); - this.onComplete = ObjectHelper.requireNonNull(onComplete, "onComplete"); - this.onAfterTerminated = ObjectHelper.requireNonNull(onAfterTerminated, "onAfterTerminated"); - this.onSubscribe = ObjectHelper.requireNonNull(onSubscribe, "onSubscribe"); - this.onRequest = ObjectHelper.requireNonNull(onRequest, "onRequest"); - this.onCancel = ObjectHelper.requireNonNull(onCancel, "onCancel"); + this.onNext = ObjectHelper.requireNonNull(onNext, "onNext is null"); + this.onAfterNext = ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); + this.onError = ObjectHelper.requireNonNull(onError, "onError is null"); + this.onComplete = ObjectHelper.requireNonNull(onComplete, "onComplete is null"); + this.onAfterTerminated = ObjectHelper.requireNonNull(onAfterTerminated, "onAfterTerminated is null"); + this.onSubscribe = ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); + this.onRequest = ObjectHelper.requireNonNull(onRequest, "onRequest is null"); + this.onCancel = ObjectHelper.requireNonNull(onCancel, "onCancel is null"); } @Override @@ -142,26 +142,24 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (done) { - return; - } - - try { - parent.onNext.accept(t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - onError(ex); - return; - } + if (!done) { + try { + parent.onNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } - actual.onNext(t); + actual.onNext(t); - try { - parent.onAfterNext.accept(t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - onError(ex); - return; + try { + parent.onAfterNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } } } @@ -191,26 +189,24 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; - } - done = true; - try { - parent.onComplete.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - actual.onError(ex); - return; - } - actual.onComplete(); + if (!done) { + done = true; + try { + parent.onComplete.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + actual.onComplete(); - try { - parent.onAfterTerminated.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); + try { + parent.onAfterTerminated.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } } } - } } diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java index 3b3e32234e..f747e8c83c 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java @@ -19,6 +19,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscribers.DeferredScalarSubscriber; import io.reactivex.internal.subscriptions.*; import io.reactivex.parallel.ParallelFlowable; @@ -36,9 +37,9 @@ public final class ParallelReduce extends ParallelFlowable { final Callable initialSupplier; - final BiFunction reducer; + final BiFunction reducer; - public ParallelReduce(ParallelFlowable source, Callable initialSupplier, BiFunction reducer) { + public ParallelReduce(ParallelFlowable source, Callable initialSupplier, BiFunction reducer) { this.source = source; this.initialSupplier = initialSupplier; this.reducer = reducer; @@ -59,18 +60,13 @@ public void subscribe(Subscriber[] subscribers) { R initialValue; try { - initialValue = initialSupplier.call(); + initialValue = ObjectHelper.requireNonNull(initialSupplier.call(), "The initialSupplier returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); reportError(subscribers, ex); return; } - if (initialValue == null) { - reportError(subscribers, new NullPointerException("The initialSupplier returned a null value")); - return; - } - parents[i] = new ParallelReduceSubscriber(subscribers[i], initialValue, reducer); } @@ -93,13 +89,13 @@ static final class ParallelReduceSubscriber extends DeferredScalarSubscrib private static final long serialVersionUID = 8200530050639449080L; - final BiFunction reducer; + final BiFunction reducer; R accumulator; boolean done; - ParallelReduceSubscriber(Subscriber subscriber, R initialValue, BiFunction reducer) { + ParallelReduceSubscriber(Subscriber subscriber, R initialValue, BiFunction reducer) { super(subscriber); this.accumulator = initialValue; this.reducer = reducer; @@ -118,28 +114,20 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (done) { - return; - } - - R v; - - try { - v = reducer.apply(accumulator, t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancel(); - onError(ex); - return; - } - - if (v == null) { - cancel(); - onError(new NullPointerException("The reducer returned a null value")); - return; + if (!done) { + R v; + + try { + v = ObjectHelper.requireNonNull(reducer.apply(accumulator, t), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + + accumulator = v; } - - accumulator = v; } @Override @@ -155,14 +143,13 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; - } - done = true; + if (!done) { + done = true; - R a = accumulator; - accumulator = null; - complete(a); + R a = accumulator; + accumulator = null; + complete(a); + } } @Override diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java index 020b62d7be..1fe262630a 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java @@ -63,7 +63,7 @@ static final class ParallelReduceFullMainSubscriber extends DeferredScalarSub final AtomicInteger remaining = new AtomicInteger(); - final AtomicBoolean once = new AtomicBoolean(); + final AtomicReference error = new AtomicReference(); ParallelReduceFullMainSubscriber(Subscriber subscriber, int n, BiFunction reducer) { super(subscriber); @@ -115,11 +115,13 @@ public void cancel() { } void innerError(Throwable ex) { - if (once.compareAndSet(false, true)) { + if (error.compareAndSet(null, ex)) { cancel(); actual.onError(ex); } else { - RxJavaPlugins.onError(ex); + if (ex != error.get()) { + RxJavaPlugins.onError(ex); + } } } @@ -131,17 +133,13 @@ void innerComplete(T value) { if (sp != null) { try { - value = reducer.apply(sp.first, sp.second); + value = ObjectHelper.requireNonNull(reducer.apply(sp.first, sp.second), "The reducer returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); innerError(ex); return; } - if (value == null) { - innerError(new NullPointerException("The reducer returned a null value")); - return; - } } else { break; } @@ -189,25 +187,24 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (done) { - return; - } - T v = value; + if (!done) { + T v = value; - if (v == null) { - value = t; - } else { + if (v == null) { + value = t; + } else { - try { - v = ObjectHelper.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - get().cancel(); - onError(ex); - return; - } + try { + v = ObjectHelper.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + get().cancel(); + onError(ex); + return; + } - value = v; + value = v; + } } } @@ -223,11 +220,10 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; + if (!done) { + done = true; + parent.innerComplete(value); } - done = true; - parent.innerComplete(value); } void cancel() { @@ -235,38 +231,31 @@ void cancel() { } } - static final class SlotPair { + static final class SlotPair extends AtomicInteger { + + private static final long serialVersionUID = 473971317683868662L; T first; T second; - volatile int acquireIndex; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ACQ = - AtomicIntegerFieldUpdater.newUpdater(SlotPair.class, "acquireIndex"); - - - volatile int releaseIndex; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater REL = - AtomicIntegerFieldUpdater.newUpdater(SlotPair.class, "releaseIndex"); + final AtomicInteger releaseIndex = new AtomicInteger(); int tryAcquireSlot() { for (;;) { - int acquired = acquireIndex; + int acquired = get(); if (acquired >= 2) { return -1; } - if (ACQ.compareAndSet(this, acquired, acquired + 1)) { + if (compareAndSet(acquired, acquired + 1)) { return acquired; } } } boolean releaseSlot() { - return REL.incrementAndGet(this) == 2; + return releaseIndex.incrementAndGet() == 2; } } } diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java index 4582ac50e4..30a5398833 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java @@ -19,6 +19,8 @@ import io.reactivex.Scheduler; import io.reactivex.Scheduler.Worker; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -63,8 +65,11 @@ public void subscribe(Subscriber[] subscribers) { Worker w = scheduler.createWorker(); SpscArrayQueue q = new SpscArrayQueue(prefetch); - RunOnSubscriber parent = new RunOnSubscriber(a, prefetch, q, w); - parents[i] = parent; + if (a instanceof ConditionalSubscriber) { + parents[i] = new RunOnConditionalSubscriber((ConditionalSubscriber)a, prefetch, q, w); + } else { + parents[i] = new RunOnSubscriber(a, prefetch, q, w); + } } source.subscribe(parents); @@ -76,14 +81,10 @@ public int parallelism() { return source.parallelism(); } - static final class RunOnSubscriber - extends AtomicInteger - implements Subscriber, Subscription, Runnable { - + abstract static class BaseRunOnSubscriber extends AtomicInteger + implements Subscriber, Subscription, Runnable { - private static final long serialVersionUID = 1075119423897941642L; - - final Subscriber actual; + private static final long serialVersionUID = 9222303586456402150L; final int prefetch; @@ -105,8 +106,7 @@ static final class RunOnSubscriber int consumed; - RunOnSubscriber(Subscriber actual, int prefetch, SpscArrayQueue queue, Worker worker) { - this.actual = actual; + BaseRunOnSubscriber(int prefetch, SpscArrayQueue queue, Worker worker) { this.prefetch = prefetch; this.queue = queue; this.limit = prefetch - (prefetch >> 2); @@ -114,30 +114,20 @@ static final class RunOnSubscriber } @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - - actual.onSubscribe(this); - - s.request(prefetch); - } - } - - @Override - public void onNext(T t) { + public final void onNext(T t) { if (done) { return; } if (!queue.offer(t)) { - onError(new IllegalStateException("Queue is full?!")); + s.cancel(); + onError(new MissingBackpressureException("Queue is full?!")); return; } schedule(); } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; @@ -148,7 +138,7 @@ public void onError(Throwable t) { } @Override - public void onComplete() { + public final void onComplete() { if (done) { return; } @@ -157,7 +147,7 @@ public void onComplete() { } @Override - public void request(long n) { + public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); schedule(); @@ -165,7 +155,7 @@ public void request(long n) { } @Override - public void cancel() { + public final void cancel() { if (!cancelled) { cancelled = true; s.cancel(); @@ -177,11 +167,34 @@ public void cancel() { } } - void schedule() { + final void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } + } + + static final class RunOnSubscriber extends BaseRunOnSubscriber { + + private static final long serialVersionUID = 1075119423897941642L; + + final Subscriber actual; + + RunOnSubscriber(Subscriber actual, int prefetch, SpscArrayQueue queue, Worker worker) { + super(prefetch, queue, worker); + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(prefetch); + } + } @Override public void run() { @@ -284,4 +297,128 @@ public void run() { } } } + + static final class RunOnConditionalSubscriber extends BaseRunOnSubscriber { + + private static final long serialVersionUID = 1075119423897941642L; + + final ConditionalSubscriber actual; + + RunOnConditionalSubscriber(ConditionalSubscriber actual, int prefetch, SpscArrayQueue queue, Worker worker) { + super(prefetch, queue, worker); + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(prefetch); + } + } + + @Override + public void run() { + int missed = 1; + int c = consumed; + SpscArrayQueue q = queue; + ConditionalSubscriber a = actual; + int lim = limit; + + for (;;) { + + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + + if (d) { + Throwable ex = error; + if (ex != null) { + q.clear(); + + a.onError(ex); + + worker.dispose(); + return; + } + } + + T v = q.poll(); + + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + + worker.dispose(); + return; + } + + if (empty) { + break; + } + + if (a.tryOnNext(v)) { + e++; + } + + int p = ++c; + if (p == lim) { + c = 0; + s.request(p); + } + } + + if (e == r) { + if (cancelled) { + q.clear(); + return; + } + + if (done) { + Throwable ex = error; + if (ex != null) { + q.clear(); + + a.onError(ex); + + worker.dispose(); + return; + } + if (q.isEmpty()) { + a.onComplete(); + + worker.dispose(); + return; + } + } + } + + if (e != 0L && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + int w = get(); + if (w == missed) { + consumed = c; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java index e0b55be161..1cf5048fd4 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java @@ -19,6 +19,7 @@ import org.reactivestreams.*; import io.reactivex.Flowable; +import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; import io.reactivex.parallel.ParallelFlowable; @@ -73,7 +74,7 @@ static final class SortedJoinSubscription final AtomicInteger remaining = new AtomicInteger(); - final AtomicThrowable error = new AtomicThrowable(); + final AtomicReference error = new AtomicReference(); @SuppressWarnings("unchecked") SortedJoinSubscription(Subscriber actual, int n, Comparator comparator) { @@ -126,10 +127,12 @@ void innerNext(List value, int index) { } void innerError(Throwable e) { - if (error.addThrowable(e)) { + if (error.compareAndSet(null, e)) { drain(); } else { - RxJavaPlugins.onError(e); + if (e != error.get()) { + RxJavaPlugins.onError(e); + } } } @@ -159,7 +162,7 @@ void drain() { if (ex != null) { cancelAll(); Arrays.fill(lists, null); - a.onError(error.terminate()); + a.onError(ex); return; } @@ -176,7 +179,22 @@ void drain() { minIndex = i; } else { T b = list.get(index); - if (comparator.compare(min, b) > 0) { + + boolean smaller; + + try { + smaller = comparator.compare(min, b) > 0; + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancelAll(); + Arrays.fill(lists, null); + if (!error.compareAndSet(null, exc)) { + RxJavaPlugins.onError(exc); + } + a.onError(error.get()); + return; + } + if (smaller) { min = b; minIndex = i; } @@ -207,7 +225,7 @@ void drain() { if (ex != null) { cancelAll(); Arrays.fill(lists, null); - a.onError(error.terminate()); + a.onError(ex); return; } diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index f5ae690975..b3fba79201 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -67,8 +67,9 @@ public abstract class ParallelFlowable { protected final boolean validate(Subscriber[] subscribers) { int p = parallelism(); if (subscribers.length != p) { + Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length); for (Subscriber s : subscribers) { - EmptySubscription.error(new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length), s); + EmptySubscription.error(iae, s); } return false; } @@ -225,7 +226,7 @@ public final Flowable reduce(BiFunction reducer) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable reduce(Callable initialSupplier, BiFunction reducer) { + public final ParallelFlowable reduce(Callable initialSupplier, BiFunction reducer) { ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); ObjectHelper.requireNonNull(reducer, "reducer"); return new ParallelReduce(this, initialSupplier, reducer); @@ -513,7 +514,7 @@ public final ParallelFlowable doOnCancel(Action onCancel) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable collect(Callable collectionSupplier, BiConsumer collector) { + public final ParallelFlowable collect(Callable collectionSupplier, BiConsumer collector) { return new ParallelCollect(this, collectionSupplier, collector); } diff --git a/src/test/java/io/reactivex/parallel/ParallelCollectTest.java b/src/test/java/io/reactivex/parallel/ParallelCollectTest.java new file mode 100644 index 0000000000..41c76f37df --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelCollectTest.java @@ -0,0 +1,170 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelCollectTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .collect(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + })); + } + + @SuppressWarnings("unchecked") + @Test + public void initialCrash() { + Flowable.range(1, 5) + .parallel() + .collect(new Callable>() { + @Override + public List call() throws Exception { + throw new TestException(); + } + }, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void reducerCrash() { + Flowable.range(1, 5) + .parallel() + .collect(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + if (b == 3) { + throw new TestException(); + } + a.add(b); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber> ts = pp + .parallel() + .collect(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .sequential() + .test(); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .collect(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .collect(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiConsumer, Object>() { + @Override + public void accept(List a, Object b) throws Exception { + a.add(b); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelFilterTest.java b/src/test/java/io/reactivex/parallel/ParallelFilterTest.java new file mode 100644 index 0000000000..1d93d5bb4d --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelFilterTest.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class ParallelFilterTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .filter(Functions.alwaysTrue())); + } + + @Test + public void doubleFilter() { + Flowable.range(1, 10) + .parallel() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 3 == 0; + } + }) + .sequential() + .test() + .assertResult(6); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleError2() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .filter(Functions.alwaysTrue()) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void predicateThrows() { + Flowable.just(1) + .parallel() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index 7fe6266206..12b903fda2 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -26,6 +26,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.UnicastProcessor; import io.reactivex.schedulers.Schedulers; @@ -1287,4 +1288,68 @@ public void accept(Throwable e) throws Exception { assertEquals(1, count[0]); } + public static void checkSubscriberCount(ParallelFlowable source) { + int n = source.parallelism(); + + @SuppressWarnings("unchecked") + TestSubscriber[] consumers = new TestSubscriber[n + 1]; + + for (int i = 0; i <= n; i++) { + consumers[i] = new TestSubscriber(); + } + + source.subscribe(consumers); + + for (int i = 0; i <= n; i++) { + consumers[i].awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalArgumentException.class); + } + } + + @Test + public void checkAddBiConsumer() { + TestHelper.checkEnum(ListAddBiConsumer.class); + } + + @Test + public void mergeBiFunction() throws Exception { + MergerBiFunction f = new MergerBiFunction(Functions.naturalComparator()); + + assertEquals(0, f.apply(Collections.emptyList(), Collections.emptyList()).size()); + + assertEquals(Arrays.asList(1, 2), f.apply(Collections.emptyList(), Arrays.asList(1, 2))); + + for (int i = 0; i < 4; i++) { + int k = 0; + List list1 = new ArrayList(); + for (int j = 0; j < i; j++) { + list1.add(k++); + } + + List list2 = new ArrayList(); + for (int j = i; j < 4; j++) { + list2.add(k++); + } + + assertEquals(Arrays.asList(0, 1, 2, 3), f.apply(list1, list2)); + } + } + + @Test + public void concatMapSubscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .concatMap(Functions.justFunction(Flowable.just(1)))); + } + + @Test + public void flatMapSubscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .flatMap(Functions.justFunction(Flowable.just(1)))); + } + + @SuppressWarnings("unchecked") + @Test + public void fromArraySubscriberCount() { + ParallelFlowableTest.checkSubscriberCount(ParallelFlowable.fromArray(new Publisher[] { Flowable.just(1) })); + } } diff --git a/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java b/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java new file mode 100644 index 0000000000..874fd62fba --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.processors.UnicastProcessor; + +public class ParallelFromPublisherTest { + + @Test + public void sourceOverflow() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + } + .parallel(1, 1) + .sequential(1) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void fusedFilterBecomesEmpty() { + Flowable.just(1) + .filter(Functions.alwaysFalse()) + .parallel() + .sequential() + .test() + .assertResult(); + } + + @Test + public void syncFusedMapCrash() { + Flowable.just(1) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .parallel() + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void asyncFusedMapCrash() { + UnicastProcessor up = UnicastProcessor.create(); + + up.onNext(1); + + up + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .parallel() + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(up.hasSubscribers()); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelInvalid.java b/src/test/java/io/reactivex/parallel/ParallelInvalid.java new file mode 100644 index 0000000000..c62112ee96 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelInvalid.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import org.reactivestreams.Subscriber; + +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.EmptySubscription; + +/** + * Signals two onErrors to each subscriber for testing purposes. + */ +public final class ParallelInvalid extends ParallelFlowable { + + @Override + public void subscribe(Subscriber[] subscribers) { + TestException ex = new TestException(); + for (Subscriber s : subscribers) { + EmptySubscription.error(ex, s); + s.onError(ex); + s.onNext(0); + s.onComplete(); + s.onComplete(); + } + } + + @Override + public int parallelism() { + return 4; + } + +} diff --git a/src/test/java/io/reactivex/parallel/ParallelJoinTest.java b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java new file mode 100644 index 0000000000..9cff824b63 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelJoinTest { + + @Test + public void overflowFastpath() { + new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + subscribers[0].onSubscribe(new BooleanSubscription()); + subscribers[0].onNext(1); + subscribers[0].onNext(2); + subscribers[0].onNext(3); + } + + @Override + public int parallelism() { + return 1; + } + } + .sequential(1) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void overflowSlowpath() { + @SuppressWarnings("unchecked") + final Subscriber[] subs = new Subscriber[1]; + + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + subs[0].onNext(2); + subs[0].onNext(3); + } + }; + + new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + subs[0] = subscribers[0]; + subscribers[0].onSubscribe(new BooleanSubscription()); + subscribers[0].onNext(1); + } + + @Override + public int parallelism() { + return 1; + } + } + .sequential(1) + .subscribe(ts); + + ts.assertFailure(MissingBackpressureException.class, 1); + } + + @Test + public void emptyBackpressured() { + Flowable.empty() + .parallel() + .sequential() + .test(0) + .assertResult(); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelMapTest.java b/src/test/java/io/reactivex/parallel/ParallelMapTest.java new file mode 100644 index 0000000000..1e7c158112 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelMapTest.java @@ -0,0 +1,181 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class ParallelMapTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .map(Functions.identity())); + } + + @Test + public void doubleFilter() { + Flowable.range(1, 10) + .parallel() + .map(Functions.identity()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 3 == 0; + } + }) + .sequential() + .test() + .assertResult(6); + } + + @Test + public void doubleFilterAsync() { + Flowable.range(1, 10) + .parallel() + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 3 == 0; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(6); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .map(Functions.identity()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleError2() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .map(Functions.identity()) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .map(Functions.identity()) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrash() { + Flowable.just(1) + .parallel() + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrashConditional() { + Flowable.just(1) + .parallel() + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapCrashConditional2() { + Flowable.just(1) + .parallel() + .runOn(Schedulers.computation()) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelPeekTest.java b/src/test/java/io/reactivex/parallel/ParallelPeekTest.java new file mode 100644 index 0000000000..0ac327fb48 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelPeekTest.java @@ -0,0 +1,194 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class ParallelPeekTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .doOnNext(Functions.emptyConsumer())); + } + + @Test + public void onSubscribeCrash() { + Flowable.range(1, 5) + .parallel() + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .doOnNext(Functions.emptyConsumer()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void requestCrash() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.range(1, 5) + .parallel() + .doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + + assertFalse(errors.isEmpty()); + + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void cancelCrash() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.never() + .parallel() + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .cancel(); + + assertFalse(errors.isEmpty()); + + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void onCompleteCrash() { + Flowable.just(1) + .parallel() + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void onAfterTerminatedCrash() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.just(1) + .parallel() + .doAfterTerminated(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertResult(1); + + assertFalse(errors.isEmpty()); + + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void onAfterTerminatedCrash2() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.error(new IOException()) + .parallel() + .doAfterTerminated(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(IOException.class); + + assertFalse(errors.isEmpty()); + + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelReduceFullTest.java b/src/test/java/io/reactivex/parallel/ParallelReduceFullTest.java new file mode 100644 index 0000000000..e083b1394e --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelReduceFullTest.java @@ -0,0 +1,167 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.BiFunction; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelReduceFullTest { + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .parallel() + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void error() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.error(new TestException()) + .parallel() + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertFailure(TestException.class); + + assertTrue(errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void error2() { + List errors = TestHelper.trackPluginErrors(); + + try { + ParallelFlowable.fromArray(Flowable.error(new IOException()), Flowable.error(new TestException())) + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void empty() { + Flowable.empty() + .parallel() + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertResult(); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .reduce(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return "" + a + b; + } + }) + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void reducerCrash() { + Flowable.range(1, 4) + .parallel(2) + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + if (b == 3) { + throw new TestException(); + } + return a + b; + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void reducerCrash2() { + Flowable.range(1, 4) + .parallel(2) + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + if (a == 1 + 3) { + throw new TestException(); + } + return a + b; + } + }) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelReduceTest.java b/src/test/java/io/reactivex/parallel/ParallelReduceTest.java new file mode 100644 index 0000000000..a7f3905976 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelReduceTest.java @@ -0,0 +1,175 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; +import java.util.*; +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelReduceTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiFunction, Integer, List>() { + @Override + public List apply(List a, Integer b) throws Exception { + a.add(b); + return a; + } + })); + } + + @SuppressWarnings("unchecked") + @Test + public void initialCrash() { + Flowable.range(1, 5) + .parallel() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + throw new TestException(); + } + }, new BiFunction, Integer, List>() { + @Override + public List apply(List a, Integer b) throws Exception { + a.add(b); + return a; + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void reducerCrash() { + Flowable.range(1, 5) + .parallel() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiFunction, Integer, List>() { + @Override + public List apply(List a, Integer b) throws Exception { + if (b == 3) { + throw new TestException(); + } + a.add(b); + return a; + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber> ts = pp + .parallel() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiFunction, Integer, List>() { + @Override + public List apply(List a, Integer b) throws Exception { + a.add(b); + return a; + } + }) + .sequential() + .test(); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Flowable.error(new TestException()) + .parallel() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiFunction, Integer, List>() { + @Override + public List apply(List a, Integer b) throws Exception { + a.add(b); + return a; + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .reduce(new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }, new BiFunction, Object, List>() { + @Override + public List apply(List a, Object b) throws Exception { + a.add(b); + return a; + } + }) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java b/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java new file mode 100644 index 0000000000..32a821f4a4 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java @@ -0,0 +1,281 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelRunOnTest { + + @Test + public void subscriberCount() { + ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel() + .runOn(Schedulers.computation())); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .runOn(ImmediateThinScheduler.INSTANCE) + .sequential() + .test() + .assertFailure(TestException.class); + + assertFalse(errors.isEmpty()); + for (Throwable ex : errors) { + assertTrue(ex.toString(), ex instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void conditionalPath() { + Flowable.range(1, 1000) + .parallel(2) + .runOn(Schedulers.computation()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(500) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void missingBackpressure() { + new ParallelFlowable() { + @Override + public int parallelism() { + return 1; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + subscribers[0].onSubscribe(new BooleanSubscription()); + subscribers[0].onNext(1); + subscribers[0].onNext(2); + subscribers[0].onNext(3); + } + } + .runOn(ImmediateThinScheduler.INSTANCE, 1) + .sequential(1) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorBackpressured() { + Flowable.error(new TestException()) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .sequential(1) + .test(0) + .assertFailure(TestException.class); + } + + @Test + public void errorConditional() { + Flowable.error(new TestException()) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void errorConditionalBackpressured() { + TestSubscriber ts = new TestSubscriber(0L); + + Flowable.error(new TestException()) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .subscribe(new Subscriber[] { ts }); + + ts + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void emptyConditionalBackpressured() { + TestSubscriber ts = new TestSubscriber(0L); + + Flowable.empty() + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .subscribe(new Subscriber[] { ts }); + + ts + .assertResult(); + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.parallel(1) + .runOn(Schedulers.computation()) + .sequential() + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @SuppressWarnings("unchecked") + @Test + public void nextCancelRaceBackpressured() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = TestSubscriber.create(0L); + + pp.parallel(1) + .runOn(Schedulers.computation()) + .subscribe(new Subscriber[] { ts }); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void nextCancelRaceConditional() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.parallel(1) + .runOn(Schedulers.computation()) + .filter(Functions.alwaysTrue()) + .sequential() + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @SuppressWarnings("unchecked") + @Test + public void nextCancelRaceBackpressuredConditional() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = TestSubscriber.create(0L); + + pp.parallel(1) + .runOn(Schedulers.computation()) + .filter(Functions.alwaysTrue()) + .subscribe(new Subscriber[] { ts }); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelSortedJoinTest.java b/src/test/java/io/reactivex/parallel/ParallelSortedJoinTest.java new file mode 100644 index 0000000000..f1b783e2eb --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelSortedJoinTest.java @@ -0,0 +1,210 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.*; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelSortedJoinTest { + + @Test + public void cancel() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .parallel() + .sorted(Functions.naturalComparator()) + .test(); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void error() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.error(new TestException()) + .parallel() + .sorted(Functions.naturalComparator()) + .test() + .assertFailure(TestException.class); + + assertTrue(errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void error3() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable.error(new TestException()) + .parallel() + .sorted(Functions.naturalComparator()) + .test(0) + .assertFailure(TestException.class); + + assertTrue(errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void error2() { + List errors = TestHelper.trackPluginErrors(); + + try { + ParallelFlowable.fromArray(Flowable.error(new IOException()), Flowable.error(new TestException())) + .sorted(Functions.naturalComparator()) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void comparerCrash() { + Flowable.fromArray(4, 3, 2, 1) + .parallel(2) + .sorted(new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + if (o1 == 4 && o2 == 3) { + throw new TestException(); + } + return o1.compareTo(o2); + } + }) + .test() + .assertFailure(TestException.class, 1, 2); + } + + @Test + public void empty() { + Flowable.empty() + .parallel() + .sorted(Functions.naturalComparator()) + .test() + .assertResult(); + } + + @Test + public void asyncDrain() { + Integer[] values = new Integer[100 * 1000]; + for (int i = 0; i < values.length; i++) { + values[i] = values.length - i; + } + + TestSubscriber ts = Flowable.fromArray(values) + .parallel(2) + .runOn(Schedulers.computation()) + .sorted(Functions.naturalComparator()) + .observeOn(Schedulers.single()) + .test(); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(values.length) + .assertNoErrors() + .assertComplete(); + + List list = ts.values(); + for (int i = 0; i < values.length; i++) { + assertEquals(i + 1, list.get(i).intValue()); + } + } + + @Test + public void sortCancelRace() { + for (int i = 0; i < 1000; i++) { + final ReplayProcessor pp = ReplayProcessor.create(); + pp.onNext(1); + pp.onNext(2); + + final TestSubscriber ts = pp.parallel(2) + .sorted(Functions.naturalComparator()) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void sortCancelRace2() { + for (int i = 0; i < 1000; i++) { + final ReplayProcessor pp = ReplayProcessor.create(); + pp.onNext(1); + pp.onNext(2); + + final TestSubscriber ts = pp.parallel(2) + .sorted(Functions.naturalComparator()) + .test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } +}