Skip to content

Commit

Permalink
2.x: fix timed exact buffer calling cancel unnecessarily (#5761)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 15, 2017
1 parent 9f2beac commit 39e159f
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void onComplete() {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainMaxLoop(queue, actual, false, this, this);
QueueDrainHelper.drainMaxLoop(queue, actual, false, null, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void onComplete() {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, actual, false, this, this);
QueueDrainHelper.drainLoop(queue, actual, false, null, this);
}
}
DisposableHelper.dispose(timer);
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/io/reactivex/internal/util/QueueDrainHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ public static <T, U> boolean checkTerminated(boolean d, boolean empty,
if (d) {
if (delayError) {
if (empty) {
disposable.dispose();
if (disposable != null) {
disposable.dispose();
}
Throwable err = qd.error();
if (err != null) {
s.onError(err);
Expand All @@ -181,12 +183,16 @@ public static <T, U> boolean checkTerminated(boolean d, boolean empty,
Throwable err = qd.error();
if (err != null) {
q.clear();
disposable.dispose();
if (disposable != null) {
disposable.dispose();
}
s.onError(err);
return true;
} else
if (empty) {
disposable.dispose();
if (disposable != null) {
disposable.dispose();
}
s.onComplete();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2014,4 +2014,64 @@ public void run() {
assertEquals("Round: " + i, 5, items);
}
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelExact() {
final AtomicInteger counter = new AtomicInteger();

Flowable.<Integer>empty()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelSkip() {
final AtomicInteger counter = new AtomicInteger();

Flowable.<Integer>empty()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, 10, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelOverlap() {
final AtomicInteger counter = new AtomicInteger();

Flowable.<Integer>empty()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(10, 5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;
import org.mockito.*;
Expand Down Expand Up @@ -1439,4 +1440,64 @@ public void run() {
assertEquals("Round: " + i, 5, items);
}
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelExact() {
final AtomicInteger counter = new AtomicInteger();

Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelSkip() {
final AtomicInteger counter = new AtomicInteger();

Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, 10, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}

@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelOverlap() {
final AtomicInteger counter = new AtomicInteger();

Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(10, 5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());

assertEquals(0, counter.get());
}
}

0 comments on commit 39e159f

Please sign in to comment.