3.x: Fix groupBy not canceling upstream due to group abandonment (#6642)
* 3.x: Fix groupBy not canceling upstream due to group abandonment

* Add codecov retry on connection refused

* Retry connrefused

* Connrefused not supported?
akarnokd committed Aug 29, 2019
1 parent c9204b5 commit 9a36930
Showing 8 changed files with 320 additions and 45 deletions.
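For orientation (not part of the commit), a minimal Java sketch of the usage pattern this fix targets: the group keys are consumed but the groups themselves are never subscribed. Class and variable names are illustrative.

```java
import io.reactivex.rxjava3.core.Flowable;

public class GroupAbandonmentDemo {
    public static void main(String[] args) {
        Flowable.range(1, 1000)
                .groupBy(i -> i % 2)
                .map(group -> group.getKey())   // the groups themselves are never subscribed
                .subscribe(key -> System.out.println("key: " + key));
        // After this fix, a group that is still unsubscribed when its onNext call returns is
        // considered abandoned: it is completed, removed from the operator's map, and one
        // replacement item is requested from upstream, so abandoned groups no longer keep
        // the upstream from being canceled.
    }
}
```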
2 changes: 1 addition & 1 deletion .travis.yml
@@ -13,7 +13,7 @@ script: gradle/buildViaTravis.sh

# Code coverage
after_success:
- - bash <(curl -s https://codecov.io/bash)
+ - bash <(curl -s --retry 10 https://codecov.io/bash)
- bash gradle/push_javadoc.sh

# cache between builds
31 changes: 31 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -10414,6 +10414,11 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
@@ -10462,6 +10467,12 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
@@ -10512,6 +10523,11 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
@@ -10565,6 +10581,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
@@ -10621,6 +10642,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
@@ -10726,6 +10752,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
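As the notes added above suggest, abandonment is avoided by subscribing to every group promptly, for example with flatMap and a sufficiently large maximum concurrency. A sketch of that recommended pattern (the key function and counts are illustrative, not taken from the commit):

```java
import io.reactivex.rxjava3.core.Flowable;

public class GroupByFlatMapExample {
    public static void main(String[] args) {
        Flowable.range(1, 20)
                .groupBy(i -> i % 5)
                // Subscribe to every group immediately; maxConcurrency should be at least
                // the number of expected distinct keys (or Integer.MAX_VALUE when unknown)
                // so no group is left waiting and gets abandoned.
                .flatMap(group -> group.map(v -> group.getKey() + ": " + v),
                         Integer.MAX_VALUE)
                .subscribe(System.out::println);
    }
}
```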
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -9067,6 +9067,12 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, Consumer
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9101,6 +9107,12 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T,
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9138,6 +9150,12 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T,
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9176,6 +9194,12 @@ public final <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9217,6 +9241,12 @@ public final <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <p>
* Note also that ignoring groups or subscribing to them later (i.e., on another thread) will result in
* so-called group abandonment, where a group contains only one element and the group is
* re-created over and over as new upstream items trigger a new group. This behavior is
* a trade-off between avoiding data loss, allowing upstream cancellation and the cost of excessive group creation.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
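The surrounding Javadoc recommends applying ignoreElements to groups that are not needed, so that they are still consumed rather than abandoned. A hedged Observable sketch of that pattern (the key predicate and names are illustrative):

```java
import io.reactivex.rxjava3.core.Observable;

public class GroupByIgnoreElementsExample {
    public static void main(String[] args) {
        Observable.range(1, 20)
                .groupBy(i -> i % 3)
                .flatMap(group -> {
                    if (group.getKey() == 0) {
                        // Uninteresting key: still subscribe, but drop the items, so the
                        // group is consumed instead of being abandoned and re-created.
                        return group.ignoreElements().<String>toObservable();
                    }
                    return group.map(v -> group.getKey() + " -> " + v);
                })
                .subscribe(System.out::println);
    }
}
```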
@@ -179,6 +179,13 @@ public void onNext(T t) {
if (newGroup) {
q.offer(group);
drain();

if (group.state.tryAbandon()) {
cancel(key);
group.onComplete();

upstream.request(1);
}
}
}

@@ -489,12 +496,17 @@ static final class State<T, K> extends BasicIntQueueSubscription<T> implements P

final AtomicReference<Subscriber<? super T>> actual = new AtomicReference<Subscriber<? super T>>();

- final AtomicBoolean once = new AtomicBoolean();

boolean outputFused;

int produced;

+ final AtomicInteger once = new AtomicInteger();
+
+ static final int FRESH = 0;
+ static final int HAS_SUBSCRIBER = 1;
+ static final int ABANDONED = 2;
+ static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER;

State(int bufferSize, GroupBySubscriber<?, K, T> parent, K key, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
this.parent = parent;
@@ -513,19 +525,30 @@ public void request(long n) {
@Override
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
- parent.cancel(key);
+ cancelParent();
}
}

@Override
- public void subscribe(Subscriber<? super T> s) {
- if (once.compareAndSet(false, true)) {
- s.onSubscribe(this);
- actual.lazySet(s);
- drain();
- } else {
- EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), s);
+ public void subscribe(Subscriber<? super T> subscriber) {
+ for (;;) {
+ int s = once.get();
+ if ((s & HAS_SUBSCRIBER) != 0) {
+ break;
+ }
+ int u = s | HAS_SUBSCRIBER;
+ if (once.compareAndSet(s, u)) {
+ subscriber.onSubscribe(this);
+ actual.lazySet(subscriber);
+ if (cancelled.get()) {
+ actual.lazySet(null);
+ } else {
+ drain();
+ }
+ return;
+ }
+ }
+ EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), subscriber);
}

public void onNext(T t) {
@@ -544,6 +567,16 @@ public void onComplete() {
drain();
}

void cancelParent() {
if ((once.get() & ABANDONED) == 0) {
parent.cancel(key);
}
}

boolean tryAbandon() {
return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED);
}

void drain() {
if (getAndIncrement() != 0) {
return;
@@ -640,7 +673,9 @@ void drainNormal() {
if (r != Long.MAX_VALUE) {
requested.addAndGet(-e);
}
- parent.upstream.request(e);
+ if ((once.get() & ABANDONED) == 0) {
+ parent.upstream.request(e);
+ }
}
}

@@ -708,7 +743,9 @@ public T poll() {
int p = produced;
if (p != 0) {
produced = 0;
- parent.upstream.request(p);
+ if ((once.get() & ABANDONED) == 0) {
+ parent.upstream.request(p);
+ }
}
return null;
}
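For readers following the State changes above, here is a standalone, simplified model of the new once state machine: FRESH can move either to HAS_SUBSCRIBER or to ABANDONED, an abandoned group may still accept one late Subscriber, and abandoned groups stop interacting with the parent. The class below is an illustration, not the operator's actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, illustrative model of the "once" state machine added to State above.
final class GroupStateSketch {
    static final int FRESH = 0;
    static final int HAS_SUBSCRIBER = 1;
    static final int ABANDONED = 2;

    final AtomicInteger once = new AtomicInteger(FRESH);

    // Succeeds only while no Subscriber has arrived; the group is then abandoned.
    boolean tryAbandon() {
        return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED);
    }

    // At most one Subscriber is accepted; a late Subscriber may still attach to an
    // abandoned group and consume its single buffered item.
    boolean trySetSubscriber() {
        for (;;) {
            int s = once.get();
            if ((s & HAS_SUBSCRIBER) != 0) {
                return false;
            }
            if (once.compareAndSet(s, s | HAS_SUBSCRIBER)) {
                return true;
            }
        }
    }

    // Abandoned groups must neither cancel their key in the parent nor request
    // more items from the upstream on their own behalf.
    boolean mayTalkToParent() {
        return (once.get() & ABANDONED) == 0;
    }
}
```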
@@ -110,6 +110,11 @@ public void onNext(T t) {
getAndIncrement();

downstream.onNext(group);

if (group.state.tryAbandon()) {
cancel(key);
group.onComplete();
}
}

V v;
@@ -151,7 +156,7 @@ public void onComplete() {

@Override
public void dispose() {
- // cancelling the main source means we don't want any more groups
+ // canceling the main source means we don't want any more groups
// but running groups still require new values
if (cancelled.compareAndSet(false, true)) {
if (decrementAndGet() == 0) {
@@ -220,10 +225,15 @@ static final class State<T, K> extends AtomicInteger implements Disposable, Obse

final AtomicBoolean cancelled = new AtomicBoolean();

- final AtomicBoolean once = new AtomicBoolean();

final AtomicReference<Observer<? super T>> actual = new AtomicReference<Observer<? super T>>();

+ final AtomicInteger once = new AtomicInteger();
+
+ static final int FRESH = 0;
+ static final int HAS_SUBSCRIBER = 1;
+ static final int ABANDONED = 2;
+ static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER;

State(int bufferSize, GroupByObserver<?, K, T> parent, K key, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
this.parent = parent;
@@ -236,7 +246,7 @@ public void dispose() {
if (cancelled.compareAndSet(false, true)) {
if (getAndIncrement() == 0) {
actual.lazySet(null);
- parent.cancel(key);
+ cancelParent();
}
}
}
@@ -248,17 +258,24 @@ public boolean isDisposed() {

@Override
public void subscribe(Observer<? super T> observer) {
- if (once.compareAndSet(false, true)) {
- observer.onSubscribe(this);
- actual.lazySet(observer);
- if (cancelled.get()) {
- actual.lazySet(null);
- } else {
- drain();
+ for (;;) {
+ int s = once.get();
+ if ((s & HAS_SUBSCRIBER) != 0) {
+ break;
+ }
+ int u = s | HAS_SUBSCRIBER;
+ if (once.compareAndSet(s, u)) {
+ observer.onSubscribe(this);
+ actual.lazySet(observer);
+ if (cancelled.get()) {
+ actual.lazySet(null);
+ } else {
+ drain();
+ }
+ return;
+ }
+ }
- } else {
- EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer);
+ EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer);
}

public void onNext(T t) {
@@ -315,11 +332,21 @@ void drain() {
}
}

void cancelParent() {
if ((once.get() & ABANDONED) == 0) {
parent.cancel(key);
}
}

boolean tryAbandon() {
return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED);
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a, boolean delayError) {
if (cancelled.get()) {
queue.clear();
- parent.cancel(key);
actual.lazySet(null);
+ cancelParent();
return true;
}

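One observable consequence of abandonment, as described in the updated Javadoc: a group that is subscribed to late replays only the single item that created it, and later items for the same key arrive in fresh groups. A sketch of that effect (the delay and key function are illustrative):

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

public class LateGroupSubscriptionExample {
    public static void main(String[] args) throws InterruptedException {
        Observable.range(1, 10)
                .groupBy(i -> i % 2)
                // Delaying the subscription means the group is already abandoned by the
                // time it is subscribed, so it delivers only the single item that created
                // it; subsequent items for the same key spawn fresh groups instead.
                .flatMap(group -> group.delaySubscription(10, TimeUnit.MILLISECONDS)
                        .map(v -> group.getKey() + ": " + v))
                .subscribe(System.out::println);

        Thread.sleep(200); // let the delayed subscriptions run before the JVM exits
    }
}
```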
@@ -102,7 +102,13 @@ public Flowable<Integer> apply(GroupedFlowable<Integer, Integer> v) {
}
}).subscribe(ts);

- ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19);
+ // Behavior change: this now counts as group abandonment because concatMap
+ // doesn't subscribe to the 2nd+ emitted groups immediately
+ ts.assertValues(
+ 0, 5, 10, 15, // First group is okay
+ // any other group gets abandoned, so we get 16 one-element groups
+ 1, 2, 3, 4, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 18, 19
+ );
ts.assertComplete();
ts.assertNoErrors();
}
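The updated test expectation reflects the behavior change: concatMap subscribes to groups one at a time, so all but the currently consumed group are abandoned and re-created per item. A caller that wants every value of a key gathered together can subscribe to all groups eagerly instead, for example (a sketch, not part of the commit):

```java
import io.reactivex.rxjava3.core.Flowable;

public class KeepGroupsAliveExample {
    public static void main(String[] args) {
        Flowable.range(0, 20)
                .groupBy(i -> i % 5)
                // flatMap subscribes to all groups immediately (unlike concatMap), so no
                // group is abandoned and each key keeps all of its values together.
                .flatMap(group -> group.toList().toFlowable(), Integer.MAX_VALUE)
                .subscribe(list -> System.out.println(list));
    }
}
```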