Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SpliceFlatStreamToMetaSingle: propagate cancel when races with data #3036

Merged
merged 9 commits into from
Aug 14, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019, 2021-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -71,12 +72,12 @@ public PublisherSource.Subscriber<Object> apply(Subscriber<? super Data> subscri
return new SplicingSubscriber<>(this, subscriber);
}

/* Visible for testing */
static final class SplicingSubscriber<Data, MetaData, Payload> implements PublisherSource.Subscriber<Object> {
private static final class SplicingSubscriber<Data, MetaData, Payload>
implements PublisherSource.Subscriber<Object> {

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object>
maybePayloadSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class,
Object.class, "maybePayloadSub");
private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object> maybePayloadSubUpdater =
AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybePayloadSub");

private static final String CANCELED = "CANCELED";
private static final String PENDING = "PENDING";
Expand Down Expand Up @@ -155,7 +156,9 @@ private SplicingSubscriber(SpliceFlatStreamToMetaSingle<Data, MetaData, Payload>
* Publisher}&lt;{@link Payload}&gt;
*/
private void cancelData(Subscription subscription) {
if (maybePayloadSubUpdater.compareAndSet(this, null, CANCELED)) {
final Object current = maybePayloadSubUpdater.getAndUpdate(this,
curr -> curr == null || curr == PENDING ? CANCELED : curr);
if (current == null || current == PENDING) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please carefully review the updated state machine and existing tests that we have. If you see a use-case not covered by the tests, will appreciate new ideas.

subscription.cancel();
}
}
Expand Down Expand Up @@ -195,12 +198,22 @@ public void onNext(@Nullable Object obj) {
metaSeenInOnNext = true;
final Data data;
try {
data = parent.packer.apply(meta, maybePayloadSubUpdater.compareAndSet(this, null, PENDING) ?
newPayloadPublisher() : Publisher.failed(StacklessCancellationException.newInstance(
"Canceled prematurely from Data", SplicingSubscriber.class, "cancelData(..)")));
final Publisher<Payload> payload;
if (maybePayloadSubUpdater.compareAndSet(this, null, PENDING)) {
payload = newPayloadPublisher();
} else {
final Object maybePayloadSub = this.maybePayloadSub;
assert maybePayloadSub == CANCELED : "Expected CANCELED but got: " + maybePayloadSub;
boolean cas = maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED);
assert cas : "Could not transition from CANCELED to EMPTY_COMPLETED_DELIVERED";
payload = Publisher.failed(StacklessCancellationException.newInstance(
"Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " +
maybePayloadSub, getClass(), "onNext(...)"));
}
data = parent.packer.apply(meta, payload);
assert data != null : "Packer function must return non-null Data";
} catch (Throwable t) {
assert rawSubscription != null;
assert rawSubscription != null : "Expected rawSubscription but got null";
// We know that there is nothing else that can happen on this stream as we are not sending the
// data to the dataSubscriber.
rawSubscription.cancel();
Expand All @@ -225,7 +238,7 @@ protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSu
// Subscriber which is not allowed by the Reactive Streams specification.
newSubscriber.onSubscribe(delayedSubscription);
if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) {
assert rawSubscription != null;
assert rawSubscription != null : "Expected rawSubscription but got null";
delayedSubscription.delayedSubscription(rawSubscription);
} else {
// Entering this branch means either a duplicate subscriber or a stream that completed or failed
Expand All @@ -240,8 +253,15 @@ protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSu
newSubscriber.onComplete();
} else if (maybeSubscriber instanceof Throwable && maybePayloadSubUpdater
.compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) {
// Premature error or cancel
// Premature error
newSubscriber.onError((Throwable) maybeSubscriber);
} else if (maybeSubscriber == CANCELED && maybePayloadSubUpdater
.compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) {
// Premature cancel, capture the full caller stack-trace to understand which code path
// subscribes to the payload after cancellation.
newSubscriber.onError(new CancellationException(
"Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " +
maybeSubscriber));
} else {
// Existing subscriber or terminal event consumed by other subscriber (COMPLETED_DELIVERED)
newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber,
Expand All @@ -259,17 +279,18 @@ public void onError(Throwable t) {
payloadSubscriber.onError(t);
} else {
final Object maybeSubscriber = maybePayloadSubUpdater.getAndSet(this, t);
if (maybeSubscriber == CANCELED || !metaSeenInOnNext) {
if (!metaSeenInOnNext) {
ensureResultSubscriberOnSubscribe();
dataSubscriber.onError(t);
} else if (maybeSubscriber instanceof PublisherSource.Subscriber) {
if (maybePayloadSubUpdater.compareAndSet(this, t, EMPTY_COMPLETED_DELIVERED)) {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(t);
} else {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(new IllegalStateException(
"Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber +
", failed the race with a duplicate, but neither has seen onNext()"));
terminateWithIllegalStateException((PublisherSource.Subscriber<Payload>) maybeSubscriber);
}
} else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
LOGGER.debug("Discarding a terminal error from upstream because the payload publisher was " +
"already terminated", t);
} else {
LOGGER.debug("Terminal error queued for delayed delivery to the payload publisher. " +
"If the payload is not subscribed, this event will not be delivered.", t);
Expand All @@ -289,26 +310,32 @@ public void onComplete() {
EMPTY_COMPLETED_DELIVERED)) {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onComplete();
} else {
((PublisherSource.Subscriber<Payload>) maybeSubscriber).onError(new IllegalStateException(
"Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber +
", failed the race with a duplicate, but neither has seen onNext()"));
terminateWithIllegalStateException((PublisherSource.Subscriber<Payload>) maybeSubscriber);
}
} else if (!metaSeenInOnNext) {
ensureResultSubscriberOnSubscribe();
dataSubscriber.onError(new IllegalStateException(
"Stream unexpectedly completed without emitting any items"));
} else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
LOGGER.debug("Discarding a terminal complete from upstream because the payload publisher was " +
"already terminated");
}
}
}

private void ensureResultSubscriberOnSubscribe() {
assert !metaSeenInOnNext;
assert !metaSeenInOnNext : "Already seen meta-data";
if (!onSubscribeSent) {
onSubscribeSent = true;
// Since we are going to deliver data or a terminal signal right after this,
// there is no need for this to be cancellable.
dataSubscriber.onSubscribe(IGNORE_CANCEL);
}
}

private void terminateWithIllegalStateException(PublisherSource.Subscriber<Payload> subscriber) {
subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " +
subscriber + ", failed the race with a duplicate, but neither has seen onNext()"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2022, 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import io.servicetalk.concurrent.test.internal.TestSingleSubscriber;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.concurrent.CancellationException;

Expand Down Expand Up @@ -60,7 +62,7 @@ void streamWithHeaderAndPayloadShouldProduceDataWithEmbeddedPayload() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(2);
upstream.onNext(one, last);
Expand All @@ -77,7 +79,7 @@ void streamWithHeaderAndEmptyPayloadShouldCompleteOnPublisherOnSubscribe()
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
upstream.onComplete();
assertThat(data.getPayload().toFuture().get(), empty());
}
Expand All @@ -90,34 +92,49 @@ void emptyStreamShouldCompleteDataWithError() {
assertThat(dataSubscriber.awaitOnError(), instanceOf(IllegalStateException.class));
}

@Test
void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe() {
@ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}")
@ValueSource(booleans = {false, true})
void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe(boolean terminateUpstreamWithError) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
dataSubscriber.awaitSubscription().cancel();
assertTrue(subscription.isCancelled());
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class));
assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError);
}

@Test
void cancelDataAfterDataCompleteShouldIgnoreCancelAndDeliverPublisherOnComplete() {
@ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}")
@ValueSource(booleans = {false, true})
void cancelDataAfterDataCompleteShouldCancelUpstreamAndFailPublisherOnSubscribe(
boolean terminateUpstreamWithError) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
assertFalse(subscription.isCancelled());
dataSubscriber.awaitSubscription().cancel();
assertTrue(subscription.isCancelled());
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
upstream.onComplete();
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
payloadSubscriber.awaitOnComplete();
assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError);
}

private void assertPayloadSubscriberReceivesCancellationException(boolean terminateUpstreamWithError) {
assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class));
// Verify payloadSubscriber does not receive a terminal signal two times. If received, TestPublisherSubscriber
// will throw IllegalStateException: Subscriber has already terminated.
if (terminateUpstreamWithError) {
upstream.onError(DELIBERATE_EXCEPTION);
} else {
upstream.onComplete();
}
}

@Test
Expand All @@ -131,37 +148,51 @@ void cancelDataBeforeDataCompleteShouldDeliverError() {
assertThat(dataSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@Test
void streamErrorAfterPublisherSubscribeShouldDeliverError() {
@ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}")
@ValueSource(booleans = {false, true})
void streamErrorAfterPublisherSubscribeShouldDeliverError(boolean withPayload) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(1);
upstream.onNext(one);
if (withPayload) {
upstream.onNext(one);
}
assertFalse(subscription.isCancelled());
upstream.onError(DELIBERATE_EXCEPTION);
assertThat(payloadSubscriber.takeOnNext(), is(one));
if (withPayload) {
assertThat(payloadSubscriber.takeOnNext(), is(one));
} else {
assertThat(payloadSubscriber.pollAllOnNext(), is(empty()));
}
assertThat(payloadSubscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void streamCompleteAfterPublisherSubscribeShouldDeliverComplete() {
@ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}")
@ValueSource(booleans = {false, true})
void streamCompleteAfterPublisherSubscribeShouldDeliverComplete(boolean withPayload) {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
if (withPayload) {
upstream.onNext(one, two, last);
}
upstream.onComplete();
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
if (withPayload) {
assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last));
} else {
assertThat(payloadSubscriber.pollAllOnNext(), is(empty()));
}
payloadSubscriber.awaitOnComplete();
}

Expand All @@ -172,7 +203,7 @@ void streamCompleteBeforePublisherSubscribeShouldDeliverCompleteOnSubscribe() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
upstream.onComplete();
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitOnComplete();
Expand All @@ -186,7 +217,7 @@ void streamErrorBeforePublisherSubscribeShouldDeliverErrorOnSubscribe() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
assertFalse(subscription.isCancelled());
upstream.onError(DELIBERATE_EXCEPTION);
toSource(data.getPayload()).subscribe(payloadSubscriber);
Expand All @@ -200,7 +231,7 @@ void publisherSubscribeTwiceShouldFailSecondSubscriber() {
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
Expand All @@ -218,7 +249,7 @@ void publisherSubscribeAgainAfterCompletingInitialSubscriberShouldFailSecondSubs
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(data.meta()));
assertThat(data.meta(), equalTo(metaData.meta()));
toSource(data.getPayload()).subscribe(payloadSubscriber);
payloadSubscriber.awaitSubscription().request(3);
upstream.onNext(one, two, last);
Expand Down
Loading