From fd653344ccb4dc1cbb72e40033d0bd1d5c84d8ab Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Fri, 10 May 2024 17:19:26 -0700 Subject: [PATCH] Traffic resiliency outstanding improvements (comments) (#2917) * Traffic resiliency outstanding improvements (comments) --- gradle.properties | 4 +- .../gradle/checkstyle/suppressions.xml | 24 ++ .../limiter/api/AimdCapacityLimiter.java | 14 +- .../api/AimdCapacityLimiterBuilder.java | 6 +- .../limiter/api/AllowAllCapacityLimiter.java | 52 +-- .../capacity/limiter/api/CapacityLimiter.java | 8 +- .../limiter/api/CapacityLimiters.java | 7 +- .../limiter/api/FixedCapacityLimiter.java | 11 +- .../api/FixedCapacityLimiterBuilder.java | 7 +- .../limiter/api/GradientCapacityLimiter.java | 14 +- .../api/GradientCapacityLimiterBuilder.java | 6 +- .../api/GradientCapacityLimiterProfiles.java | 37 +-- .../api/GradientCapacityLimiterUtils.java | 2 +- .../capacity/limiter/api/Preconditions.java | 104 ++++++ ...tion.java => RequestDroppedException.java} | 17 +- .../AbstractTrafficManagementHttpFilter.java | 21 +- .../http/ClientPeerRejectionPolicy.java | 169 ++++++++++ ... DelayedRetryRequestDroppedException.java} | 20 +- .../http/PeerCapacityRejectionPolicy.java | 128 -------- ... => RetryableRequestDroppedException.java} | 16 +- .../http/SafeTrafficResiliencyObserver.java | 6 +- .../http/ServiceRejectionPolicy.java | 299 ++++++++++++++++++ .../traffic/resilience/http/StateContext.java | 7 +- .../TrafficResilienceHttpClientFilter.java | 99 +++--- .../TrafficResilienceHttpServiceFilter.java | 294 ++--------------- .../http/TrafficResiliencyObserver.java | 4 +- .../http/CapacityClientServerTest.java | 13 +- ...TrafficResilienceHttpClientFilterTest.java | 20 +- ...rafficResilienceHttpServiceFilterTest.java | 12 +- .../utils/internal/NumberUtils.java | 79 ----- 30 files changed, 820 insertions(+), 680 deletions(-) create mode 100644 servicetalk-capacity-limiter-api/gradle/checkstyle/suppressions.xml create mode 100644 servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/Preconditions.java rename servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/{RequestRejectedException.java => RequestDroppedException.java} (69%) create mode 100644 servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ClientPeerRejectionPolicy.java rename servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/{DelayedRetryRequestRejectedException.java => DelayedRetryRequestDroppedException.java} (76%) delete mode 100644 servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/PeerCapacityRejectionPolicy.java rename servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/{RetryableRequestRejectedException.java => RetryableRequestDroppedException.java} (75%) create mode 100644 servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ServiceRejectionPolicy.java diff --git a/gradle.properties b/gradle.properties index 574bda4ace..b8d4534c73 100644 --- a/gradle.properties +++ b/gradle.properties @@ -63,6 +63,9 @@ protoGoogleCommonProtosVersion=2.29.0 javaPoetVersion=1.13.0 shadowPluginVersion=8.1.1 +# resilience4j - jdk8 compat +resilience4jVersion=1.7.1 + # Test dependencies jmhCoreVersion=1.37 jmhPluginVersion=0.7.2 @@ -82,4 +85,3 @@ commonsLangVersion=2.6 grpcVersion=1.61.1 javaxAnnotationsApiVersion=1.3.5 jsonUnitVersion=2.38.0 -resilience4jVersion=1.7.1 diff --git a/servicetalk-capacity-limiter-api/gradle/checkstyle/suppressions.xml b/servicetalk-capacity-limiter-api/gradle/checkstyle/suppressions.xml new file mode 100644 index 0000000000..910cd5eac2 --- /dev/null +++ b/servicetalk-capacity-limiter-api/gradle/checkstyle/suppressions.xml @@ -0,0 +1,24 @@ + + + + + + + diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiter.java index aafb549b88..cdc7d739b3 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiter.java @@ -46,7 +46,8 @@ /** * A client side dynamic {@link CapacityLimiter} that adapts its limit based on a configurable range of concurrency * {@link #min} and {@link #max}, and re-evaluates this limit upon a request-drop event - * (e.g., timeout or rejection due to capacity). + * (e.g., timeout or rejection due to capacity). It's not ideal for server-side solutions, due to the slow recover + * mechanism it offers, which can lead in significant traffic loss during the recovery window. *

* The limit translates to a concurrency figure, e.g., how many requests can be in-flight simultaneously and doesn't * represent a constant rate (i.e., has no notion of time). @@ -112,7 +113,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co if (pending >= limit || pending == max) { // prevent pending going above max if limit is fractional ticket = null; } else { - ticket = new DefaultTicket(this, (int) limit - pending); + ticket = new DefaultTicket(this, (int) limit - pending, pending); pending++; } l = limit; @@ -208,10 +209,12 @@ private static final class DefaultTicket implements Ticket, LimiterState { private static final int UNSUPPORTED = -1; private final AimdCapacityLimiter provider; private final int remaining; + private final int pending; - DefaultTicket(final AimdCapacityLimiter provider, final int remaining) { + DefaultTicket(final AimdCapacityLimiter provider, final int remaining, final int pending) { this.provider = provider; this.remaining = remaining; + this.pending = pending; } @Override @@ -224,6 +227,11 @@ public int remaining() { return remaining; } + @Override + public int pending() { + return pending; + } + @Override public int completed() { provider.onSuccess(); diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiterBuilder.java index a608d15034..c6e2701ef6 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AimdCapacityLimiterBuilder.java @@ -20,10 +20,10 @@ import java.util.function.LongSupplier; import javax.annotation.Nullable; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensureBetweenZeroAndOneExclusive; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensurePositive; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensureRange; import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative; -import static io.servicetalk.utils.internal.NumberUtils.ensureBetweenZeroAndOneExclusive; -import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; -import static io.servicetalk.utils.internal.NumberUtils.ensureRange; import static java.lang.Integer.MAX_VALUE; import static java.util.Objects.requireNonNull; diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AllowAllCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AllowAllCapacityLimiter.java index fe0da91990..89f8178a8d 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AllowAllCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/AllowAllCapacityLimiter.java @@ -19,13 +19,36 @@ import javax.annotation.Nullable; +import static java.lang.Integer.MAX_VALUE; + final class AllowAllCapacityLimiter implements CapacityLimiter { static final CapacityLimiter INSTANCE = new AllowAllCapacityLimiter(); private static final int UNSUPPORTED = -1; - private final Ticket noOpToken = new Ticket() { + private static final Ticket noOpToken = new DefaultTicket(); + + private AllowAllCapacityLimiter() { + // Singleton + } + + @Override + public String name() { + return AllowAllCapacityLimiter.class.getSimpleName(); + } + + @Override + public Ticket tryAcquire(final Classification classification, @Nullable final ContextMap context) { + return noOpToken; + } + + @Override + public String toString() { + return name(); + } + + private static final class DefaultTicket implements Ticket, LimiterState { @Override public LimiterState state() { - return null; + return this; } @Override @@ -47,24 +70,15 @@ public int failed(@Nullable final Throwable error) { public int ignored() { return UNSUPPORTED; } - }; - private AllowAllCapacityLimiter() { - // Singleton - } - - @Override - public String name() { - return AllowAllCapacityLimiter.class.getSimpleName(); - } - - @Override - public Ticket tryAcquire(final Classification classification, @Nullable final ContextMap context) { - return noOpToken; - } + @Override + public int pending() { + return -1; + } - @Override - public String toString() { - return name(); + @Override + public int remaining() { + return MAX_VALUE; + } } } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiter.java index 684eb37ba1..52e91cb5b8 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiter.java @@ -83,6 +83,13 @@ interface LimiterState { * @return the remaining allowance of the {@link CapacityLimiter} when the {@link Ticket} was issued. */ int remaining(); + + /** + * Returns the current pending (in use capacity) demand. + * If the pending is unknown a negative value i.e., -1 is allowed to express this. + * @return the current pending (in use capacity) demand. + */ + int pending(); } /** @@ -101,7 +108,6 @@ interface Ticket { * Representation of the state of the {@link CapacityLimiter} when this {@link Ticket} was issued. * @return the {@link LimiterState state} of the limiter at the time this {@link Ticket} was issued. */ - @Nullable LimiterState state(); /** diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiters.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiters.java index 3b59a52a8e..d86ce9074a 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiters.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/CapacityLimiters.java @@ -57,16 +57,19 @@ public static CapacityLimiter composite(final List providers) { * target limit for this request will be 70% of the 10 = 7. If current consumption is less than 7, the request * will be permitted. * + * @param capacity The fixed capacity value for this limiter. * @return A {@link CapacityLimiter} builder to configure the available parameters. */ - public static FixedCapacityLimiterBuilder fixedCapacity() { - return new FixedCapacityLimiterBuilder(); + public static FixedCapacityLimiterBuilder fixedCapacity(final int capacity) { + return new FixedCapacityLimiterBuilder(capacity); } /** * AIMD is a request drop based dynamic {@link CapacityLimiter} for clients, * that adapts its limit based on a configurable range of concurrency and re-evaluates this limit upon * a {@link Ticket#dropped() request-drop event (eg. timeout or rejection due to capacity)}. + * It's not ideal for server-side solutions, due to the slow recover mechanism it offers, which can lead in + * significant traffic loss during the recovery window. *

* The limit translates to a concurrency figure, e.g. how many requests can be in-flight simultaneously and doesn't * represent a constant rate (i.e. has no notion of time). Requests per second when that limit is met will be diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiter.java index e955ea8033..f55ff0d77a 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiter.java @@ -74,7 +74,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co final int newPending = currPending + 1; if (pendingUpdater.compareAndSet(this, currPending, newPending)) { notifyObserver(newPending); - return new DefaultTicket(this, effectiveLimit - newPending); + return new DefaultTicket(this, effectiveLimit - newPending, newPending); } } } @@ -99,10 +99,12 @@ private static final class DefaultTicket implements Ticket, LimiterState { private final FixedCapacityLimiter fixedCapacityProvider; private final int remaining; + private final int pending; - DefaultTicket(final FixedCapacityLimiter fixedCapacityProvider, int remaining) { + DefaultTicket(final FixedCapacityLimiter fixedCapacityProvider, final int remaining, final int pending) { this.fixedCapacityProvider = fixedCapacityProvider; this.remaining = remaining; + this.pending = pending; } @Override @@ -115,6 +117,11 @@ public int remaining() { return remaining; } + @Override + public int pending() { + return pending; + } + private int release() { final int pending = pendingUpdater.decrementAndGet(fixedCapacityProvider); fixedCapacityProvider.notifyObserver(pending); diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiterBuilder.java index 7496ae4e0d..c15283e99b 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/FixedCapacityLimiterBuilder.java @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import static io.servicetalk.utils.internal.NumberUtils.ensureNonNegative; import static java.util.Objects.requireNonNull; /** @@ -32,6 +33,10 @@ public final class FixedCapacityLimiterBuilder { @Nullable private StateObserver observer; + FixedCapacityLimiterBuilder(final int capacity) { + this.capacity = ensureNonNegative(capacity, "capacity"); + } + /** * Defines a name for this {@link CapacityLimiter}. * @param name the name to be used when building this {@link CapacityLimiter}. @@ -51,7 +56,7 @@ public FixedCapacityLimiterBuilder name(final String name) { * @return {@code this}. */ public FixedCapacityLimiterBuilder capacity(final int capacity) { - this.capacity = capacity; + this.capacity = ensureNonNegative(capacity, "capacity"); return this; } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index 1a176db5bc..b62f962f63 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -59,8 +59,7 @@ *

* The algorithm is heavily influenced by the following prior-art *

*/ @@ -133,7 +132,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co newLimit = (int) limit; if (pending < limit) { newPending = ++pending; - ticket = new DefaultTicket(this, newLimit - newPending); + ticket = new DefaultTicket(this, newLimit - newPending, newPending); } } @@ -241,11 +240,13 @@ private static final class DefaultTicket implements Ticket, LimiterState { private final long startTime; private final GradientCapacityLimiter provider; private final int remaining; + private final int pending; - DefaultTicket(final GradientCapacityLimiter provider, final int remaining) { + DefaultTicket(final GradientCapacityLimiter provider, final int remaining, final int pending) { this.provider = provider; this.startTime = provider.timeSource.getAsLong(); this.remaining = remaining; + this.pending = pending; } @Override @@ -258,6 +259,11 @@ public int remaining() { return remaining; } + @Override + public int pending() { + return pending; + } + @Override public int completed() { return provider.onSuccess(provider.timeSource.getAsLong() - startTime); diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java index 9d43d363fe..a278001454 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java @@ -43,9 +43,9 @@ import static io.servicetalk.capacity.limiter.api.GradientCapacityLimiterProfiles.DEFAULT_SUSPEND_INCR; import static io.servicetalk.capacity.limiter.api.GradientCapacityLimiterProfiles.GREEDY_HEADROOM; import static io.servicetalk.capacity.limiter.api.GradientCapacityLimiterProfiles.MIN_SAMPLING_DURATION; -import static io.servicetalk.utils.internal.NumberUtils.ensureBetweenZeroAndOne; -import static io.servicetalk.utils.internal.NumberUtils.ensureBetweenZeroAndOneExclusive; -import static io.servicetalk.utils.internal.NumberUtils.ensureGreaterThan; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensureBetweenZeroAndOne; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensureBetweenZeroAndOneExclusive; +import static io.servicetalk.capacity.limiter.api.Preconditions.ensureGreaterThan; import static java.util.Objects.requireNonNull; /** diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterProfiles.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterProfiles.java index 25e7288033..19850117e9 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterProfiles.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterProfiles.java @@ -39,16 +39,15 @@ public final class GradientCapacityLimiterProfiles { static final float DEFAULT_ON_DROP = 0.5f; static final float DEFAULT_ON_LIMIT = 0.2f; static final float DEFAULT_MIN_GRADIENT = 0.2f; - static final float GREEDY_MIN_GRADIENT = 0.5f; static final float DEFAULT_MAX_GRADIENT = 1.2f; static final float GREEDY_MAX_GRADIENT = 1.8f; - static final float EXPERIMENTAL_GREEDY_ON_LIMIT = 0.9f; - static final float EXPERIMENTAL_GREEDY_ON_DROP = 0.95f; - static final float EXPERIMENTAL_GREEDY_MIN_GRADIENT = 0.90f; + static final float GREEDY_ON_LIMIT = 0.9f; + static final float GREEDY_ON_DROP = 0.95f; + static final float GREEDY_MIN_GRADIENT = 0.90f; static final Duration DEFAULT_LIMIT_UPDATE_INTERVAL = ofSeconds(1); static final BiPredicate DEFAULT_SUSPEND_INCR = blastRadius(2); static final BiPredicate DEFAULT_SUSPEND_DEC = (__, ___) -> false; - static final BiPredicate EXPERIMENTAL_SUSPEND_DEC = occupancyFactor(.9f); + static final BiPredicate SUSPEND_DEC = occupancyFactor(.9f); static final BiFunction DEFAULT_HEADROOM = (__, ___) -> 0.0; static final BiFunction GREEDY_HEADROOM = (grad, limit) -> Math.sqrt(grad * limit); static final Duration MIN_SAMPLING_DURATION = Duration.ofMillis(50); @@ -57,7 +56,7 @@ public final class GradientCapacityLimiterProfiles { static final BiFunction CALMER_RATIO = (tracker, calmer) -> calmer < (tracker / 2) ? .90f : -1f; static final LatencyTracker DEFAULT_SHORT_LATENCY_TRACKER = new LatencyTracker.LastSample(); - static final LatencyTracker EXPERIMENTAL_SHORT_LATENCY_TRACKER = new EMA(ofSeconds(10).toNanos(), + static final LatencyTracker SHORT_LATENCY_TRACKER = new EMA(ofSeconds(10).toNanos(), SHORT_LATENCY_CALMER_TRACKER, CALMER_RATIO); static final LatencyTracker DEFAULT_LONG_LATENCY_TRACKER = new EMA(ofMinutes(10).toNanos(), LONG_LATENCY_CALMER_TRACKER, CALMER_RATIO); @@ -86,6 +85,7 @@ public static Consumer preferLatency() { * that tries to push the limit higher until a significant gradient change is noticed. It will allow limit increases * while latency is changing, favouring throughput overall, so latency sensitive application may not want to use * this profile. + * * @return Settings for the {@link GradientCapacityLimiterBuilder} for an aggressive Gradient * {@link CapacityLimiter}. */ @@ -93,28 +93,9 @@ public static Consumer preferThroughput() { return builder -> builder.minGradient(GREEDY_MIN_GRADIENT) .maxGradient(GREEDY_MAX_GRADIENT) - .headroom(GREEDY_HEADROOM); - } - - /** - * The settings applied from this profile demonstrate aggressive behaviour of the {@link CapacityLimiter}, - * that tries to push the limit higher until a significant gradient change is noticed. It will allow limit increases - * while latency is changing, favouring throughput overall, so latency sensitive application may not want to use - * this profile. - *

- * Note: This experimental profile is a new configuration that we are trying to collect metrics - * on the behavior and how it compares against the exiting offer. - * - * @return Settings for the {@link GradientCapacityLimiterBuilder} for an aggressive Gradient - * {@link CapacityLimiter}. - */ - public static Consumer preferThroughputExperimental() { - return builder -> - builder.minGradient(EXPERIMENTAL_GREEDY_MIN_GRADIENT) - .maxGradient(GREEDY_MAX_GRADIENT) - .backoffRatio(EXPERIMENTAL_GREEDY_ON_DROP, EXPERIMENTAL_GREEDY_ON_LIMIT) - .shortLatencyTracker(EXPERIMENTAL_SHORT_LATENCY_TRACKER) - .suspendLimitDecrease(EXPERIMENTAL_SUSPEND_DEC) + .backoffRatio(GREEDY_ON_DROP, GREEDY_ON_LIMIT) + .shortLatencyTracker(SHORT_LATENCY_TRACKER) + .suspendLimitDecrease(SUSPEND_DEC) .headroom(GREEDY_HEADROOM); } } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterUtils.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterUtils.java index fe6ad3995a..b0b3e8b64a 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterUtils.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterUtils.java @@ -17,7 +17,7 @@ import java.util.function.BiPredicate; -public final class GradientCapacityLimiterUtils { +final class GradientCapacityLimiterUtils { private GradientCapacityLimiterUtils() { // No instances diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/Preconditions.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/Preconditions.java new file mode 100644 index 0000000000..a93533c08e --- /dev/null +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/Preconditions.java @@ -0,0 +1,104 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.capacity.limiter.api; + +/** + * Number utilities + */ +final class Preconditions { + + private Preconditions() { + } + + /** + * Ensures the float is positive, excluding zero. + * + * @param value the float value to validate + * @param name name of the variable + * @return the passed value if all checks pass + * @throws IllegalArgumentException if the passed float is not greater than zero + */ + public static float ensurePositive(final float value, final String name) { + if (value <= 0.0) { + throw new IllegalArgumentException(name + ": " + value + " (expected > 0.0)"); + } + return value; + } + + /** + * Ensures the float is greater than the min specified. + * + * @param value the float value to validate + * @param min the float min to validate against + * @param field name of the variable + * @return the passed value if all checks pass + * @throws IllegalArgumentException if the passed float doesn't meet the requirements + */ + public static float ensureGreaterThan(final float value, final float min, final String field) { + if (value <= min) { + throw new IllegalArgumentException(field + ": " + value + " (expected: > " + min + ")"); + } + return value; + } + + /** + * Ensures the float is between 0 and 1 (inclusive). + * + * @param value the float value to validate + * @param field name of the variable + * @return the passed value if all checks pass + * @throws IllegalArgumentException if the passed float doesn't meet the requirements + */ + public static float ensureBetweenZeroAndOne(final float value, final String field) { + if (value < 0.0f || value > 1.0f) { + throw new IllegalArgumentException(field + ": " + value + " (expected: 0.0f <= " + field + " <= 1.0f)"); + } + return value; + } + + /** + * Ensures the int is between the provided range (inclusive). + * + * @param value the int value to validate + * @param min the min int value to validate against (inclusive) + * @param max the max int value to validate against (inclusive) + * @param field name of the variable + * @return the passed value if all checks pass + * @throws IllegalArgumentException if the passed int doesn't meet the requirements + */ + public static int ensureRange(final int value, final int min, final int max, final String field) { + if (value < min || value > max) { + throw new IllegalArgumentException(field + ": " + value + + " (expected: " + min + " <= " + field + " <= " + max + ")"); + } + return value; + } + + /** + * Ensures the float is between 0 and 1 (exclusive). + * + * @param value the float value to validate + * @param field name of the variable + * @return the passed value if all checks pass + * @throws IllegalArgumentException if the passed float doesn't meet the requirements + */ + public static float ensureBetweenZeroAndOneExclusive(final float value, final String field) { + if (value <= 0.0f || value >= 1.0f) { + throw new IllegalArgumentException(field + ": " + value + " (expected: 0.0f < " + field + " < 1.0f)"); + } + return value; + } +} diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestRejectedException.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestDroppedException.java similarity index 69% rename from servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestRejectedException.java rename to servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestDroppedException.java index e661fd4cd9..c52352790c 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestRejectedException.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/RequestDroppedException.java @@ -18,16 +18,17 @@ import javax.annotation.Nullable; /** - * An {@link Exception} to indicate that a request was rejected by a client/server due to capacity constraints. + * An {@link Exception} to indicate that a request was dropped by a client/server likely due to capacity constraints or + * as-such interpretation of peer feedback according to configuration. */ -public class RequestRejectedException extends RuntimeException { +public class RequestDroppedException extends RuntimeException { private static final long serialVersionUID = 2152182132883133067L; /** * Creates a new instance. */ - public RequestRejectedException() { + public RequestDroppedException() { } /** @@ -35,7 +36,7 @@ public RequestRejectedException() { * * @param message the detail message. */ - public RequestRejectedException(@Nullable final String message) { + public RequestDroppedException(@Nullable final String message) { super(message); } @@ -45,7 +46,7 @@ public RequestRejectedException(@Nullable final String message) { * @param message the detail message. * @param cause of this exception. */ - public RequestRejectedException(@Nullable final String message, @Nullable final Throwable cause) { + public RequestDroppedException(@Nullable final String message, @Nullable final Throwable cause) { super(message, cause); } @@ -54,7 +55,7 @@ public RequestRejectedException(@Nullable final String message, @Nullable final * * @param cause of this exception. */ - public RequestRejectedException(@Nullable final Throwable cause) { + public RequestDroppedException(@Nullable final Throwable cause) { super(cause); } @@ -66,8 +67,8 @@ public RequestRejectedException(@Nullable final Throwable cause) { * @param enableSuppression {@code true} if suppression should be enabled. * @param writableStackTrace {@code true} if the stack trace should be writable */ - public RequestRejectedException(@Nullable final String message, @Nullable final Throwable cause, - final boolean enableSuppression, final boolean writableStackTrace) { + public RequestDroppedException(@Nullable final String message, @Nullable final Throwable cause, + final boolean enableSuppression, final boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/AbstractTrafficManagementHttpFilter.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/AbstractTrafficManagementHttpFilter.java index 50a7bf1890..0ec94ab574 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/AbstractTrafficManagementHttpFilter.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/AbstractTrafficManagementHttpFilter.java @@ -18,7 +18,7 @@ import io.servicetalk.capacity.limiter.api.CapacityLimiter; import io.servicetalk.capacity.limiter.api.CapacityLimiter.Ticket; import io.servicetalk.capacity.limiter.api.Classification; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.circuit.breaker.api.CircuitBreaker; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TerminalSignalConsumer; @@ -31,7 +31,7 @@ import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpResponseFactory; import io.servicetalk.http.utils.BeforeFinallyHttpOperator; -import io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.PassthroughRequestRejectedException; +import io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.PassthroughRequestDroppedException; import io.servicetalk.traffic.resilience.http.TrafficResiliencyObserver.TicketObserver; import io.servicetalk.transport.api.ServerListenContext; @@ -55,11 +55,11 @@ import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; abstract class AbstractTrafficManagementHttpFilter implements HttpExecutionStrategyInfluencer { - private static final RequestRejectedException CAPACITY_REJECTION = unknownStackTrace( - new RequestRejectedException("Service under heavy load", null, false, true), + private static final RequestDroppedException CAPACITY_REJECTION = unknownStackTrace( + new RequestDroppedException("Service under heavy load", null, false, true), AbstractTrafficManagementHttpFilter.class, "remoteRejection"); - private static final RequestRejectedException BREAKER_REJECTION = unknownStackTrace( - new RequestRejectedException("Service Unavailable", null, false, true), + private static final RequestDroppedException BREAKER_REJECTION = unknownStackTrace( + new RequestDroppedException("Service Unavailable", null, false, true), AbstractTrafficManagementHttpFilter.class, "breakerRejection"); protected static final Single DEFAULT_CAPACITY_REJECTION = @@ -191,9 +191,9 @@ abstract Single handleLocalCapacityRejection( abstract Single handleLocalBreakerRejection( StreamingHttpRequest request, @Nullable StreamingHttpResponseFactory responseFactory, - @Nullable CircuitBreaker breaker); + CircuitBreaker breaker); - RuntimeException peerCapacityRejection(final StreamingHttpResponse resp) { + RuntimeException peerRejection(final StreamingHttpResponse resp) { return CAPACITY_REJECTION; } @@ -225,8 +225,8 @@ private Single handleAllow( .concat(Single.failed(peerBreakerRejection(resp, breaker))) .shareContextOnSubscribe(); } else if (capacityRejectionPredicate.test(resp)) { - final RuntimeException rejection = peerCapacityRejection(resp); - if (PassthroughRequestRejectedException.class.equals(rejection.getClass())) { + final RuntimeException rejection = peerRejection(resp); + if (PassthroughRequestDroppedException.class.equals(rejection.getClass())) { return Single.failed(rejection).shareContextOnSubscribe(); } return resp.payloadBody().ignoreElements() @@ -311,7 +311,6 @@ static final class TrackingDelegatingTicket implements Ticket { this.requestHashCode = requestHashCode; } - @Nullable @Override public CapacityLimiter.LimiterState state() { return delegate.state(); diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ClientPeerRejectionPolicy.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ClientPeerRejectionPolicy.java new file mode 100644 index 0000000000..13c3a9705c --- /dev/null +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ClientPeerRejectionPolicy.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.traffic.resilience.http; + +import io.servicetalk.capacity.limiter.api.CapacityLimiter; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; +import io.servicetalk.circuit.breaker.api.CircuitBreaker; +import io.servicetalk.http.api.HttpResponseMetaData; +import io.servicetalk.http.api.HttpResponseStatus; +import io.servicetalk.http.api.StreamingHttpResponse; + +import java.time.Duration; +import java.util.function.Function; +import java.util.function.Predicate; + +import static io.servicetalk.http.api.HttpResponseStatus.BAD_GATEWAY; +import static io.servicetalk.http.api.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT_PASSTHROUGH; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT_RETRY; +import static java.time.Duration.ZERO; +import static java.util.Objects.requireNonNull; + +/** + * Policy for peer capacity rejections that allows customization of behavior (retries or pass-through). + * This is meant to be used as a policy on the {@link TrafficResilienceHttpServiceFilter}. + * @see TrafficResilienceHttpClientFilter.Builder#rejectionPolicy(ClientPeerRejectionPolicy) + */ +public final class ClientPeerRejectionPolicy { + + /** + * Default rejection observer for dropped requests from an external sourced. + * see. {@link TrafficResilienceHttpClientFilter.Builder#rejectionPolicy(ClientPeerRejectionPolicy)}. + *

+ * The default predicate matches the following HTTP response codes: + *

    + *
  • {@link HttpResponseStatus#TOO_MANY_REQUESTS}
  • + *
  • {@link HttpResponseStatus#BAD_GATEWAY}
  • + *
  • {@link HttpResponseStatus#SERVICE_UNAVAILABLE}
  • + *
+ *

+ * If a {@link CircuitBreaker} is used consider adjusting this predicate to avoid considering + * {@link HttpResponseStatus#SERVICE_UNAVAILABLE} as a capacity issue. + */ + public static final Predicate DEFAULT_CAPACITY_REJECTION_PREDICATE = metaData -> + // Some proxies are known to return BAD_GATEWAY when the upstream is unresponsive (i.e. heavy load). + metaData.status().code() == TOO_MANY_REQUESTS.code() || metaData.status().code() == BAD_GATEWAY.code() || + metaData.status().code() == SERVICE_UNAVAILABLE.code(); + + /** + * Default rejection policy for peer responses. + * The following responses will be considered rejections, and exercise the rejection policy; + *

    + *
  • {@link HttpResponseStatus#TOO_MANY_REQUESTS}
  • + *
  • {@link HttpResponseStatus#BAD_GATEWAY}
  • + *
  • {@link HttpResponseStatus#SERVICE_UNAVAILABLE}
  • + *
+ *

+ * The default behavior upon such a case, is to issue a retryable exception with no pre-set offset delay (outside + * the default backoff policy of configured retry filter). + */ + public static final ClientPeerRejectionPolicy DEFAULT_PEER_REJECTION_POLICY = + new ClientPeerRejectionPolicy(DEFAULT_CAPACITY_REJECTION_PREDICATE, REJECT_RETRY, __ -> ZERO); + + enum Type { + REJECT, + REJECT_PASSTHROUGH, + REJECT_RETRY, + } + + private final Predicate predicate; + private final Type type; + private final Function delayProvider; + + private ClientPeerRejectionPolicy(final Predicate predicate, + final Type type) { + this.predicate = predicate; + this.type = type; + this.delayProvider = __ -> ZERO; + } + + ClientPeerRejectionPolicy(final Predicate predicate, + final Type type, + final Function delayProvider) { + this.predicate = predicate; + this.type = type; + this.delayProvider = delayProvider; + } + + Predicate predicate() { + return predicate; + } + + Type type() { + return type; + } + + Function delayProvider() { + return delayProvider; + } + + /** + * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the + * {@link CapacityLimiter} in use, but allow the original response from the upstream to pass-through this filter. + * @param predicate The {@link Predicate} to evaluate responses. + * Returning true from this {@link Predicate} signifies that the response was capacity + * related rejection from the peer. + * @return A {@link ClientPeerRejectionPolicy}. + */ + public static ClientPeerRejectionPolicy ofPassthrough(final Predicate predicate) { + return new ClientPeerRejectionPolicy(predicate, REJECT_PASSTHROUGH); + } + + /** + * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the + * {@link CapacityLimiter} in use, and translate that to en exception. + * @param rejectionPredicate The {@link Predicate} to evaluate responses. + * Returning true from this {@link Predicate} signifies that the response was capacity + * related rejection from the peer. + * @return A {@link ClientPeerRejectionPolicy}. + */ + public static ClientPeerRejectionPolicy ofRejection( + final Predicate rejectionPredicate) { + return new ClientPeerRejectionPolicy(rejectionPredicate, REJECT); + } + + /** + * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the + * {@link CapacityLimiter} in use, and translate that to an exception that contains "delay" information useful when + * retrying it through a retrying filter. + * @param rejectionPredicate The {@link Predicate} to evaluate responses. + * Returning true from this {@link Predicate} signifies that the response was capacity + * related rejection from the peer. + * @param delayProvider A {@link Duration} provider for delay purposes when retrying. + * @return A {@link ClientPeerRejectionPolicy}. + */ + public static ClientPeerRejectionPolicy ofRejectionWithRetries( + final Predicate rejectionPredicate, + final Function delayProvider) { + return new ClientPeerRejectionPolicy(rejectionPredicate, REJECT_RETRY, delayProvider); + } + + static final class PassthroughRequestDroppedException extends RequestDroppedException { + private static final long serialVersionUID = 5494523265208777384L; + private final StreamingHttpResponse response; + PassthroughRequestDroppedException(final String msg, final StreamingHttpResponse response) { + super(msg); + this.response = requireNonNull(response); + } + + StreamingHttpResponse response() { + return response; + } + } +} diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestRejectedException.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestDroppedException.java similarity index 76% rename from servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestRejectedException.java rename to servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestDroppedException.java index 685a9de4fe..b31d4bf76e 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestRejectedException.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/DelayedRetryRequestDroppedException.java @@ -15,7 +15,7 @@ */ package io.servicetalk.traffic.resilience.http; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.http.api.HttpResponseStatus; import io.servicetalk.http.netty.RetryingHttpRequesterFilter; @@ -30,7 +30,7 @@ * its up to the application to declare whether a {@link HttpResponseStatus#TOO_MANY_REQUESTS} is a safe-to-retry * response, and if so after how much {@link #delay()}. */ -public final class DelayedRetryRequestRejectedException extends RequestRejectedException +public final class DelayedRetryRequestDroppedException extends RequestDroppedException implements RetryingHttpRequesterFilter.DelayedRetry { private static final long serialVersionUID = -7933994513110803151L; @@ -41,7 +41,7 @@ public final class DelayedRetryRequestRejectedException extends RequestRejectedE * * @param delay The delay to be provided as input to a retry mechanism. */ - public DelayedRetryRequestRejectedException(final Duration delay) { + public DelayedRetryRequestDroppedException(final Duration delay) { this.delay = requireNonNull(delay); } @@ -51,7 +51,7 @@ public DelayedRetryRequestRejectedException(final Duration delay) { * @param delay The delay to be provided as input to a retry mechanism. * @param message the detail message. */ - public DelayedRetryRequestRejectedException(final Duration delay, @Nullable final String message) { + public DelayedRetryRequestDroppedException(final Duration delay, @Nullable final String message) { super(message); this.delay = requireNonNull(delay); } @@ -63,8 +63,8 @@ public DelayedRetryRequestRejectedException(final Duration delay, @Nullable fina * @param message the detail message. * @param cause of this exception. */ - public DelayedRetryRequestRejectedException(final Duration delay, - @Nullable final String message, @Nullable final Throwable cause) { + public DelayedRetryRequestDroppedException(final Duration delay, + @Nullable final String message, @Nullable final Throwable cause) { super(message, cause); this.delay = requireNonNull(delay); } @@ -75,7 +75,7 @@ public DelayedRetryRequestRejectedException(final Duration delay, * @param delay The delay to be provided as input to a retry mechanism. * @param cause of this exception. */ - public DelayedRetryRequestRejectedException(final Duration delay, @Nullable final Throwable cause) { + public DelayedRetryRequestDroppedException(final Duration delay, @Nullable final Throwable cause) { super(cause); this.delay = requireNonNull(delay); } @@ -89,9 +89,9 @@ public DelayedRetryRequestRejectedException(final Duration delay, @Nullable fina * @param enableSuppression {@code true} if suppression should be enabled. * @param writableStackTrace {@code true} if the stack trace should be writable */ - public DelayedRetryRequestRejectedException(final Duration delay, - @Nullable final String message, @Nullable final Throwable cause, - final boolean enableSuppression, final boolean writableStackTrace) { + public DelayedRetryRequestDroppedException(final Duration delay, + @Nullable final String message, @Nullable final Throwable cause, + final boolean enableSuppression, final boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); this.delay = requireNonNull(delay); } diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/PeerCapacityRejectionPolicy.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/PeerCapacityRejectionPolicy.java deleted file mode 100644 index 0c7eec6aa8..0000000000 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/PeerCapacityRejectionPolicy.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright © 2024 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.traffic.resilience.http; - -import io.servicetalk.capacity.limiter.api.CapacityLimiter; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; -import io.servicetalk.http.api.HttpResponseMetaData; -import io.servicetalk.http.api.StreamingHttpResponse; - -import java.time.Duration; -import java.util.function.Function; -import java.util.function.Predicate; - -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT_PASSTHROUGH; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT_RETRY; -import static java.time.Duration.ZERO; -import static java.util.Objects.requireNonNull; - -/** - * Policy for peer capacity rejections that allows customization of behavior (retries or pass-through). - */ -public final class PeerCapacityRejectionPolicy { - - enum Type { - REJECT, - REJECT_PASSTHROUGH, - REJECT_RETRY, - } - - private final Predicate predicate; - private final Type type; - private final Function delayProvider; - - private PeerCapacityRejectionPolicy(final Predicate predicate, - final Type type) { - this.predicate = predicate; - this.type = type; - this.delayProvider = __ -> ZERO; - } - - PeerCapacityRejectionPolicy(final Predicate predicate, - final Type type, - final Function delayProvider) { - this.predicate = predicate; - this.type = type; - this.delayProvider = delayProvider; - } - - /** - * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the - * {@link CapacityLimiter} in use, but allow the original response from the upstream to pass-through this filter. - * @param predicate The {@link Predicate} to evaluate responses. - * Returning true from this {@link Predicate} signifies that the response was capacity - * related rejection from the peer. - * @return A {@link PeerCapacityRejectionPolicy}. - */ - public static PeerCapacityRejectionPolicy ofPassthrough(final Predicate predicate) { - return new PeerCapacityRejectionPolicy(predicate, REJECT_PASSTHROUGH); - } - - /** - * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the - * {@link CapacityLimiter} in use, and translate that to en exception. - * @param rejectionPredicate The {@link Predicate} to evaluate responses. - * Returning true from this {@link Predicate} signifies that the response was capacity - * related rejection from the peer. - * @return A {@link PeerCapacityRejectionPolicy}. - */ - public static PeerCapacityRejectionPolicy ofRejection( - final Predicate rejectionPredicate) { - return new PeerCapacityRejectionPolicy(rejectionPredicate, REJECT); - } - - /** - * Evaluate responses with the given {@link Predicate} as capacity related rejections, that will affect the - * {@link CapacityLimiter} in use, and translate that to an exception that contains "delay" information useful when - * retrying it through a retrying filter. - * @param rejectionPredicate The {@link Predicate} to evaluate responses. - * Returning true from this {@link Predicate} signifies that the response was capacity - * related rejection from the peer. - * @param delayProvider A {@link Duration} provider for delay purposes when retrying. - * @return A {@link PeerCapacityRejectionPolicy}. - */ - public static PeerCapacityRejectionPolicy ofRejectionWithRetries( - final Predicate rejectionPredicate, - final Function delayProvider) { - return new PeerCapacityRejectionPolicy(rejectionPredicate, REJECT_RETRY, delayProvider); - } - - Predicate predicate() { - return predicate; - } - - Type type() { - return type; - } - - Function delayProvider() { - return delayProvider; - } - - static final class PassthroughRequestRejectedException extends RequestRejectedException { - private static final long serialVersionUID = 5494523265208777384L; - private final StreamingHttpResponse response; - PassthroughRequestRejectedException(final String msg, final StreamingHttpResponse response) { - super(msg); - this.response = requireNonNull(response); - } - - StreamingHttpResponse response() { - return response; - } - } -} diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestRejectedException.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestDroppedException.java similarity index 75% rename from servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestRejectedException.java rename to servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestDroppedException.java index 38423ca49e..9641017657 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestRejectedException.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/RetryableRequestDroppedException.java @@ -15,7 +15,7 @@ */ package io.servicetalk.traffic.resilience.http; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.transport.api.RetryableException; import javax.annotation.Nullable; @@ -26,7 +26,7 @@ * not touch the "wire" (network) yet, meaning that its safe to be retried. Retries are useful in the context of * capacity, to maximize chances for a request to succeed. */ -public final class RetryableRequestRejectedException extends RequestRejectedException +public final class RetryableRequestDroppedException extends RequestDroppedException implements RetryableException { private static final long serialVersionUID = -1968209429496611665L; @@ -34,7 +34,7 @@ public final class RetryableRequestRejectedException extends RequestRejectedExce /** * Creates a new instance. */ - public RetryableRequestRejectedException() { + public RetryableRequestDroppedException() { } /** @@ -42,7 +42,7 @@ public RetryableRequestRejectedException() { * * @param message the detail message. */ - public RetryableRequestRejectedException(@Nullable final String message) { + public RetryableRequestDroppedException(@Nullable final String message) { super(message); } @@ -52,7 +52,7 @@ public RetryableRequestRejectedException(@Nullable final String message) { * @param message the detail message. * @param cause of this exception. */ - public RetryableRequestRejectedException(@Nullable final String message, @Nullable final Throwable cause) { + public RetryableRequestDroppedException(@Nullable final String message, @Nullable final Throwable cause) { super(message, cause); } @@ -61,7 +61,7 @@ public RetryableRequestRejectedException(@Nullable final String message, @Nullab * * @param cause of this exception. */ - public RetryableRequestRejectedException(@Nullable final Throwable cause) { + public RetryableRequestDroppedException(@Nullable final Throwable cause) { super(cause); } @@ -73,8 +73,8 @@ public RetryableRequestRejectedException(@Nullable final Throwable cause) { * @param enableSuppression {@code true} if suppression should be enabled. * @param writableStackTrace {@code true} if the stack trace should be writable */ - public RetryableRequestRejectedException(@Nullable final String message, @Nullable final Throwable cause, - final boolean enableSuppression, final boolean writableStackTrace) { + public RetryableRequestDroppedException(@Nullable final String message, @Nullable final Throwable cause, + final boolean enableSuppression, final boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/SafeTrafficResiliencyObserver.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/SafeTrafficResiliencyObserver.java index 1cb49e4d1b..5b595443b9 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/SafeTrafficResiliencyObserver.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/SafeTrafficResiliencyObserver.java @@ -15,7 +15,7 @@ */ package io.servicetalk.traffic.resilience.http; -import io.servicetalk.capacity.limiter.api.CapacityLimiter; +import io.servicetalk.capacity.limiter.api.CapacityLimiter.LimiterState; import io.servicetalk.capacity.limiter.api.Classification; import io.servicetalk.context.api.ContextMap; import io.servicetalk.http.api.StreamingHttpRequest; @@ -23,8 +23,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import static io.servicetalk.traffic.resilience.http.NoOpTrafficResiliencyObserver.NO_OP_TICKET_OBSERVER; import static java.util.Objects.requireNonNull; @@ -68,7 +66,7 @@ public void onRejectedOpenCircuit(final StreamingHttpRequest request, final Stri @Override public TicketObserver onAllowedThrough(final StreamingHttpRequest request, - @Nullable final CapacityLimiter.LimiterState state) { + final LimiterState state) { try { return this.original.onAllowedThrough(request, state); } catch (Throwable t) { diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ServiceRejectionPolicy.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ServiceRejectionPolicy.java new file mode 100644 index 0000000000..2d6c292229 --- /dev/null +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/ServiceRejectionPolicy.java @@ -0,0 +1,299 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.traffic.resilience.http; + +import io.servicetalk.capacity.limiter.api.CapacityLimiter; +import io.servicetalk.capacity.limiter.api.CapacityLimiter.Ticket; +import io.servicetalk.circuit.breaker.api.CircuitBreaker; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.HttpHeaderNames; +import io.servicetalk.http.api.HttpRequestMetaData; +import io.servicetalk.http.api.HttpResponseMetaData; +import io.servicetalk.http.api.HttpServiceContext; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponseFactory; + +import java.time.Duration; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static io.servicetalk.buffer.api.CharSequences.newAsciiString; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.http.api.HttpHeaderNames.RETRY_AFTER; +import static java.lang.String.valueOf; +import static java.util.Objects.requireNonNull; + +/** + * Rejection Policy to rule the behavior of service rejections due to capacity or open circuit. + * This is meant to be used as a policy on the {@link TrafficResilienceHttpServiceFilter}. + * @see TrafficResilienceHttpServiceFilter.Builder#rejectionPolicy(ServiceRejectionPolicy) + */ +public final class ServiceRejectionPolicy { + + /** + * Custom retry-after header that supports milliseconds resolution, rather than seconds. + */ + public static final CharSequence RETRY_AFTER_MILLIS = newAsciiString("retry-after-millis"); + + /** + * Default response rejection policy. + *

    + *
  • When a request is rejected due to capacity, the service will respond + * {@link ServiceRejectionPolicy#tooManyRequests()}.
  • + *
  • When a request is rejected due to capacity, the service will NOT include a retry-after header.
  • + *
  • When a request is rejected due to breaker, the service will respond + * {@link ServiceRejectionPolicy#serviceUnavailable()}.
  • + *
  • When a request is rejected due to breaker, the service will respond with Retry-After header hinting + * the duration the breaker will remain open.
  • + *
+ */ + public static final ServiceRejectionPolicy DEFAULT_REJECTION_POLICY = + new ServiceRejectionPolicy.Builder().build(); + + private final BiFunction> + onLimitResponseBuilder; + + private final Consumer onLimitRetryAfter; + + private final boolean onLimitStopAcceptingConnections; + + private final BiFunction> + onOpenCircuitResponseBuilder; + + private final BiConsumer onOpenCircuitRetryAfter; + + private ServiceRejectionPolicy(final BiFunction> onLimitResponseBuilder, + final Consumer onLimitRetryAfter, + final boolean onLimitStopAcceptingConnections, + final BiFunction> onOpenCircuitResponseBuilder, + final BiConsumer + onOpenCircuitRetryAfter) { + this.onLimitResponseBuilder = onLimitResponseBuilder; + this.onLimitRetryAfter = onLimitRetryAfter; + this.onLimitStopAcceptingConnections = onLimitStopAcceptingConnections; + this.onOpenCircuitResponseBuilder = onOpenCircuitResponseBuilder; + this.onOpenCircuitRetryAfter = onOpenCircuitRetryAfter; + } + + BiFunction> + onLimitResponseBuilder() { + return onLimitResponseBuilder; + } + + Consumer onLimitRetryAfter() { + return onLimitRetryAfter; + } + + boolean onLimitStopAcceptingConnections() { + return onLimitStopAcceptingConnections; + } + + BiFunction> + onOpenCircuitResponseBuilder() { + return onOpenCircuitResponseBuilder; + } + + BiConsumer onOpenCircuitRetryAfter() { + return onOpenCircuitRetryAfter; + } + + /** + * A hard-coded delay in seconds to be supplied as a Retry-After HTTP header in a {@link HttpResponseMetaData}. + * + * @param seconds The value (in seconds) to be used in the Retry-After header. + * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a fixed Retry-After figure in + * seconds. + */ + public static Consumer retryAfterHint(final int seconds) { + final CharSequence secondsSeq = newAsciiString(valueOf(seconds)); + return resp -> resp.addHeader(RETRY_AFTER, secondsSeq); + } + + /** + * A delay in seconds to be supplied as a Retry-After HTTP header in a {@link HttpResponseMetaData} based on the + * {@link CircuitBreaker} that matched the {@link HttpRequestMetaData}. + * + * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a Retry-After figure in + * seconds based on the duration the matching {@link CircuitBreaker} will remain open, or a fallback period. + */ + public static BiConsumer + retryAfterHintOfBreaker() { + return (resp, state) -> resp.setHeader(RETRY_AFTER, newAsciiString(valueOf( + state.breaker().remainingDurationInOpenState().getSeconds()))); + } + + /** + * A hard-coded delay in milliseconds to be supplied as a Retry-After-Millis HTTP header in a + * {@link HttpResponseMetaData}. Being a custom Http header, it will require special handling on the peer side. + * + * @param duration The duration to be used in the Retry-After-Millis header. + * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a fixed + * Retry-After-Millis figure in milliseconds. + */ + public static BiConsumer retryAfterMillisHint(final Duration duration) { + final CharSequence millisSeq = newAsciiString(valueOf(duration.toMillis())); + return (resp, breaker) -> resp.setHeader(RETRY_AFTER_MILLIS, millisSeq); + } + + /** + * Pre-defined {@link StreamingHttpResponse response} that signals + * {@link io.servicetalk.http.api.HttpResponseStatus#TOO_MANY_REQUESTS} to the peer. + * + * @return A {@link BiFunction} that regardless the input, it will always return a + * {@link StreamingHttpResponseFactory#tooManyRequests() too-many-requests} response. + */ + public static BiFunction> + tooManyRequests() { + return (__, factory) -> succeeded(factory.tooManyRequests()); + } + + /** + * Pre-defined {@link StreamingHttpResponse response} that signals + * {@link io.servicetalk.http.api.HttpResponseStatus#SERVICE_UNAVAILABLE} to the peer. + * + * @return A {@link BiFunction} that regardless the input, it will always return a + * {@link StreamingHttpResponseFactory#serviceUnavailable() service-unavailable} response. + */ + public static BiFunction> + serviceUnavailable() { + return (__, factory) -> succeeded(factory.serviceUnavailable()); + } + + /** + * A {@link ServiceRejectionPolicy} builder to support a custom policy. + */ + public static final class Builder { + private BiFunction> + onLimitResponseBuilder = tooManyRequests(); + + private Consumer onLimitRetryAfter = __ -> { }; + + private boolean onLimitStopAcceptingConnections; + + private BiFunction> + onOpenCircuitResponseBuilder = serviceUnavailable(); + + private BiConsumer onOpenCircuitRetryAfter = + retryAfterHintOfBreaker(); + + /** + * Determines the {@link StreamingHttpResponse} when a capacity limit is met. + * + * @param onLimitResponseBuilder A factory function used to generate a {@link StreamingHttpResponse} based + * on the {@link HttpRequestMetaData request} when a {@link CapacityLimiter capacity} limit is observed. + * @return {@code this}. + */ + public Builder onLimitResponseBuilder(final BiFunction> onLimitResponseBuilder) { + this.onLimitResponseBuilder = requireNonNull(onLimitResponseBuilder); + return this; + } + + /** + * Determines a {@link HttpHeaderNames#RETRY_AFTER retry-after} header in the + * {@link StreamingHttpResponse} when a capacity limit is met. + * + * @param onLimitRetryAfter A {@link HttpResponseMetaData} consumer, that can allow response decoration with + * additional headers to hint the peer (upon capacity limits) about a possible wait-time before a + * retry could be issued. + * @return {@code this}. + */ + public Builder onLimitRetryAfter(final Consumer onLimitRetryAfter) { + this.onLimitRetryAfter = requireNonNull(onLimitRetryAfter); + return this; + } + + /** + * When a certain {@link CapacityLimiter} rejects a request due to the active limit, + * (e.g., no {@link Ticket} is returned) influence the server to also stop accepting new connections + * until the capacity is under healthy conditions again. + * This setting only works when a {@link CapacityLimiter} matches the incoming request, in cases this + * doesn't hold (see. {@link TrafficResilienceHttpServiceFilter.Builder#Builder(Supplier, boolean)} + * Builder's rejectedNotMatched argument}) this won't be effective. + *

+ * When a server socket stops accepting new connections + * (see. {@link HttpServiceContext#acceptConnections(boolean)}) due to capacity concerns, the state will be + * toggled back when the {@link Ticket ticket's} terminal callback ({@link Ticket#dropped() dropped}, + * {@link Ticket#failed(Throwable) failed}, {@link Ticket#completed() completed}, {@link Ticket#ignored() + * ignored}) returns a positive or negative value, demonstrating available capacity or not_supported + * respectively. When the returned value is {@code 0} that means no-capacity available, which will keep the + * server in the not-accepting mode. + *

+ * When enabling this feature, it's recommended for clients using this service to configure timeouts + * for their opening connection time and connection idleness time. For example, a client without + * connection-timeout or idle-timeout on the outgoing connections towards this service, won't be able to + * detect on time the connection delays. Likewise, on the server side you can configure the + * {@link io.servicetalk.transport.api.ServiceTalkSocketOptions#SO_BACKLOG server backlog} to a very small + * number or even disable it completely, to avoid holding established connections in the OS. + *

+ * Worth noting that established connections that stay in the OS backlog, usually have a First In First Out + * behavior, which depending on the size of that queue, may result in extending latencies on newer + * requests because older ones are served first. Disabling the + * {@link io.servicetalk.transport.api.ServiceTalkSocketOptions#SO_BACKLOG server backlog} will give a + * better behavior. + * + * @param stopAccepting {@code true} will allow this filter to control the connection acceptance of the + * overall server socket. + * @return {@code this}. + */ + public Builder onLimitStopAcceptingConnections(final boolean stopAccepting) { + this.onLimitStopAcceptingConnections = stopAccepting; + return this; + } + + /** + * Determines the {@link StreamingHttpResponse} when a circuit-breaker limit is met. + * + * @param onOpenCircuitResponseBuilder A factory function used to generate a {@link StreamingHttpResponse} + * based on the {@link HttpRequestMetaData request} when an open {@link CircuitBreaker breaker} is observed. + * @return {@code this}. + */ + public Builder onOpenCircuitResponseBuilder(final BiFunction> onOpenCircuitResponseBuilder) { + this.onOpenCircuitResponseBuilder = requireNonNull(onOpenCircuitResponseBuilder); + return this; + } + + /** + * Determines a {@link HttpHeaderNames#RETRY_AFTER retry-after} header in the + * {@link StreamingHttpResponse} when a capacity limit is met. + * + * @param onOpenCircuitRetryAfter A {@link HttpResponseMetaData} consumer, that can allow response + * decoration with additional headers to hint the peer (upon open breaker) about a possible wait-time + * before a retry could be issued. + * @return {@code this}. + */ + public Builder onOpenCircuitRetryAfter(final BiConsumer onOpenCircuitRetryAfter) { + this.onOpenCircuitRetryAfter = requireNonNull(onOpenCircuitRetryAfter); + return this; + } + + /** + * Return a custom {@link ServiceRejectionPolicy} based on the options of this builder. + * + * @return A custom {@link ServiceRejectionPolicy} based on the options of this builder. + */ + public ServiceRejectionPolicy build() { + return new ServiceRejectionPolicy(onLimitResponseBuilder, onLimitRetryAfter, + onLimitStopAcceptingConnections, onOpenCircuitResponseBuilder, onOpenCircuitRetryAfter); + } + } +} diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/StateContext.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/StateContext.java index 06abc12d1a..ca1e0976cc 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/StateContext.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/StateContext.java @@ -17,17 +17,13 @@ import io.servicetalk.circuit.breaker.api.CircuitBreaker; -import javax.annotation.Nullable; - /** * State information of the {@link TrafficResilienceHttpServiceFilter traffic-resilience} service filter. */ public final class StateContext { - - @Nullable private final CircuitBreaker breaker; - StateContext(@Nullable final CircuitBreaker breaker) { + StateContext(final CircuitBreaker breaker) { this.breaker = breaker; } @@ -35,7 +31,6 @@ public final class StateContext { * Returns the {@link CircuitBreaker} in-use for the currently evaluated request. * @return The {@link CircuitBreaker} in-use for the currently evaluated request. */ - @Nullable public CircuitBreaker breaker() { return breaker; } diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilter.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilter.java index f1bfed05a5..fd1130ccfa 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilter.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilter.java @@ -18,7 +18,7 @@ import io.servicetalk.capacity.limiter.api.CapacityLimiter; import io.servicetalk.capacity.limiter.api.CapacityLimiter.Ticket; import io.servicetalk.capacity.limiter.api.Classification; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.circuit.breaker.api.CircuitBreaker; import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Single; @@ -35,7 +35,7 @@ import io.servicetalk.http.api.StreamingHttpResponseFactory; import io.servicetalk.http.netty.RetryingHttpRequesterFilter; import io.servicetalk.http.utils.TimeoutHttpRequesterFilter; -import io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.PassthroughRequestRejectedException; +import io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.PassthroughRequestDroppedException; import io.servicetalk.transport.api.ServerListenContext; import java.time.Duration; @@ -48,21 +48,19 @@ import javax.annotation.Nullable; import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace; -import static io.servicetalk.http.api.HttpResponseStatus.BAD_GATEWAY; import static io.servicetalk.http.api.HttpResponseStatus.SERVICE_UNAVAILABLE; -import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT_PASSTHROUGH; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.Type.REJECT_RETRY; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.DEFAULT_PEER_REJECTION_POLICY; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT_PASSTHROUGH; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.Type.REJECT_RETRY; import static io.servicetalk.utils.internal.DurationUtils.isPositive; import static java.lang.Integer.MAX_VALUE; -import static java.time.Duration.ZERO; import static java.util.Objects.requireNonNull; /** * A {@link StreamingHttpClientFilterFactory} to enforce capacity and circuit-breaking control for a client. * Requests that are not able to acquire a capacity ticket or a circuit permit, - * will fail with a {@link RequestRejectedException}. + * will fail with a {@link RequestDroppedException}. *

*

Ordering of filters

* Ordering of the {@link TrafficResilienceHttpClientFilter capacity-filter} is important for various reasons: @@ -74,8 +72,8 @@ *
  • The traffic control filter should be ordered after a * {@link RetryingHttpRequesterFilter retry-filter} if one is used, to avail the * benefit of retrying requests that failed due to (local or remote) capacity issues. - * {@link RetryableRequestRejectedException} are safely retry-able errors, since they occur on the outgoing - * side before they even touch the network. {@link DelayedRetryRequestRejectedException} errors on the other + * {@link RetryableRequestDroppedException} are safely retry-able errors, since they occur on the outgoing + * side before they even touch the network. {@link DelayedRetryRequestDroppedException} errors on the other * side, are remote rejections, and its up to the application logic to opt-in for them to be retryable, by * configuring the relevant predicate of the {@link RetryingHttpRequesterFilter retry * -filter}
  • @@ -106,32 +104,13 @@ public final class TrafficResilienceHttpClientFilter extends AbstractTrafficManagementHttpFilter implements StreamingHttpClientFilterFactory { - private static final RequestRejectedException LOCAL_REJECTION_RETRYABLE_EXCEPTION = unknownStackTrace( - new RetryableRequestRejectedException("Local capacity rejection", null, false, true), + private static final RequestDroppedException LOCAL_REJECTION_RETRYABLE_EXCEPTION = unknownStackTrace( + new RetryableRequestDroppedException("Local capacity rejection", null, false, true), TrafficResilienceHttpClientFilter.class, "localRejection"); private static final Single RETRYABLE_LOCAL_CAPACITY_REJECTION = Single.failed(LOCAL_REJECTION_RETRYABLE_EXCEPTION); - /** - * Default rejection observer for dropped requests from an external sourced. - * see. {@link Builder#peerCapacityRejection(PeerCapacityRejectionPolicy)}. - *

    - * The default predicate matches the following HTTP response codes: - *

      - *
    • {@link HttpResponseStatus#TOO_MANY_REQUESTS}
    • - *
    • {@link HttpResponseStatus#BAD_GATEWAY}
    • - *
    • {@link HttpResponseStatus#SERVICE_UNAVAILABLE}
    • - *
    - *

    - * If a {@link CircuitBreaker} is used consider adjusting this predicate to avoid considering - * {@link HttpResponseStatus#SERVICE_UNAVAILABLE} as a capacity issue. - */ - public static final Predicate DEFAULT_CAPACITY_REJECTION_PREDICATE = metaData -> - // Some proxies are known to return BAD_GATEWAY when the upstream is unresponsive (i.e. heavy load). - metaData.status().code() == TOO_MANY_REQUESTS.code() || metaData.status().code() == BAD_GATEWAY.code() || - metaData.status().code() == SERVICE_UNAVAILABLE.code(); - /** * Default rejection observer for dropped requests from an external sourced due to service unavailability. * see. {@link Builder#peerBreakerRejection(HttpResponseMetaData, CircuitBreaker)}}. @@ -144,7 +123,7 @@ public final class TrafficResilienceHttpClientFilter extends AbstractTrafficMana public static final Predicate DEFAULT_BREAKER_REJECTION_PREDICATE = metaData -> metaData.status().code() == SERVICE_UNAVAILABLE.code(); - private final PeerCapacityRejectionPolicy peerCapacityRejectionPolicy; + private final ClientPeerRejectionPolicy clientPeerRejectionPolicy; private final boolean forceOpenCircuitOnPeerCircuitRejections; @Nullable private final Function focreOpenCircuitOnPeerCircuitRejectionsDelayProvider; @@ -157,7 +136,7 @@ private TrafficResilienceHttpClientFilter(final Supplier> circuitBreakerPartitionsSupplier, final Function classifier, - final PeerCapacityRejectionPolicy peerCapacityRejectionPolicy, + final ClientPeerRejectionPolicy clientPeerRejectionPolicy, final Predicate breakerRejectionPredicate, final Consumer onCompletion, final Consumer onCancellation, @@ -168,9 +147,9 @@ private TrafficResilienceHttpClientFilter(final Supplier request(final StreamingHttpRequester del final StreamingHttpRequest request) { return applyCapacityControl(capacityPartitions, circuitBreakerPartitions, null, request, null, delegate::request) - .onErrorResume(PassthroughRequestRejectedException.class, t -> Single.succeeded(t.response())); + .onErrorResume(PassthroughRequestDroppedException.class, t -> Single.succeeded(t.response())); } }); } @@ -199,7 +178,7 @@ protected Single request(final StreamingHttpRequester del @Override Single handleLocalBreakerRejection( StreamingHttpRequest request, @Nullable StreamingHttpResponseFactory responseFactory, - @Nullable CircuitBreaker breaker) { + CircuitBreaker breaker) { return DEFAULT_BREAKER_REJECTION; } @@ -211,17 +190,17 @@ Single handleLocalCapacityRejection( } @Override - RuntimeException peerCapacityRejection(final StreamingHttpResponse resp) { - final PeerCapacityRejectionPolicy.Type type = peerCapacityRejectionPolicy.type(); + RuntimeException peerRejection(final StreamingHttpResponse resp) { + final ClientPeerRejectionPolicy.Type type = clientPeerRejectionPolicy.type(); if (type == REJECT_RETRY) { - final Duration delay = peerCapacityRejectionPolicy.delayProvider().apply(resp); - return new DelayedRetryRequestRejectedException(delay); + final Duration delay = clientPeerRejectionPolicy.delayProvider().apply(resp); + return new DelayedRetryRequestDroppedException(delay); } else if (type == REJECT) { - return super.peerCapacityRejection(resp); + return super.peerRejection(resp); } else if (type == REJECT_PASSTHROUGH) { - return new PassthroughRequestRejectedException("Service under heavy load", resp); + return new PassthroughRequestDroppedException("Service under heavy load", resp); } else { - return new IllegalStateException("Unexpected PeerCapacityRejectionPolicy.Type: " + type); + return new IllegalStateException("Unexpected ClientPeerRejectionPolicy.Type: " + type); } } @@ -249,13 +228,12 @@ public static final class Builder { private Supplier> circuitBreakerPartitionsSupplier = () -> __ -> null; private Function classifier = __ -> () -> MAX_VALUE; - private PeerCapacityRejectionPolicy peerCapacityRejectionPolicy = - new PeerCapacityRejectionPolicy(DEFAULT_CAPACITY_REJECTION_PREDICATE, REJECT_RETRY, __ -> ZERO); + private ClientPeerRejectionPolicy clientPeerRejectionPolicy = DEFAULT_PEER_REJECTION_POLICY; private Predicate peerUnavailableRejectionPredicate = DEFAULT_BREAKER_REJECTION_PREDICATE; private final Consumer onCompletionTicketTerminal = Ticket::completed; private Consumer onCancellationTicketTerminal = Ticket::dropped; private BiConsumer onErrorTicketTerminal = (ticket, throwable) -> { - if (throwable instanceof RequestRejectedException || throwable instanceof TimeoutException) { + if (throwable instanceof RequestDroppedException || throwable instanceof TimeoutException) { ticket.dropped(); } else { ticket.failed(throwable); @@ -298,7 +276,7 @@ public Builder(final Supplier capacityLimiterSupplier) { *

    * If a {@code partitions} doesn't return a {@link CapacityLimiter} for the given {@link HttpRequestMetaData} * then the {@code rejectNotMatched} is evaluated to decide what the filter should do with this request. - * If {@code true} then the request will be {@link RequestRejectedException rejected}. + * If {@code true} then the request will be {@link RequestDroppedException rejected}. *

    * It's important that instances returned from this {@link Function mapper} are singletons and shared * across the same matched partitions. Otherwise, capacity will not be controlled as expected, and there @@ -329,7 +307,7 @@ public Builder(final Supplier> ca *

    * If a {@code partitions} doesn't return a {@link CapacityLimiter} for the given {@link HttpRequestMetaData} * then the {@code rejectNotMatched} is evaluated to decide what the filter should do with this request. - * If {@code true} then the request will be {@link RequestRejectedException rejected}. + * If {@code true} then the request will be {@link RequestDroppedException rejected}. *

    * It's important that instances returned from this {@link Function mapper} are singletons and shared * across the same matched partitions. Otherwise, capacity will not be controlled as expected, and there @@ -403,7 +381,7 @@ public Builder circuitBreakerPartitions( /** * Peers can reject and exception due to capacity reasons based on their own principals and implementation * details. A {@link TrafficResilienceHttpClientFilter} can benefit from this input as feedback for the - * {@link CapacityLimiter} in use, that the request was dropped (ie. rejected), thus it can also bring its + * {@link CapacityLimiter} in use, that the request was dropped (i.e., rejected), thus it can also bring its * local limit down to help with the overloaded peer. Since what defines a rejection/drop or request for * backpressure is not universally common, one can define what response characteristics define that state. *

    @@ -418,16 +396,17 @@ public Builder circuitBreakerPartitions( * * *

    - * Allowing retry, requests will fail with a {@link DelayedRetryRequestRejectedException} to support + * Allowing retry, requests will fail with a {@link DelayedRetryRequestDroppedException} to support * retrying mechanisms (like retry-filters or retry operators) to re-attempt the same request. * Requests that fail due to capacity limitation, are good candidates for a retry, since we anticipate they are * safe to be executed again (no previous invocation actually started) and because this maximizes the success * chances. - * @param policy The {@link PeerCapacityRejectionPolicy} that represents the peer capacity rejection behavior. + * @param policy The {@link ClientPeerRejectionPolicy} that represents the peer capacity rejection behavior. * @return {@code this}. + * @see ClientPeerRejectionPolicy#DEFAULT_PEER_REJECTION_POLICY */ - public Builder peerCapacityRejection(final PeerCapacityRejectionPolicy policy) { - this.peerCapacityRejectionPolicy = requireNonNull(policy); + public Builder rejectionPolicy(final ClientPeerRejectionPolicy policy) { + this.clientPeerRejectionPolicy = requireNonNull(policy); return this; } @@ -463,7 +442,8 @@ public Builder peerUnavailableRejectionPredicate(final Predicate * If the delay provided is not a positive value, then the {@link CircuitBreaker} will not be modified. - * + *

    + * To disable this behaviour see {@link #dontForceOpenCircuitOnPeerCircuitRejections()}. * @param delayProvider A function to provide / extract a delay in milliseconds for the * {@link CircuitBreaker} to remain open. * @param executor A {@link Executor} used to re-close the {@link CircuitBreaker} once the delay expires. @@ -482,7 +462,8 @@ public Builder forceOpenCircuitOnPeerCircuitRejections( * When a peer rejects a {@link HttpRequestMetaData request} due to an open-circuit (see. * {@link #peerUnavailableRejectionPredicate(Predicate)}), ignore feedback and leave local matching * {@link CircuitBreaker circuit-breake partition} closed. - * + *

    + * To opt-in for this behaviour see {@link #forceOpenCircuitOnPeerCircuitRejections(Function, Executor)}. * @return {@code this}. */ public Builder dontForceOpenCircuitOnPeerCircuitRejections() { @@ -495,7 +476,7 @@ public Builder dontForceOpenCircuitOnPeerCircuitRejections() { /** * {@link Ticket Ticket} terminal callback override upon erroneous completion of the request operation. * Erroneous completion in this context means, that an error occurred as part of the operation or the - * {@link #peerCapacityRejection(PeerCapacityRejectionPolicy)} triggered an exception. + * {@link #rejectionPolicy(ClientPeerRejectionPolicy)} triggered an exception. * By default the terminal callback is {@link Ticket#failed(Throwable)}. * * @param onError Callback to override default {@link Ticket ticket} terminal event for an erroneous @@ -544,7 +525,7 @@ public Builder observer(final TrafficResiliencyObserver observer) { public TrafficResilienceHttpClientFilter build() { return new TrafficResilienceHttpClientFilter(capacityPartitionsSupplier, rejectWhenNotMatchedCapacityPartition, - circuitBreakerPartitionsSupplier, classifier, peerCapacityRejectionPolicy, + circuitBreakerPartitionsSupplier, classifier, clientPeerRejectionPolicy, peerUnavailableRejectionPredicate, onCompletionTicketTerminal, onCancellationTicketTerminal, onErrorTicketTerminal, forceOpenCircuitOnPeerCircuitRejections, focreOpenCircuitOnPeerCircuitRejectionsDelayProvider, circuitBreakerResetExecutor, observer); diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilter.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilter.java index 645857409b..ae153d46d6 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilter.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilter.java @@ -18,12 +18,10 @@ import io.servicetalk.capacity.limiter.api.CapacityLimiter; import io.servicetalk.capacity.limiter.api.CapacityLimiter.Ticket; import io.servicetalk.capacity.limiter.api.Classification; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.circuit.breaker.api.CircuitBreaker; import io.servicetalk.concurrent.api.Single; -import io.servicetalk.http.api.HttpHeaderNames; import io.servicetalk.http.api.HttpRequestMetaData; -import io.servicetalk.http.api.HttpResponseMetaData; import io.servicetalk.http.api.HttpServerBuilder; import io.servicetalk.http.api.HttpServiceContext; import io.servicetalk.http.api.StreamingHttpRequest; @@ -35,25 +33,20 @@ import io.servicetalk.http.utils.TimeoutHttpServiceFilter; import io.servicetalk.transport.api.ServerListenContext; -import java.time.Duration; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; -import static io.servicetalk.buffer.api.CharSequences.newAsciiString; -import static io.servicetalk.concurrent.api.Single.succeeded; -import static io.servicetalk.http.api.HttpHeaderNames.RETRY_AFTER; +import static io.servicetalk.traffic.resilience.http.ServiceRejectionPolicy.DEFAULT_REJECTION_POLICY; import static java.lang.Integer.MAX_VALUE; -import static java.lang.String.valueOf; import static java.util.Objects.requireNonNull; /** * A {@link StreamingHttpServiceFilterFactory} to enforce capacity control for a server. - * Requests that are not able to acquire a {@link Ticket permit}, will fail with a {@link RequestRejectedException}. + * Requests that are not able to acquire a {@link Ticket permit}, will fail with a {@link RequestDroppedException}. *

    *

    Ordering of filters

    * Ordering of the {@link TrafficResilienceHttpClientFilter capacity-filter} is important for various reasons: @@ -86,7 +79,7 @@ public final class TrafficResilienceHttpServiceFilter extends AbstractTrafficManagementHttpFilter implements StreamingHttpServiceFilterFactory { - private final RejectionPolicy rejectionPolicy; + private final ServiceRejectionPolicy serviceRejectionPolicy; private TrafficResilienceHttpServiceFilter(final Supplier> capacityPartitionsSupplier, @@ -97,11 +90,11 @@ private TrafficResilienceHttpServiceFilter(final Supplier onError, final Supplier> circuitBreakerPartitionsSupplier, - final RejectionPolicy onRejectionPolicy, + final ServiceRejectionPolicy onServiceRejectionPolicy, final TrafficResiliencyObserver observer) { super(capacityPartitionsSupplier, rejectNotMatched, classifier, __ -> false, __ -> false, onCompletion, onCancellation, onError, circuitBreakerPartitionsSupplier, observer); - this.rejectionPolicy = onRejectionPolicy; + this.serviceRejectionPolicy = onServiceRejectionPolicy; } @Override @@ -137,15 +130,15 @@ Single handleLocalCapacityRejection( final StreamingHttpRequest request, @Nullable final StreamingHttpResponseFactory responseFactory) { assert serverListenContext != null; - if (rejectionPolicy.onLimitStopAcceptingConnections) { + if (serviceRejectionPolicy.onLimitStopAcceptingConnections()) { serverListenContext.acceptConnections(false); } if (responseFactory != null) { - return rejectionPolicy.onLimitResponseBuilder + return serviceRejectionPolicy.onLimitResponseBuilder() .apply(request, responseFactory) .map(resp -> { - rejectionPolicy.onLimitRetryAfter.accept(resp); + serviceRejectionPolicy.onLimitRetryAfter().accept(resp); return resp; }); } @@ -157,12 +150,12 @@ Single handleLocalCapacityRejection( Single handleLocalBreakerRejection( final StreamingHttpRequest request, @Nullable final StreamingHttpResponseFactory responseFactory, - @Nullable final CircuitBreaker breaker) { + final CircuitBreaker breaker) { if (responseFactory != null) { - return rejectionPolicy.onOpenCircuitResponseBuilder + return serviceRejectionPolicy.onOpenCircuitResponseBuilder() .apply(request, responseFactory) .map(resp -> { - rejectionPolicy.onOpenCircuitRetryAfter + serviceRejectionPolicy.onOpenCircuitRetryAfter() .accept(resp, new StateContext(breaker)); return resp; }) @@ -172,24 +165,6 @@ Single handleLocalBreakerRejection( return DEFAULT_BREAKER_REJECTION; } - /** - * Default response rejection policy. - *
      - *
    • When a request is rejected due to capacity, the service will respond - * {@link RejectionPolicy#tooManyRequests()}.
    • - *
    • When a request is rejected due to capacity, the service will NOT include a retry-after header.
    • - *
    • When a request is rejected due to breaker, the service will respond - * {@link RejectionPolicy#serviceUnavailable()}.
    • - *
    • When a request is rejected due to breaker, the service will respond with Retry-After header hinting - * the duration the breaker will remain open.
    • - *
    - * - * @return The default {@link RejectionPolicy}. - */ - public static RejectionPolicy defaultRejectionResponsePolicy() { - return new RejectionPolicy.Builder().build(); - } - /** * A {@link TrafficResilienceHttpServiceFilter} instance builder. * @@ -200,11 +175,11 @@ public static final class Builder { private Function classifier = __ -> () -> MAX_VALUE; private Supplier> circuitBreakerPartitionsSupplier = () -> __ -> null; - private RejectionPolicy onRejectionPolicy = defaultRejectionResponsePolicy(); + private ServiceRejectionPolicy onServiceRejectionPolicy = DEFAULT_REJECTION_POLICY; private final Consumer onCompletionTicketTerminal = Ticket::completed; private Consumer onCancellationTicketTerminal = Ticket::ignored; private BiConsumer onErrorTicketTerminal = (ticket, throwable) -> { - if (throwable instanceof RequestRejectedException || throwable instanceof TimeoutException) { + if (throwable instanceof RequestDroppedException || throwable instanceof TimeoutException) { ticket.dropped(); } else { ticket.failed(throwable); @@ -242,7 +217,7 @@ public Builder(Supplier capacityLimiterSupplier) { *

    * If a {@code partitions} doesn't return a {@link CapacityLimiter} for the given {@link HttpRequestMetaData} * then the {@code rejectNotMatched} is evaluated to decide what the filter should do with this request. - * If {@code true} then the request will be {@link RequestRejectedException rejected}. + * If {@code true} then the request will be {@link RequestDroppedException rejected}. *

    * It's important that instances returned from this {@link Function mapper} are singletons and shared * across the same matched partitions. Otherwise, capacity will not be controlled as expected, and there @@ -273,7 +248,7 @@ public Builder(final Supplier> ca *

    * If a {@code partitions} doesn't return a {@link CapacityLimiter} for the given {@link HttpRequestMetaData} * then the {@code rejectNotMatched} is evaluated to decide what the filter should do with this request. - * If {@code true} then the request will be {@link RequestRejectedException rejected}. + * If {@code true} then the request will be {@link RequestDroppedException rejected}. *

    * It's important that instances returned from this {@link Function mapper} are singletons and shared * across the same matched partitions. Otherwise, capacity will not be controlled as expected, and there @@ -327,7 +302,7 @@ public Builder classifier(final Function cl *

    * Once a matching {@link CircuitBreaker} transitions to open state, requests that match the same breaker * will fail (e.g., {@link io.servicetalk.http.api.HttpResponseStatus#SERVICE_UNAVAILABLE}) and - * {@link RejectionPolicy#onOpenCircuitRetryAfter} can be used to hint peers about the fact that + * {@link ServiceRejectionPolicy#onOpenCircuitRetryAfter()} can be used to hint peers about the fact that * the circuit will remain open for a certain amount of time. * * @param circuitBreakerPartitionsSupplier A {@link Supplier} to create a new {@link Function} for each new @@ -375,14 +350,15 @@ public Builder onCancelTicketTerminal(final Consumer onCancellation) { } /** - * Defines the {@link RejectionPolicy} which in turn defines the behavior of the service when a + * Defines the {@link ServiceRejectionPolicy} which in turn defines the behavior of the service when a * rejection occurs due to {@link CapacityLimiter capacity} or {@link CircuitBreaker breaker}. * * @param policy The policy to put into effect when a rejection occurs. * @return {@code this}. + * @see ServiceRejectionPolicy#DEFAULT_REJECTION_POLICY */ - public Builder onRejectionPolicy(final RejectionPolicy policy) { - this.onRejectionPolicy = requireNonNull(policy, "policy"); + public Builder rejectionPolicy(final ServiceRejectionPolicy policy) { + this.onServiceRejectionPolicy = requireNonNull(policy, "policy"); return this; } @@ -405,232 +381,7 @@ public Builder observer(final TrafficResiliencyObserver observer) { public TrafficResilienceHttpServiceFilter build() { return new TrafficResilienceHttpServiceFilter(capacityPartitionsSupplier, rejectNotMatched, classifier, onCompletionTicketTerminal, onCancellationTicketTerminal, - onErrorTicketTerminal, circuitBreakerPartitionsSupplier, onRejectionPolicy, observer); - } - } - - /** - * Policy to rule the behavior of service rejections due to capacity or open circuit. - */ - public static final class RejectionPolicy { - - /** - * Custom retry-after header that supports milliseconds resolution, rather than seconds. - */ - public static final CharSequence RETRY_AFTER_MILLIS = newAsciiString("retry-after-millis"); - - private final BiFunction> - onLimitResponseBuilder; - - private final Consumer onLimitRetryAfter; - - private final boolean onLimitStopAcceptingConnections; - - private final BiFunction> - onOpenCircuitResponseBuilder; - - private final BiConsumer onOpenCircuitRetryAfter; - - private RejectionPolicy(final BiFunction> onLimitResponseBuilder, - final Consumer onLimitRetryAfter, - final boolean onLimitStopAcceptingConnections, - final BiFunction> onOpenCircuitResponseBuilder, - final BiConsumer - onOpenCircuitRetryAfter) { - this.onLimitResponseBuilder = onLimitResponseBuilder; - this.onLimitRetryAfter = onLimitRetryAfter; - this.onLimitStopAcceptingConnections = onLimitStopAcceptingConnections; - this.onOpenCircuitResponseBuilder = onOpenCircuitResponseBuilder; - this.onOpenCircuitRetryAfter = onOpenCircuitRetryAfter; - } - - /** - * A hard-coded delay in seconds to be supplied as a Retry-After HTTP header in a {@link HttpResponseMetaData}. - * - * @param seconds The value (in seconds) to be used in the Retry-After header. - * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a fixed Retry-After figure in - * seconds. - */ - public static Consumer retryAfterHint(final int seconds) { - final CharSequence secondsSeq = newAsciiString(valueOf(seconds)); - return resp -> resp.addHeader(RETRY_AFTER, secondsSeq); - } - - /** - * A delay in seconds to be supplied as a Retry-After HTTP header in a {@link HttpResponseMetaData} based on the - * {@link CircuitBreaker} that matched the {@link HttpRequestMetaData}. - * - * @param fallbackSeconds The value (in seconds) to be used if no {@link CircuitBreaker} matched. - * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a Retry-After figure in - * seconds based on the duration the matching {@link CircuitBreaker} will remain open, or a fallback period. - */ - public static BiConsumer - retryAfterHintOfBreaker(final int fallbackSeconds) { - final CharSequence secondsSeq = newAsciiString(valueOf(fallbackSeconds)); - return (resp, state) -> { - if (state.breaker() != null || fallbackSeconds > 0) { - resp.setHeader(RETRY_AFTER, state.breaker() != null ? newAsciiString(valueOf( - state.breaker().remainingDurationInOpenState().getSeconds())) : secondsSeq); - } - }; - } - - /** - * A hard-coded delay in milliseconds to be supplied as a Retry-After-Millis HTTP header in a - * {@link HttpResponseMetaData}. Being a custom Http header, it will require special handling on the peer side. - * - * @param duration The duration to be used in the Retry-After-Millis header. - * @return A {@link HttpResponseMetaData} consumer, that enhances the headers with a fixed - * Retry-After-Millis figure in milliseconds. - */ - public static BiConsumer retryAfterMillisHint(final Duration duration) { - final CharSequence millisSeq = newAsciiString(valueOf(duration.toMillis())); - return (resp, breaker) -> resp.setHeader(RETRY_AFTER_MILLIS, millisSeq); - } - - /** - * Pre-defined {@link StreamingHttpResponse response} that signals - * {@link io.servicetalk.http.api.HttpResponseStatus#TOO_MANY_REQUESTS} to the peer. - * - * @return A {@link BiFunction} that regardless the input, it will always return a - * {@link StreamingHttpResponseFactory#tooManyRequests() too-many-requests} response. - */ - public static BiFunction> - tooManyRequests() { - return (__, factory) -> succeeded(factory.tooManyRequests()); - } - - /** - * Pre-defined {@link StreamingHttpResponse response} that signals - * {@link io.servicetalk.http.api.HttpResponseStatus#SERVICE_UNAVAILABLE} to the peer. - * - * @return A {@link BiFunction} that regardless the input, it will always return a - * {@link StreamingHttpResponseFactory#serviceUnavailable() service-unavailable} response. - */ - public static BiFunction> - serviceUnavailable() { - return (__, factory) -> succeeded(factory.serviceUnavailable()); - } - - /** - * A {@link RejectionPolicy} builder to support a custom policy. - */ - public static final class Builder { - private BiFunction> - onLimitResponseBuilder = tooManyRequests(); - - private Consumer onLimitRetryAfter = __ -> { }; - - private boolean onLimitStopAcceptingConnections; - - private BiFunction> - onOpenCircuitResponseBuilder = serviceUnavailable(); - - private BiConsumer onOpenCircuitRetryAfter = - retryAfterHintOfBreaker(-1); - - /** - * Determines the {@link StreamingHttpResponse} when a capacity limit is met. - * - * @param onLimitResponseBuilder A factory function used to generate a {@link StreamingHttpResponse} based - * on the {@link HttpRequestMetaData request} when a {@link CapacityLimiter capacity} limit is observed. - * @return {@code this}. - */ - public Builder onLimitResponseBuilder(final BiFunction> onLimitResponseBuilder) { - this.onLimitResponseBuilder = requireNonNull(onLimitResponseBuilder); - return this; - } - - /** - * Determines a {@link HttpHeaderNames#RETRY_AFTER retry-after} header in the - * {@link StreamingHttpResponse} when a capacity limit is met. - * - * @param onLimitRetryAfter A {@link HttpResponseMetaData} consumer, that can allow response decoration with - * additional headers to hint the peer (upon capacity limits) about a possible wait-time before a - * retry could be issued. - * @return {@code this}. - */ - public Builder onLimitRetryAfter(final Consumer onLimitRetryAfter) { - this.onLimitRetryAfter = requireNonNull(onLimitRetryAfter); - return this; - } - - /** - * When a certain {@link CapacityLimiter} rejects a request due to the active limit, - * (e.g., no {@link Ticket} is returned) influence the server to also stop accepting new connections - * until the capacity is under healthy conditions again. - * This setting only works when a {@link CapacityLimiter} matches the incoming request, in cases this - * doesn't hold (see. {@link TrafficResilienceHttpServiceFilter.Builder#Builder(Supplier, boolean)} - * Builder's rejectedNotMatched argument}) this won't be effective. - *

    - * When a server socket stops accepting new connections - * (see. {@link HttpServiceContext#acceptConnections(boolean)}) due to capacity concerns, the state will be - * toggled back when the {@link Ticket ticket's} terminal callback ({@link Ticket#dropped() dropped}, - * {@link Ticket#failed(Throwable) failed}, {@link Ticket#completed() completed}, {@link Ticket#ignored() - * ignored}) returns a positive or negative value, demonstrating available capacity or not_supported - * respectively. When the returned value is {@code 0} that means no-capacity available, which will keep the - * server in the not-accepting mode. - *

    - * When enabling this feature, it's recommended for clients using this service to configure timeouts - * for their opening connection time and connection idleness time. For example, a client without - * connection-timeout or idle-timeout on the outgoing connections towards this service, won't be able to - * detect on time the connection delays. Likewise, on the server side you can configure the - * {@link io.servicetalk.transport.api.ServiceTalkSocketOptions#SO_BACKLOG server backlog} to a very small - * number or even disable it completely, to avoid holding established connections in the OS. - *

    - * Worth noting that established connections that stay in the OS backlog, usually have a First In First Out - * behavior, which depending on the size of that queue, may result in extending latencies on newer - * requests because older ones are served first. Disabling the - * {@link io.servicetalk.transport.api.ServiceTalkSocketOptions#SO_BACKLOG server backlog} will give a - * better behavior. - * @param stopAccepting {@code true} will allow this filter to control the connection acceptance of the - * overall server socket. - * @return {@code this}. - */ - public Builder onLimitStopAcceptingConnections(final boolean stopAccepting) { - this.onLimitStopAcceptingConnections = stopAccepting; - return this; - } - - /** - * Determines the {@link StreamingHttpResponse} when a circuit-breaker limit is met. - * - * @param onOpenCircuitResponseBuilder A factory function used to generate a {@link StreamingHttpResponse} - * based on the {@link HttpRequestMetaData request} when an open {@link CircuitBreaker breaker} is observed. - * @return {@code this}. - */ - public Builder onOpenCircuitResponseBuilder(final BiFunction> onOpenCircuitResponseBuilder) { - this.onOpenCircuitResponseBuilder = requireNonNull(onOpenCircuitResponseBuilder); - return this; - } - - /** - * Determines a {@link HttpHeaderNames#RETRY_AFTER retry-after} header in the - * {@link StreamingHttpResponse} when a capacity limit is met. - * - * @param onOpenCircuitRetryAfter A {@link HttpResponseMetaData} consumer, that can allow response - * decoration with additional headers to hint the peer (upon open breaker) about a possible wait-time - * before a retry could be issued. - * @return {@code this}. - */ - public Builder onOpenCircuitRetryAfter(final BiConsumer onOpenCircuitRetryAfter) { - this.onOpenCircuitRetryAfter = requireNonNull(onOpenCircuitRetryAfter); - return this; - } - - /** - * Return a custom {@link RejectionPolicy} based on the options of this builder. - * @return A custom {@link RejectionPolicy} based on the options of this builder. - */ - public RejectionPolicy build() { - return new RejectionPolicy(onLimitResponseBuilder, onLimitRetryAfter, onLimitStopAcceptingConnections, - onOpenCircuitResponseBuilder, onOpenCircuitRetryAfter); - } + onErrorTicketTerminal, circuitBreakerPartitionsSupplier, onServiceRejectionPolicy, observer); } } @@ -643,7 +394,6 @@ private ServerResumptionTicketWrapper(final ServerListenContext listenContext, f this.listenContext = listenContext; } - @Nullable @Override public CapacityLimiter.LimiterState state() { return ticket.state(); diff --git a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResiliencyObserver.java b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResiliencyObserver.java index 715519e388..760c2de822 100644 --- a/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResiliencyObserver.java +++ b/servicetalk-traffic-resilience-http/src/main/java/io/servicetalk/traffic/resilience/http/TrafficResiliencyObserver.java @@ -22,8 +22,6 @@ import io.servicetalk.context.api.ContextMap; import io.servicetalk.http.api.StreamingHttpRequest; -import javax.annotation.Nullable; - /** * A {@link TrafficResilienceHttpServiceFilter} or {@link TrafficResilienceHttpClientFilter} observer. * Tracks interactions with {@link CapacityLimiter}s and/or {@link CircuitBreaker}s, and exposes a transactional @@ -94,5 +92,5 @@ void onRejectedOpenCircuit(StreamingHttpRequest request, String circuitBreaker, * @param state the {@link LimiterState} that correlates to this accepted request. * @return A {@link TicketObserver} to track the state of the allowed request. */ - TicketObserver onAllowedThrough(StreamingHttpRequest request, @Nullable LimiterState state); + TicketObserver onAllowedThrough(StreamingHttpRequest request, LimiterState state); } diff --git a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/CapacityClientServerTest.java b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/CapacityClientServerTest.java index 2a7743856b..b07b12039d 100644 --- a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/CapacityClientServerTest.java +++ b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/CapacityClientServerTest.java @@ -16,7 +16,7 @@ package io.servicetalk.traffic.resilience.http; import io.servicetalk.capacity.limiter.api.CapacityLimiter; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestSingle; import io.servicetalk.concurrent.test.internal.TestSingleSubscriber; @@ -29,7 +29,6 @@ import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpResponseFactory; import io.servicetalk.http.api.StreamingHttpServiceFilter; -import io.servicetalk.traffic.resilience.http.TrafficResilienceHttpServiceFilter.RejectionPolicy; import io.servicetalk.transport.api.HostAndPort; import io.servicetalk.transport.api.ServerContext; @@ -80,8 +79,8 @@ public Single handle(final HttpServiceContext ctx, if (!applyOnClient) { TrafficResilienceHttpServiceFilter serviceFilter = new TrafficResilienceHttpServiceFilter.Builder(limiterSupplier) - .onRejectionPolicy(new RejectionPolicy.Builder() - .onLimitResponseBuilder(RejectionPolicy.serviceUnavailable()).build()) + .rejectionPolicy(new ServiceRejectionPolicy.Builder() + .onLimitResponseBuilder(ServiceRejectionPolicy.serviceUnavailable()).build()) .build(); serverBuilder.appendServiceFilter(serviceFilter); @@ -111,8 +110,8 @@ void tearDown() throws Exception { } static Stream data() { - return Stream.of(newParam(true, () -> fixedCapacity().capacity(1).build()), - newParam(false, () -> fixedCapacity().capacity(1).build())); + return Stream.of(newParam(true, () -> fixedCapacity(1).build()), + newParam(false, () -> fixedCapacity(1).build())); } private static Arguments newParam( @@ -146,7 +145,7 @@ void overCapacity(final boolean applyOnClient, serverResponseQueue.take(); // Ensure the request reaches the server. if (applyOnClient) { - assertThrows(RequestRejectedException.class, () -> client.asBlockingClient().request(client.get("/"))); + assertThrows(RequestDroppedException.class, () -> client.asBlockingClient().request(client.get("/"))); } else { final HttpResponse response2 = client.asBlockingClient().request(client.get("/")); assertThat("Unexpected result.", response2.status(), is(SERVICE_UNAVAILABLE)); diff --git a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilterTest.java b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilterTest.java index 0622c24a41..59f0dbd60e 100644 --- a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilterTest.java +++ b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpClientFilterTest.java @@ -18,7 +18,7 @@ import io.servicetalk.capacity.limiter.api.CapacityLimiter; import io.servicetalk.capacity.limiter.api.CapacityLimiter.Ticket; import io.servicetalk.capacity.limiter.api.CapacityLimiters; -import io.servicetalk.capacity.limiter.api.RequestRejectedException; +import io.servicetalk.capacity.limiter.api.RequestDroppedException; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.test.StepVerifiers; @@ -43,8 +43,8 @@ import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static io.servicetalk.http.api.HttpResponseStatus.BAD_GATEWAY; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.ofPassthrough; -import static io.servicetalk.traffic.resilience.http.PeerCapacityRejectionPolicy.ofRejection; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.ofPassthrough; +import static io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy.ofRejection; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -68,13 +68,13 @@ class TrafficResilienceHttpClientFilterTest { void verifyPeerRetryableRejection() { final TrafficResilienceHttpClientFilter trafficResilienceHttpClientFilter = new TrafficResilienceHttpClientFilter.Builder( - () -> CapacityLimiters.fixedCapacity().capacity(1).build()).build(); + () -> CapacityLimiters.fixedCapacity(1).build()).build(); FilterableStreamingHttpClient client = mock(FilterableStreamingHttpClient.class); when(client.request(any())).thenReturn(Single.succeeded(REQ_RES_FACTORY.newResponse(BAD_GATEWAY))); final StreamingHttpClientFilter clientWithFilter = trafficResilienceHttpClientFilter.create(client); - assertThrows(DelayedRetryRequestRejectedException.class, () -> { + assertThrows(DelayedRetryRequestDroppedException.class, () -> { try { clientWithFilter.request(REQUEST).toFuture().get(); } catch (ExecutionException e) { @@ -87,8 +87,8 @@ void verifyPeerRetryableRejection() { void verifyPeerRejection() { final TrafficResilienceHttpClientFilter trafficResilienceHttpClientFilter = new TrafficResilienceHttpClientFilter.Builder( - () -> CapacityLimiters.fixedCapacity().capacity(1).build()) - .peerCapacityRejection(ofRejection(resp -> BAD_GATEWAY.equals(resp.status()))) + () -> CapacityLimiters.fixedCapacity(1).build()) + .rejectionPolicy(ofRejection(resp -> BAD_GATEWAY.equals(resp.status()))) .build(); FilterableStreamingHttpClient client = mock(FilterableStreamingHttpClient.class); @@ -99,7 +99,7 @@ void verifyPeerRejection() { .map(DEFAULT_ALLOCATOR::wrap).whenOnComplete(() -> payloadDrained.set(true))))); final StreamingHttpClientFilter clientWithFilter = trafficResilienceHttpClientFilter.create(client); - assertThrows(RequestRejectedException.class, () -> { + assertThrows(RequestDroppedException.class, () -> { try { clientWithFilter.request(REQUEST).toFuture().get(); } catch (ExecutionException e) { @@ -113,8 +113,8 @@ void verifyPeerRejection() { void verifyPeerRejectionPassthrough() throws Exception { final TrafficResilienceHttpClientFilter trafficResilienceHttpClientFilter = new TrafficResilienceHttpClientFilter.Builder( - () -> CapacityLimiters.fixedCapacity().capacity(1).build()) - .peerCapacityRejection(ofPassthrough(resp -> BAD_GATEWAY.equals(resp.status()))) + () -> CapacityLimiters.fixedCapacity(1).build()) + .rejectionPolicy(ofPassthrough(resp -> BAD_GATEWAY.equals(resp.status()))) .build(); FilterableStreamingHttpClient client = mock(FilterableStreamingHttpClient.class); diff --git a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilterTest.java b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilterTest.java index 59cb10d86b..806654dd59 100644 --- a/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilterTest.java +++ b/servicetalk-traffic-resilience-http/src/test/java/io/servicetalk/traffic/resilience/http/TrafficResilienceHttpServiceFilterTest.java @@ -35,7 +35,6 @@ import io.servicetalk.http.api.StreamingHttpServiceFilter; import io.servicetalk.http.netty.HttpClients; import io.servicetalk.http.netty.HttpServers; -import io.servicetalk.traffic.resilience.http.TrafficResilienceHttpServiceFilter.RejectionPolicy; import io.servicetalk.transport.api.ServerContext; import org.junit.jupiter.api.Test; @@ -82,7 +81,7 @@ class TrafficResilienceHttpServiceFilterTest { @Test void verifyAsyncContext() throws Exception { verifyServerFilterAsyncContextVisibility( - new TrafficResilienceHttpServiceFilter.Builder(() -> fixedCapacity().capacity(1).build()) + new TrafficResilienceHttpServiceFilter.Builder(() -> fixedCapacity(1).build()) .build()); } @@ -94,8 +93,7 @@ void verifyPeerRejectionCallbacks() throws Exception { try (ServerContext serverContext = HttpServers.forPort(0).listenAndAwait((ctx, request, responseFactory) -> succeeded(responseFactory.serviceUnavailable()))) { final TrafficResilienceHttpClientFilter trafficResilienceHttpClientFilter = - new TrafficResilienceHttpClientFilter.Builder(() -> CapacityLimiters.fixedCapacity() - .capacity(1) + new TrafficResilienceHttpClientFilter.Builder(() -> CapacityLimiters.fixedCapacity(1) .stateObserver((capacity, consumed) -> { consumption.set(consumed); latch.countDown(); @@ -150,15 +148,15 @@ enum Protocol { @ParameterizedTest @EnumSource(Protocol.class) void testStopAcceptingConnections(final Protocol protocol) throws Exception { - final CapacityLimiter limiter = fixedCapacity().capacity(1).build(); - final RejectionPolicy rejectionPolicy = new RejectionPolicy.Builder() + final CapacityLimiter limiter = fixedCapacity(1).build(); + final ServiceRejectionPolicy serviceRejectionPolicy = new ServiceRejectionPolicy.Builder() .onLimitStopAcceptingConnections(true) // Custom response to validate during assertion stage .onLimitResponseBuilder((meta, respFactory) -> Single.succeeded(respFactory.gatewayTimeout())) .build(); TrafficResilienceHttpServiceFilter filter = new TrafficResilienceHttpServiceFilter .Builder(() -> limiter) - .onRejectionPolicy(rejectionPolicy) + .rejectionPolicy(serviceRejectionPolicy) .build(); final HttpServerContext serverContext = forAddress(localAddress(0)) diff --git a/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/NumberUtils.java b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/NumberUtils.java index 27398a3514..8a49f24719 100644 --- a/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/NumberUtils.java +++ b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/NumberUtils.java @@ -39,21 +39,6 @@ public static int ensurePositive(final int value, final String name) { return value; } - /** - * Ensures the float is positive, excluding zero. - * - * @param value the float value to validate - * @param name name of the variable - * @return the passed value if all checks pass - * @throws IllegalArgumentException if the passed float is not greater than zero - */ - public static float ensurePositive(final float value, final String name) { - if (value <= 0.0) { - throw new IllegalArgumentException(name + ": " + value + " (expected > 0.0)"); - } - return value; - } - /** * Ensures the long is positive, excluding zero. * @@ -98,68 +83,4 @@ public static long ensureNonNegative(final long value, final String name) { } return value; } - - /** - * Ensures the float is greater than the min specified. - * - * @param value the float value to validate - * @param min the float min to validate against - * @param field name of the variable - * @return the passed value if all checks pass - * @throws IllegalArgumentException if the passed float doesn't meet the requirements - */ - public static float ensureGreaterThan(final float value, final float min, final String field) { - if (value <= min) { - throw new IllegalArgumentException(field + ": " + value + " (expected: > " + min + ")"); - } - return value; - } - - /** - * Ensures the float is between 0 and 1 (inclusive). - * - * @param value the float value to validate - * @param field name of the variable - * @return the passed value if all checks pass - * @throws IllegalArgumentException if the passed float doesn't meet the requirements - */ - public static float ensureBetweenZeroAndOne(final float value, final String field) { - if (value < 0.0f || value > 1.0f) { - throw new IllegalArgumentException(field + ": " + value + " (expected: 0.0f <= " + field + " <= 1.0f)"); - } - return value; - } - - /** - * Ensures the int is between the provided range (inclusive). - * - * @param value the int value to validate - * @param min the min int value to validate against (inclusive) - * @param max the max int value to validate against (inclusive) - * @param field name of the variable - * @return the passed value if all checks pass - * @throws IllegalArgumentException if the passed int doesn't meet the requirements - */ - public static int ensureRange(final int value, final int min, final int max, final String field) { - if (value < min || value > max) { - throw new IllegalArgumentException(field + ": " + value + - " (expected: " + min + " <= " + field + " <= " + max + ")"); - } - return value; - } - - /** - * Ensures the float is between 0 and 1 (exclusive). - * - * @param value the float value to validate - * @param field name of the variable - * @return the passed value if all checks pass - * @throws IllegalArgumentException if the passed float doesn't meet the requirements - */ - public static float ensureBetweenZeroAndOneExclusive(final float value, final String field) { - if (value <= 0.0f || value >= 1.0f) { - throw new IllegalArgumentException(field + ": " + value + " (expected: 0.0f < " + field + " < 1.0f)"); - } - return value; - } }