Skip to content

Commit

Permalink
2.x: fix cross-boundary invalid fusion with observeOn & zip (Reactive…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 12, 2017
1 parent 5717827 commit 3768a53
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ public void onSubscribe(Subscription s) {
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<U> qs = (QueueSubscription<U>) s;
int m = qs.requestFusion(QueueSubscription.ANY);
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
fusionMode = m;
queue = qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void onSubscribe(Subscription s) {
if (s instanceof QueueSubscription) {
QueueSubscription<T> f = (QueueSubscription<T>) s;

int m = f.requestFusion(QueueSubscription.ANY);
int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);

if (m == QueueSubscription.SYNC) {
sourceMode = m;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public void onSubscribe(Disposable s) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.util.*;
Expand Down Expand Up @@ -897,4 +898,32 @@ public void scalarXMap() {
.test()
.assertResult(2);
}

@Test
public void noCrossBoundaryFusion() {
for (int i = 0; i < 500; i++) {
TestSubscriber<Object> ts = Flowable.merge(
Flowable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
Flowable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
})
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(2);

List<Object> list = ts.values();

assertTrue(list.toString(), list.contains("RxSi"));
assertTrue(list.toString(), list.contains("RxCo"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1767,4 +1767,38 @@ public Integer apply(Integer a, Integer b) throws Exception {
.test(0L)
.assertFailure(TestException.class);
}

@Test
public void noCrossBoundaryFusion() {
for (int i = 0; i < 500; i++) {
TestSubscriber<List<Object>> ts = Flowable.zip(
Flowable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
Flowable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
new BiFunction<Object, Object, List<Object>>() {
@Override
public List<Object> apply(Object t1, Object t2) throws Exception {
return Arrays.asList(t1, t2);
}
}
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(1);

List<Object> list = ts.values().get(0);

assertTrue(list.toString(), list.contains("RxSi"));
assertTrue(list.toString(), list.contains("RxCo"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -756,4 +756,32 @@ public Integer apply(Integer w) throws Exception {

TestHelper.assertError(errors, 1, TestException.class);
}

@Test
public void noCrossBoundaryFusion() {
for (int i = 0; i < 500; i++) {
TestObserver<Object> ts = Observable.merge(
Observable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
Observable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
})
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(2);

List<Object> list = ts.values();

assertTrue(list.toString(), list.contains("RxSi"));
assertTrue(list.toString(), list.contains("RxCo"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.*;
Expand Down Expand Up @@ -999,8 +1000,8 @@ public Object apply(final Object[] args) {
public void testDownstreamBackpressureRequestsWithFiniteSyncObservables() {
AtomicInteger generatedA = new AtomicInteger();
AtomicInteger generatedB = new AtomicInteger();
Observable<Integer> o1 = createInfiniteObservable(generatedA).take(Flowable.bufferSize() * 2);
Observable<Integer> o2 = createInfiniteObservable(generatedB).take(Flowable.bufferSize() * 2);
Observable<Integer> o1 = createInfiniteObservable(generatedA).take(Observable.bufferSize() * 2);
Observable<Integer> o2 = createInfiniteObservable(generatedB).take(Observable.bufferSize() * 2);

TestObserver<String> ts = new TestObserver<String>();
Observable.zip(o1, o2, new BiFunction<Integer, Integer, String>() {
Expand All @@ -1010,14 +1011,14 @@ public String apply(Integer t1, Integer t2) {
return t1 + "-" + t2;
}

}).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(ts);
}).observeOn(Schedulers.computation()).take(Observable.bufferSize() * 2).subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 2, ts.valueCount());
assertEquals(Observable.bufferSize() * 2, ts.valueCount());
System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
assertTrue(generatedA.get() < (Flowable.bufferSize() * 3));
assertTrue(generatedB.get() < (Flowable.bufferSize() * 3));
assertTrue(generatedA.get() < (Observable.bufferSize() * 3));
assertTrue(generatedB.get() < (Observable.bufferSize() * 3));
}

private Observable<Integer> createInfiniteObservable(final AtomicInteger generated) {
Expand Down Expand Up @@ -1358,4 +1359,37 @@ public Object apply(Integer a, Integer b) throws Exception {
}
}));
}
}

@Test
public void noCrossBoundaryFusion() {
for (int i = 0; i < 500; i++) {
TestObserver<List<Object>> ts = Observable.zip(
Observable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
Observable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
return Thread.currentThread().getName().substring(0, 4);
}
}),
new BiFunction<Object, Object, List<Object>>() {
@Override
public List<Object> apply(Object t1, Object t2) throws Exception {
return Arrays.asList(t1, t2);
}
}
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(1);

List<Object> list = ts.values().get(0);

assertTrue(list.toString(), list.contains("RxSi"));
assertTrue(list.toString(), list.contains("RxCo"));
}
}}

0 comments on commit 3768a53

Please sign in to comment.