Skip to content

Commit

Permalink
3.x: Cleanup addThrowable, "2.x" and null-value error messages (#6639)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Aug 28, 2019
1 parent f1441b0 commit a07c45e
Show file tree
Hide file tree
Showing 106 changed files with 553 additions and 616 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Cancellable;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableCreate extends Completable {
Expand Down Expand Up @@ -81,7 +82,7 @@ public void onError(Throwable t) {
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (get() != DisposableHelper.DISPOSED) {
Disposable d = getAndSet(DisposableHelper.DISPOSED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableMerge extends Completable {
final Publisher<? extends CompletableSource> source;
Expand Down Expand Up @@ -51,7 +50,7 @@ static final class CompletableMergeSubscriber
final int maxConcurrency;
final boolean delayErrors;

final AtomicThrowable error;
final AtomicThrowable errors;

final CompositeDisposable set;

Expand All @@ -62,15 +61,15 @@ static final class CompletableMergeSubscriber
this.maxConcurrency = maxConcurrency;
this.delayErrors = delayErrors;
this.set = new CompositeDisposable();
this.error = new AtomicThrowable();
this.errors = new AtomicThrowable();
lazySet(1);
}

@Override
public void dispose() {
upstream.cancel();
set.dispose();
error.tryTerminateAndReport();
errors.tryTerminateAndReport();
}

@Override
Expand Down Expand Up @@ -105,28 +104,24 @@ public void onError(Throwable t) {
if (!delayErrors) {
set.dispose();

if (error.addThrowable(t)) {
if (errors.tryAddThrowableOrReport(t)) {
if (getAndSet(0) > 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
}
} else {
RxJavaPlugins.onError(t);
}
} else {
if (error.addThrowable(t)) {
if (errors.tryAddThrowableOrReport(t)) {
if (decrementAndGet() == 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
}
} else {
RxJavaPlugins.onError(t);
}
}
}

@Override
public void onComplete() {
if (decrementAndGet() == 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
}
}

Expand All @@ -136,32 +131,28 @@ void innerError(MergeInnerObserver inner, Throwable t) {
upstream.cancel();
set.dispose();

if (error.addThrowable(t)) {
if (errors.tryAddThrowableOrReport(t)) {
if (getAndSet(0) > 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
}
} else {
RxJavaPlugins.onError(t);
}
} else {
if (error.addThrowable(t)) {
if (errors.tryAddThrowableOrReport(t)) {
if (decrementAndGet() == 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
upstream.request(1);
}
}
} else {
RxJavaPlugins.onError(t);
}
}
}

void innerComplete(MergeInnerObserver inner) {
set.delete(inner);
if (decrementAndGet() == 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
upstream.request(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableMergeDelayErrorArray extends Completable {

Expand All @@ -33,8 +32,8 @@ public void subscribeActual(final CompletableObserver observer) {
final CompositeDisposable set = new CompositeDisposable();
final AtomicInteger wip = new AtomicInteger(sources.length + 1);

final AtomicThrowable error = new AtomicThrowable();
set.add(new TryTerminateAndReportDisposable(error));
final AtomicThrowable errors = new AtomicThrowable();
set.add(new TryTerminateAndReportDisposable(errors));

observer.onSubscribe(set);

Expand All @@ -45,16 +44,16 @@ public void subscribeActual(final CompletableObserver observer) {

if (c == null) {
Throwable ex = new NullPointerException("A completable source is null");
error.addThrowable(ex);
errors.tryAddThrowableOrReport(ex);
wip.decrementAndGet();
continue;
}

c.subscribe(new MergeInnerCompletableObserver(observer, set, error, wip));
c.subscribe(new MergeInnerCompletableObserver(observer, set, errors, wip));
}

if (wip.decrementAndGet() == 0) {
error.tryTerminateConsumer(observer);
errors.tryTerminateConsumer(observer);
}
}

Expand All @@ -79,14 +78,14 @@ static final class MergeInnerCompletableObserver
implements CompletableObserver {
final CompletableObserver downstream;
final CompositeDisposable set;
final AtomicThrowable error;
final AtomicThrowable errors;
final AtomicInteger wip;

MergeInnerCompletableObserver(CompletableObserver observer, CompositeDisposable set, AtomicThrowable error,
AtomicInteger wip) {
this.downstream = observer;
this.set = set;
this.error = error;
this.errors = error;
this.wip = wip;
}

Expand All @@ -97,10 +96,8 @@ public void onSubscribe(Disposable d) {

@Override
public void onError(Throwable e) {
if (error.addThrowable(e)) {
if (errors.tryAddThrowableOrReport(e)) {
tryTerminate();
} else {
RxJavaPlugins.onError(e);
}
}

Expand All @@ -111,7 +108,7 @@ public void onComplete() {

void tryTerminate() {
if (wip.decrementAndGet() == 0) {
error.tryTerminateConsumer(downstream);
errors.tryTerminateConsumer(downstream);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public void subscribeActual(final CompletableObserver observer) {

final AtomicInteger wip = new AtomicInteger(1);

final AtomicThrowable error = new AtomicThrowable();
set.add(new TryTerminateAndReportDisposable(error));
final AtomicThrowable errors = new AtomicThrowable();
set.add(new TryTerminateAndReportDisposable(errors));

for (;;) {
if (set.isDisposed()) {
Expand All @@ -62,7 +62,7 @@ public void subscribeActual(final CompletableObserver observer) {
b = iterator.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
error.addThrowable(e);
errors.tryAddThrowableOrReport(e);
break;
}

Expand All @@ -80,7 +80,7 @@ public void subscribeActual(final CompletableObserver observer) {
c = ObjectHelper.requireNonNull(iterator.next(), "The iterator returned a null CompletableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
error.addThrowable(e);
errors.tryAddThrowableOrReport(e);
break;
}

Expand All @@ -90,11 +90,11 @@ public void subscribeActual(final CompletableObserver observer) {

wip.getAndIncrement();

c.subscribe(new MergeInnerCompletableObserver(observer, set, error, wip));
c.subscribe(new MergeInnerCompletableObserver(observer, set, errors, wip));
}

if (wip.decrementAndGet() == 0) {
error.tryTerminateConsumer(observer);
errors.tryTerminateConsumer(observer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,13 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (errors.addThrowable(t)) {
if (errors.tryAddThrowableOrReport(t)) {
subscribers.dispose();
synchronized (this) {
buffers = null;
}
done = true;
drain();
} else {
RxJavaPlugins.onError(t);
}
}

Expand Down Expand Up @@ -264,8 +262,7 @@ void drain() {
boolean d = done;
if (d && errors.get() != null) {
q.clear();
Throwable ex = errors.terminate();
a.onError(ex);
errors.tryTerminateConsumer(a);
return;
}

Expand Down Expand Up @@ -294,8 +291,7 @@ void drain() {
if (done) {
if (errors.get() != null) {
q.clear();
Throwable ex = errors.terminate();
a.onError(ex);
errors.tryTerminateConsumer(a);
return;
} else if (q.isEmpty()) {
a.onComplete();
Expand Down
Loading

0 comments on commit a07c45e

Please sign in to comment.