From 38083e88717ba17a180a456006ed35d730a6e0c6 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 8 Aug 2024 16:18:40 -0700 Subject: [PATCH 1/8] `SpliceFlatStreamToMetaSingle`: propagate cancel when races with data --- .../netty/SpliceFlatStreamToMetaSingle.java | 48 +++++++--- .../SpliceFlatStreamToMetaSingleTest.java | 87 +++++++++++++------ 2 files changed, 95 insertions(+), 40 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 28a8fc88c8..a80dd60ba4 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -156,7 +156,9 @@ private SplicingSubscriber(SpliceFlatStreamToMetaSingle * Publisher}<{@link Payload}> */ private void cancelData(Subscription subscription) { - if (maybePayloadSubUpdater.compareAndSet(this, null, CANCELED)) { + final Object current = maybePayloadSubUpdater.getAndUpdate(this, + curr -> curr == null || curr == PENDING ? CANCELED : curr); + if (current == null || current == PENDING) { subscription.cancel(); } } @@ -196,9 +198,15 @@ public void onNext(@Nullable Object obj) { metaSeenInOnNext = true; final Data data; try { - data = parent.packer.apply(meta, maybePayloadSubUpdater.compareAndSet(this, null, PENDING) ? - newPayloadPublisher() : Publisher.failed(StacklessCancellationException.newInstance( - "Canceled prematurely from Data", SplicingSubscriber.class, "cancelData(..)"))); + final Publisher payload; + if (maybePayloadSubUpdater.compareAndSet(this, null, PENDING)) { + payload = newPayloadPublisher(); + } else { + assert maybePayloadSub == CANCELED; + maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED); + payload = Publisher.failed(newCancellationException()); + } + data = parent.packer.apply(meta, payload); assert data != null : "Packer function must return non-null Data"; } catch (Throwable t) { assert rawSubscription != null; @@ -241,8 +249,12 @@ protected void handleSubscribe(PublisherSource.Subscriber newSu newSubscriber.onComplete(); } else if (maybeSubscriber instanceof Throwable && maybePayloadSubUpdater .compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) { - // Premature error or cancel + // Premature error newSubscriber.onError((Throwable) maybeSubscriber); + } else if (maybeSubscriber == CANCELED && maybePayloadSubUpdater + .compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) { + // Premature cancel + newSubscriber.onError(newCancellationException()); } else { // Existing subscriber or terminal event consumed by other subscriber (COMPLETED_DELIVERED) newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber, @@ -260,17 +272,18 @@ public void onError(Throwable t) { payloadSubscriber.onError(t); } else { final Object maybeSubscriber = maybePayloadSubUpdater.getAndSet(this, t); - if (maybeSubscriber == CANCELED || !metaSeenInOnNext) { + if (!metaSeenInOnNext) { ensureResultSubscriberOnSubscribe(); dataSubscriber.onError(t); } else if (maybeSubscriber instanceof PublisherSource.Subscriber) { if (maybePayloadSubUpdater.compareAndSet(this, t, EMPTY_COMPLETED_DELIVERED)) { ((PublisherSource.Subscriber) maybeSubscriber).onError(t); } else { - ((PublisherSource.Subscriber) maybeSubscriber).onError(new IllegalStateException( - "Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber + - ", failed the race with a duplicate, but neither has seen onNext()")); + terminateWithIllegalStateException((PublisherSource.Subscriber) maybeSubscriber); } + } else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) { + LOGGER.debug("Discarding a terminal error from upstream because the payload publisher was " + + "already terminated", t); } else { LOGGER.debug("Terminal error queued for delayed delivery to the payload publisher. " + "If the payload is not subscribed, this event will not be delivered.", t); @@ -290,14 +303,15 @@ public void onComplete() { EMPTY_COMPLETED_DELIVERED)) { ((PublisherSource.Subscriber) maybeSubscriber).onComplete(); } else { - ((PublisherSource.Subscriber) maybeSubscriber).onError(new IllegalStateException( - "Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber + - ", failed the race with a duplicate, but neither has seen onNext()")); + terminateWithIllegalStateException((PublisherSource.Subscriber) maybeSubscriber); } } else if (!metaSeenInOnNext) { ensureResultSubscriberOnSubscribe(); dataSubscriber.onError(new IllegalStateException( "Stream unexpectedly completed without emitting any items")); + } else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) { + LOGGER.debug("Discarding a terminal complete from upstream because the payload publisher was " + + "already terminated"); } } } @@ -311,5 +325,15 @@ private void ensureResultSubscriberOnSubscribe() { dataSubscriber.onSubscribe(IGNORE_CANCEL); } } + + private void terminateWithIllegalStateException(PublisherSource.Subscriber subscriber) { + subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " + + subscriber + ", failed the race with a duplicate, but neither has seen onNext()")); + } + + private static StacklessCancellationException newCancellationException() { + return StacklessCancellationException.newInstance( + "Canceled prematurely from Data", SplicingSubscriber.class, "cancelData(..)"); + } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java index 56b64a883d..35f23ad3fd 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java @@ -24,6 +24,8 @@ import io.servicetalk.concurrent.test.internal.TestSingleSubscriber; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.concurrent.CancellationException; @@ -60,7 +62,7 @@ void streamWithHeaderAndPayloadShouldProduceDataWithEmbeddedPayload() { upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitSubscription().request(2); upstream.onNext(one, last); @@ -77,7 +79,7 @@ void streamWithHeaderAndEmptyPayloadShouldCompleteOnPublisherOnSubscribe() upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); upstream.onComplete(); assertThat(data.getPayload().toFuture().get(), empty()); } @@ -90,34 +92,49 @@ void emptyStreamShouldCompleteDataWithError() { assertThat(dataSubscriber.awaitOnError(), instanceOf(IllegalStateException.class)); } - @Test - void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe() { + @ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}") + @ValueSource(booleans = {false, true}) + void cancelDataRacingWithDataShouldCompleteAndFailPublisherOnSubscribe(boolean terminateUpstreamWithError) { Single op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new)); toSource(op).subscribe(dataSubscriber); + upstream.onSubscribe(subscription); dataSubscriber.awaitSubscription().cancel(); + assertTrue(subscription.isCancelled()); upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); - assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class)); + assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError); } - @Test - void cancelDataAfterDataCompleteShouldIgnoreCancelAndDeliverPublisherOnComplete() { + @ParameterizedTest(name = "{displayName} [{index}]: terminateUpstreamWithError={0}") + @ValueSource(booleans = {false, true}) + void cancelDataAfterDataCompleteShouldCancelUpstreamAndFailPublisherOnSubscribe( + boolean terminateUpstreamWithError) { Single op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new)); toSource(op).subscribe(dataSubscriber); + upstream.onSubscribe(subscription); upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); + assertFalse(subscription.isCancelled()); dataSubscriber.awaitSubscription().cancel(); + assertTrue(subscription.isCancelled()); toSource(data.getPayload()).subscribe(payloadSubscriber); - payloadSubscriber.awaitSubscription().request(3); - upstream.onNext(one, two, last); - upstream.onComplete(); - assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last)); - payloadSubscriber.awaitOnComplete(); + assertPayloadSubscriberReceivesCancellationException(terminateUpstreamWithError); + } + + private void assertPayloadSubscriberReceivesCancellationException(boolean terminateUpstreamWithError) { + assertThat(payloadSubscriber.awaitOnError(), instanceOf(CancellationException.class)); + // Verify payloadSubscriber does not receive a terminal signal two times. If received, TestPublisherSubscriber + // will throw IllegalStateException: Subscriber has already terminated. + if (terminateUpstreamWithError) { + upstream.onError(DELIBERATE_EXCEPTION); + } else { + upstream.onComplete(); + } } @Test @@ -131,37 +148,51 @@ void cancelDataBeforeDataCompleteShouldDeliverError() { assertThat(dataSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); } - @Test - void streamErrorAfterPublisherSubscribeShouldDeliverError() { + @ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}") + @ValueSource(booleans = {false, true}) + void streamErrorAfterPublisherSubscribeShouldDeliverError(boolean withPayload) { Single op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new)); toSource(op).subscribe(dataSubscriber); upstream.onSubscribe(subscription); upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitSubscription().request(1); - upstream.onNext(one); + if (withPayload) { + upstream.onNext(one); + } assertFalse(subscription.isCancelled()); upstream.onError(DELIBERATE_EXCEPTION); - assertThat(payloadSubscriber.takeOnNext(), is(one)); + if (withPayload) { + assertThat(payloadSubscriber.takeOnNext(), is(one)); + } else { + assertThat(payloadSubscriber.pollAllOnNext(), is(empty())); + } assertThat(payloadSubscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @Test - void streamCompleteAfterPublisherSubscribeShouldDeliverComplete() { + @ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}") + @ValueSource(booleans = {false, true}) + void streamCompleteAfterPublisherSubscribeShouldDeliverComplete(boolean withPayload) { Single op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new)); toSource(op).subscribe(dataSubscriber); upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitSubscription().request(3); - upstream.onNext(one, two, last); + if (withPayload) { + upstream.onNext(one, two, last); + } upstream.onComplete(); - assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last)); + if (withPayload) { + assertThat(payloadSubscriber.takeOnNext(3), contains(one, two, last)); + } else { + assertThat(payloadSubscriber.pollAllOnNext(), is(empty())); + } payloadSubscriber.awaitOnComplete(); } @@ -172,7 +203,7 @@ void streamCompleteBeforePublisherSubscribeShouldDeliverCompleteOnSubscribe() { upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); upstream.onComplete(); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitOnComplete(); @@ -186,7 +217,7 @@ void streamErrorBeforePublisherSubscribeShouldDeliverErrorOnSubscribe() { upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); assertFalse(subscription.isCancelled()); upstream.onError(DELIBERATE_EXCEPTION); toSource(data.getPayload()).subscribe(payloadSubscriber); @@ -200,7 +231,7 @@ void publisherSubscribeTwiceShouldFailSecondSubscriber() { upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitSubscription().request(3); upstream.onNext(one, two, last); @@ -218,7 +249,7 @@ void publisherSubscribeAgainAfterCompletingInitialSubscriberShouldFailSecondSubs upstream.onNext(metaData); Data data = dataSubscriber.awaitOnSuccess(); assertThat(data, is(notNullValue())); - assertThat(data.meta(), equalTo(data.meta())); + assertThat(data.meta(), equalTo(metaData.meta())); toSource(data.getPayload()).subscribe(payloadSubscriber); payloadSubscriber.awaitSubscription().request(3); upstream.onNext(one, two, last); From 992e1a3003cfc2354d2821621fd9799d5be565f6 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 8 Aug 2024 17:31:09 -0700 Subject: [PATCH 2/8] Temporarily disable JavaNetSoTimeoutHttpConnectionFilterTest --- .../http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java index 8d7e18a32f..48523bdc74 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -67,6 +68,7 @@ import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertThrows; +@Disabled // FIXME: remove before merging class JavaNetSoTimeoutHttpConnectionFilterTest { @RegisterExtension From ae5bb8e9218d1c9b7931b520adc43adfc3d7b6a4 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 9 Aug 2024 15:52:36 -0700 Subject: [PATCH 3/8] fix copyright headers --- .../io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java | 2 +- .../http/netty/SpliceFlatStreamToMetaSingleTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index a80dd60ba4..4a1d082e7b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2019, 2021-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java index 35f23ad3fd..e30b895b45 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingleTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2022, 2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 14df0c1e2b63dcf3a9f44ed7274eef186d2f287d Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 9 Aug 2024 15:57:42 -0700 Subject: [PATCH 4/8] improve assertions --- .../http/netty/SpliceFlatStreamToMetaSingle.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 4a1d082e7b..26525b56e4 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -202,14 +202,16 @@ public void onNext(@Nullable Object obj) { if (maybePayloadSubUpdater.compareAndSet(this, null, PENDING)) { payload = newPayloadPublisher(); } else { - assert maybePayloadSub == CANCELED; - maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED); + final Object maybePayloadSub = this.maybePayloadSub; + assert maybePayloadSub == CANCELED : "Expected CANCELED but got: " + maybePayloadSub; + boolean cas = maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED); + assert cas : "Could not transition from CANCELED to EMPTY_COMPLETED_DELIVERED"; payload = Publisher.failed(newCancellationException()); } data = parent.packer.apply(meta, payload); assert data != null : "Packer function must return non-null Data"; } catch (Throwable t) { - assert rawSubscription != null; + assert rawSubscription != null : "Expected rawSubscription but got null"; // We know that there is nothing else that can happen on this stream as we are not sending the // data to the dataSubscriber. rawSubscription.cancel(); @@ -234,7 +236,7 @@ protected void handleSubscribe(PublisherSource.Subscriber newSu // Subscriber which is not allowed by the Reactive Streams specification. newSubscriber.onSubscribe(delayedSubscription); if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) { - assert rawSubscription != null; + assert rawSubscription != null : "Expected rawSubscription but got null"; delayedSubscription.delayedSubscription(rawSubscription); } else { // Entering this branch means either a duplicate subscriber or a stream that completed or failed @@ -317,7 +319,7 @@ public void onComplete() { } private void ensureResultSubscriberOnSubscribe() { - assert !metaSeenInOnNext; + assert !metaSeenInOnNext : "Already seen meta-data"; if (!onSubscribeSent) { onSubscribeSent = true; // Since we are going to deliver data or a terminal signal right after this, From dc47db075b6f3ed05b40ac404ca1b7fdcba6f4d3 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 9 Aug 2024 16:25:22 -0700 Subject: [PATCH 5/8] Make SplicingSubscriber private --- .../http/netty/SpliceFlatStreamToMetaSingle.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 26525b56e4..0fe062af09 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -72,12 +72,12 @@ public PublisherSource.Subscriber apply(Subscriber subscri return new SplicingSubscriber<>(this, subscriber); } - /* Visible for testing */ - static final class SplicingSubscriber implements PublisherSource.Subscriber { + private static final class SplicingSubscriber + implements PublisherSource.Subscriber { + @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater - maybePayloadSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, - Object.class, "maybePayloadSub"); + private static final AtomicReferenceFieldUpdater maybePayloadSubUpdater = + AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybePayloadSub"); private static final String CANCELED = "CANCELED"; private static final String PENDING = "PENDING"; From 3ddda7bfc2349f6a9262e8af2769268071b8d3dd Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Mon, 12 Aug 2024 10:35:59 -0700 Subject: [PATCH 6/8] Use different stack trace for CancellationException --- .../http/netty/SpliceFlatStreamToMetaSingle.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 0fe062af09..097cbeaf76 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -206,7 +206,9 @@ public void onNext(@Nullable Object obj) { assert maybePayloadSub == CANCELED : "Expected CANCELED but got: " + maybePayloadSub; boolean cas = maybePayloadSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED); assert cas : "Could not transition from CANCELED to EMPTY_COMPLETED_DELIVERED"; - payload = Publisher.failed(newCancellationException()); + payload = Publisher.failed(StacklessCancellationException.newInstance( + "Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " + + maybePayloadSub, getClass(), "onNext(...)")); } data = parent.packer.apply(meta, payload); assert data != null : "Packer function must return non-null Data"; @@ -256,7 +258,9 @@ protected void handleSubscribe(PublisherSource.Subscriber newSu } else if (maybeSubscriber == CANCELED && maybePayloadSubUpdater .compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) { // Premature cancel - newSubscriber.onError(newCancellationException()); + newSubscriber.onError(StacklessCancellationException.newInstance( + "Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " + + maybeSubscriber, getClass(), "handleSubscribe(...)")); } else { // Existing subscriber or terminal event consumed by other subscriber (COMPLETED_DELIVERED) newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber, @@ -332,10 +336,5 @@ private void terminateWithIllegalStateException(PublisherSource.Subscriber Date: Mon, 12 Aug 2024 10:57:32 -0700 Subject: [PATCH 7/8] Capture full stack trace when something subscribes to payload after cancel --- .../http/netty/SpliceFlatStreamToMetaSingle.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 097cbeaf76..44244e687a 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; import javax.annotation.Nonnull; @@ -257,10 +258,11 @@ protected void handleSubscribe(PublisherSource.Subscriber newSu newSubscriber.onError((Throwable) maybeSubscriber); } else if (maybeSubscriber == CANCELED && maybePayloadSubUpdater .compareAndSet(SplicingSubscriber.this, maybeSubscriber, EMPTY_COMPLETED_DELIVERED)) { - // Premature cancel - newSubscriber.onError(StacklessCancellationException.newInstance( + // Premature cancel, capture the full caller stack-trace to understand which code path + // subscribes to the payload after cancellation. + newSubscriber.onError(new CancellationException( "Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " + - maybeSubscriber, getClass(), "handleSubscribe(...)")); + maybeSubscriber)); } else { // Existing subscriber or terminal event consumed by other subscriber (COMPLETED_DELIVERED) newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber, From d0edda2ad622354d5b2d59840d2f292837ed2ab3 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 13 Aug 2024 23:33:12 -0700 Subject: [PATCH 8/8] Enable JavaNetSoTimeoutHttpConnectionFilterTest --- .../http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java index 48523bdc74..8d7e18a32f 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/JavaNetSoTimeoutHttpConnectionFilterTest.java @@ -35,7 +35,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -68,7 +67,6 @@ import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertThrows; -@Disabled // FIXME: remove before merging class JavaNetSoTimeoutHttpConnectionFilterTest { @RegisterExtension