Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: improve the parallel() mode test coverage, improve its code #5006

Merged
merged 1 commit into from
Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ protected void subscribeActual(Subscriber<? super U> s) {
if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}
source.subscribe(new MergeSubscriber<T, U>(s, mapper, delayErrors, maxConcurrency, bufferSize));
source.subscribe(subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize));
}

public static <T, U> Subscriber<T> subscribe(Subscriber<? super U> s,
Function<? super T, ? extends Publisher<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
return new MergeSubscriber<T, U>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}

static final class MergeSubscriber<T, U> extends AtomicInteger implements Subscription, Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscribers.DeferredScalarSubscriber;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.parallel.ParallelFlowable;
Expand All @@ -34,12 +35,12 @@ public final class ParallelCollect<T, C> extends ParallelFlowable<C> {

final ParallelFlowable<? extends T> source;

final Callable<C> initialCollection;
final Callable<? extends C> initialCollection;

final BiConsumer<C, T> collector;
final BiConsumer<? super C, ? super T> collector;

public ParallelCollect(ParallelFlowable<? extends T> source,
Callable<C> initialCollection, BiConsumer<C, T> collector) {
Callable<? extends C> initialCollection, BiConsumer<? super C, ? super T> collector) {
this.source = source;
this.initialCollection = initialCollection;
this.collector = collector;
Expand All @@ -60,18 +61,13 @@ public void subscribe(Subscriber<? super C>[] subscribers) {
C initialValue;

try {
initialValue = initialCollection.call();
initialValue = ObjectHelper.requireNonNull(initialCollection.call(), "The initialSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
reportError(subscribers, ex);
return;
}

if (initialValue == null) {
reportError(subscribers, new NullPointerException("The initialSupplier returned a null value"));
return;
}

parents[i] = new ParallelCollectSubscriber<T, C>(subscribers[i], initialValue, collector);
}

Expand All @@ -94,14 +90,14 @@ static final class ParallelCollectSubscriber<T, C> extends DeferredScalarSubscri

private static final long serialVersionUID = -4767392946044436228L;

final BiConsumer<C, T> collector;
final BiConsumer<? super C, ? super T> collector;

C collection;

boolean done;

ParallelCollectSubscriber(Subscriber<? super C> subscriber,
C initialValue, BiConsumer<C, T> collector) {
C initialValue, BiConsumer<? super C, ? super T> collector) {
super(subscriber);
this.collection = initialValue;
this.collector = collector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];

// FIXME cheat until we have support from RxJava2 internals
Publisher<T> p = new Publisher<T>() {
int i;

@SuppressWarnings("unchecked")
@Override
public void subscribe(Subscriber<? super T> s) {
parents[i++] = (Subscriber<T>)s;
}
};

FlowableConcatMap<T, R> op = new FlowableConcatMap<T, R>(p, mapper, prefetch, errorMode);

for (int i = 0; i < n; i++) {

op.subscribe(subscribers[i]);
// FIXME needs a FlatMap subscriber
// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode);
parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode);
}

source.subscribe(parents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -48,7 +49,12 @@ public void subscribe(Subscriber<? super T>[] subscribers) {
Subscriber<? super T>[] parents = new Subscriber[n];

for (int i = 0; i < n; i++) {
parents[i] = new ParallelFilterSubscriber<T>(subscribers[i], predicate);
Subscriber<? super T> a = subscribers[i];
if (a instanceof ConditionalSubscriber) {
parents[i] = new ParallelFilterConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, predicate);
} else {
parents[i] = new ParallelFilterSubscriber<T>(a, predicate);
}
}

source.subscribe(parents);
Expand All @@ -59,31 +65,44 @@ public int parallelism() {
return source.parallelism();
}

static final class ParallelFilterSubscriber<T> implements Subscriber<T>, Subscription {

final Subscriber<? super T> actual;

abstract static class BaseFilterSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
final Predicate<? super T> predicate;

Subscription s;

boolean done;

ParallelFilterSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
this.actual = actual;
BaseFilterSubscriber(Predicate<? super T> predicate) {
this.predicate = predicate;
}

@Override
public void request(long n) {
public final void request(long n) {
s.request(n);
}

@Override
public void cancel() {
public final void cancel() {
s.cancel();
}

@Override
public final void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}
}

static final class ParallelFilterSubscriber<T> extends BaseFilterSubscriber<T> {

final Subscriber<? super T> actual;

ParallelFilterSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
super(predicate);
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
Expand All @@ -94,26 +113,83 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(T t) {
public boolean tryOnNext(T t) {
if (!done) {
boolean b;

try {
b = predicate.test(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return false;
}

if (b) {
actual.onNext(t);
return true;
}
}
return false;
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
boolean b;

try {
b = predicate.test(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return;
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (!done) {
done = true;
actual.onComplete();
}
}
}

if (b) {
actual.onNext(t);
} else {
s.request(1);
static final class ParallelFilterConditionalSubscriber<T> extends BaseFilterSubscriber<T> {

final ConditionalSubscriber<? super T> actual;

ParallelFilterConditionalSubscriber(ConditionalSubscriber<? super T> actual, Predicate<? super T> predicate) {
super(predicate);
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);
}
}

@Override
public boolean tryOnNext(T t) {
if (!done) {
boolean b;

try {
b = predicate.test(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return false;
}

if (b) {
return actual.tryOnNext(t);
}
}
return false;
}

@Override
Expand All @@ -128,12 +204,9 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
return;
if (!done) {
done = true;
actual.onComplete();
}
done = true;
actual.onComplete();
}

}
}
}}
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];

// FIXME cheat until we have support from RxJava2 internals
Publisher<T> p = new Publisher<T>() {
int i;

@SuppressWarnings("unchecked")
@Override
public void subscribe(Subscriber<? super T> s) {
parents[i++] = (Subscriber<T>)s;
}
};

FlowableFlatMap<T, R> op = new FlowableFlatMap<T, R>(p, mapper, delayError, maxConcurrency, prefetch);

for (int i = 0; i < n; i++) {

op.subscribe(subscribers[i]);
// FIXME needs a FlatMap subscriber
// parents[i] = FlowableFlatMap.createSubscriber(s, mapper, delayError, maxConcurrency, prefetch);
parents[i] = FlowableFlatMap.subscribe(subscribers[i], mapper, delayError, maxConcurrency, prefetch);
}

source.subscribe(parents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -187,8 +187,8 @@ public void cancel() {
public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
cancel();
onError(new IllegalStateException("Queue is full?"));
s.cancel();
onError(new MissingBackpressureException("Queue is full?"));
return;
}
}
Expand Down Expand Up @@ -344,18 +344,7 @@ void drainSync() {
return;
}

boolean empty;

try {
empty = q.isEmpty();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
for (Subscriber<? super T> s : a) {
s.onError(ex);
}
return;
}
boolean empty = q.isEmpty();

if (empty) {
for (Subscriber<? super T> s : a) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Merges the individual 'rails' of the source ParallelFlowable, unordered,
* into a single regular Publisher sequence (exposed as Px).
* into a single regular Publisher sequence (exposed as Flowable).
*
* @param <T> the value type
*/
Expand Down Expand Up @@ -123,7 +123,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
}
inner.request(1);
} else {
SimpleQueue<T> q = inner.getQueue();
SimplePlainQueue<T> q = inner.getQueue();

if (!q.offer(value)) {
cancelAll();
Expand All @@ -140,10 +140,13 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
return;
}
} else {
SimpleQueue<T> q = inner.getQueue();
SimplePlainQueue<T> q = inner.getQueue();

// FIXME overflow handling
q.offer(value);
if (!q.offer(value)) {
cancelAll();
onError(new MissingBackpressureException("Queue full?!"));
return;
}

if (getAndIncrement() != 0) {
return;
Expand Down
Loading