diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index f6320ced12..2597a6d140 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -166,7 +166,7 @@ public void onComplete() { queue.offer(b); done = true; if (enter()) { - QueueDrainHelper.drainMaxLoop(queue, actual, false, this, this); + QueueDrainHelper.drainMaxLoop(queue, actual, false, null, this); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index d521ebae80..2bc4bfbec9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -161,7 +161,7 @@ public void onComplete() { queue.offer(b); done = true; if (enter()) { - QueueDrainHelper.drainLoop(queue, actual, false, this, this); + QueueDrainHelper.drainLoop(queue, actual, false, null, this); } } DisposableHelper.dispose(timer); diff --git a/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java b/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java index 2964812c4d..25126b3327 100644 --- a/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java +++ b/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java @@ -168,7 +168,9 @@ public static boolean checkTerminated(boolean d, boolean empty, if (d) { if (delayError) { if (empty) { - disposable.dispose(); + if (disposable != null) { + disposable.dispose(); + } Throwable err = qd.error(); if (err != null) { s.onError(err); @@ -181,12 +183,16 @@ public static boolean checkTerminated(boolean d, boolean empty, Throwable err = qd.error(); if (err != null) { q.clear(); - disposable.dispose(); + if (disposable != null) { + disposable.dispose(); + } s.onError(err); return true; } else if (empty) { - disposable.dispose(); + if (disposable != null) { + disposable.dispose(); + } s.onComplete(); return true; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index d2ebe2ca11..bd18c0187d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -2014,4 +2014,64 @@ public void run() { assertEquals("Round: " + i, 5, items); } } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelExact() { + final AtomicInteger counter = new AtomicInteger(); + + Flowable.empty() + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(5, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelSkip() { + final AtomicInteger counter = new AtomicInteger(); + + Flowable.empty() + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(5, 10, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelOverlap() { + final AtomicInteger counter = new AtomicInteger(); + + Flowable.empty() + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(10, 5, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index d7b6046e42..93a9e2fb74 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; import org.mockito.*; @@ -1439,4 +1440,64 @@ public void run() { assertEquals("Round: " + i, 5, items); } } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelExact() { + final AtomicInteger counter = new AtomicInteger(); + + Observable.empty() + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(5, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelSkip() { + final AtomicInteger counter = new AtomicInteger(); + + Observable.empty() + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(5, 10, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void noCompletionCancelOverlap() { + final AtomicInteger counter = new AtomicInteger(); + + Observable.empty() + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }) + .buffer(10, 5, TimeUnit.SECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(Collections.emptyList()); + + assertEquals(0, counter.get()); + } }