From 3768a53a89617784d2e96582352b5412b3fa3f68 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 12 Jan 2017 19:24:54 +0100 Subject: [PATCH] 2.x: fix cross-boundary invalid fusion with observeOn & zip (#4984) --- .../operators/flowable/FlowableFlatMap.java | 2 +- .../operators/flowable/FlowableZip.java | 2 +- .../observable/ObservableFlatMap.java | 2 +- .../flowable/FlowableFlatMapTest.java | 29 +++++++++++ .../operators/flowable/FlowableZipTest.java | 34 +++++++++++++ .../observable/ObservableFlatMapTest.java | 28 +++++++++++ .../observable/ObservableZipTest.java | 48 ++++++++++++++++--- 7 files changed, 135 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 99b8d86155..9401c202b0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -600,7 +600,7 @@ public void onSubscribe(Subscription s) { if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription qs = (QueueSubscription) s; - int m = qs.requestFusion(QueueSubscription.ANY); + int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { fusionMode = m; queue = qs; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java index c64d3270e9..c01ea6ef20 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java @@ -357,7 +357,7 @@ public void onSubscribe(Subscription s) { if (s instanceof QueueSubscription) { QueueSubscription f = (QueueSubscription) s; - int m = f.requestFusion(QueueSubscription.ANY); + int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { sourceMode = m; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 606f810019..cb4ee05c60 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -532,7 +532,7 @@ public void onSubscribe(Disposable s) { @SuppressWarnings("unchecked") QueueDisposable qd = (QueueDisposable) s; - int m = qd.requestFusion(QueueDisposable.ANY); + int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { fusionMode = m; queue = qd; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index ef9b503539..8a373dbe11 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -897,4 +898,32 @@ public void scalarXMap() { .test() .assertResult(2); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestSubscriber ts = Flowable.merge( + Flowable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Flowable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }) + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(2); + + List list = ts.values(); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java index eb82ba96c2..b80d7dbd5e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java @@ -1767,4 +1767,38 @@ public Integer apply(Integer a, Integer b) throws Exception { .test(0L) .assertFailure(TestException.class); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestSubscriber> ts = Flowable.zip( + Flowable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Flowable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + new BiFunction>() { + @Override + public List apply(Object t1, Object t2) throws Exception { + return Arrays.asList(t1, t2); + } + } + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1); + + List list = ts.values().get(0); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index e31725db33..4cc8425efb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -756,4 +756,32 @@ public Integer apply(Integer w) throws Exception { TestHelper.assertError(errors, 1, TestException.class); } + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestObserver ts = Observable.merge( + Observable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Observable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }) + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(2); + + List list = ts.values(); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java index c304226ad4..73e5d936fe 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -999,8 +1000,8 @@ public Object apply(final Object[] args) { public void testDownstreamBackpressureRequestsWithFiniteSyncObservables() { AtomicInteger generatedA = new AtomicInteger(); AtomicInteger generatedB = new AtomicInteger(); - Observable o1 = createInfiniteObservable(generatedA).take(Flowable.bufferSize() * 2); - Observable o2 = createInfiniteObservable(generatedB).take(Flowable.bufferSize() * 2); + Observable o1 = createInfiniteObservable(generatedA).take(Observable.bufferSize() * 2); + Observable o2 = createInfiniteObservable(generatedB).take(Observable.bufferSize() * 2); TestObserver ts = new TestObserver(); Observable.zip(o1, o2, new BiFunction() { @@ -1010,14 +1011,14 @@ public String apply(Integer t1, Integer t2) { return t1 + "-" + t2; } - }).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(ts); + }).observeOn(Schedulers.computation()).take(Observable.bufferSize() * 2).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); - assertEquals(Flowable.bufferSize() * 2, ts.valueCount()); + assertEquals(Observable.bufferSize() * 2, ts.valueCount()); System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get()); - assertTrue(generatedA.get() < (Flowable.bufferSize() * 3)); - assertTrue(generatedB.get() < (Flowable.bufferSize() * 3)); + assertTrue(generatedA.get() < (Observable.bufferSize() * 3)); + assertTrue(generatedB.get() < (Observable.bufferSize() * 3)); } private Observable createInfiniteObservable(final AtomicInteger generated) { @@ -1358,4 +1359,37 @@ public Object apply(Integer a, Integer b) throws Exception { } })); } -} + + @Test + public void noCrossBoundaryFusion() { + for (int i = 0; i < 500; i++) { + TestObserver> ts = Observable.zip( + Observable.just(1).observeOn(Schedulers.single()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + Observable.just(1).observeOn(Schedulers.computation()).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return Thread.currentThread().getName().substring(0, 4); + } + }), + new BiFunction>() { + @Override + public List apply(Object t1, Object t2) throws Exception { + return Arrays.asList(t1, t2); + } + } + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1); + + List list = ts.values().get(0); + + assertTrue(list.toString(), list.contains("RxSi")); + assertTrue(list.toString(), list.contains("RxCo")); + } + }}