Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limiter #278

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Change Map value to be a Future itself, use System.nanoTime
  • Loading branch information
andrew4699 committed Sep 18, 2024
commit 6a22321ec774f7f489421c84e3b5767c2704f5ac
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@

/** An implementation of our Clock interface using opentelemetry's Clock implementation */
public class ClockImpl implements Clock {
private final io.opentelemetry.sdk.common.Clock openTelemetryClock;

public ClockImpl() {
openTelemetryClock = io.opentelemetry.sdk.common.Clock.getDefault();
}

@Override
public long nanoTime() {
return openTelemetryClock.nanoTime();
return System.nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class DefaultRateLimiterFactory implements RateLimiterFactory {
private double windowSeconds;

@Override
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key) {
return CompletableFuture.supplyAsync(
andrew4699 marked this conversation as resolved.
Show resolved Hide resolved
() ->
new OpenTelemetryRateLimiter(
requestsPerSecond, requestsPerSecond * windowSeconds, clock));
requestsPerSecond, requestsPerSecond * windowSeconds, new ClockImpl()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@JsonTypeName("no-op")
public class NoOpRateLimiterFactory implements RateLimiterFactory {
@Override
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key) {
return CompletableFuture.supplyAsync(NoOpRateLimiter::new);
andrew4699 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public interface RateLimiterFactory extends Discoverable {
*
* @param key The rate limiting key. Rate limiters may optionally choose to discriminate their
* behavior by the key.
* @param clock The clock which tells you the current time
* @return a Future with the constructed RateLimiter
*/
Future<RateLimiter> createRateLimiter(String key, Clock clock);
Future<RateLimiter> createRateLimiter(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.polaris.core.context.CallContext;
Expand All @@ -47,7 +48,7 @@ public class RateLimiterFilter implements Filter {
private static final Clock CLOCK = new ClockImpl();

private final RateLimiterConfig config;
private final Map<String, RateLimiter> perRealmLimiters = new ConcurrentHashMap<>();
private final Map<String, Future<RateLimiter>> perRealmLimiters = new ConcurrentHashMap<>();

public RateLimiterFilter(RateLimiterConfig config) {
this.config = config;
Expand All @@ -67,16 +68,10 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
}

private RateLimiter maybeBlockToGetRateLimiter(String realm) {
return perRealmLimiters.computeIfAbsent(realm, this::createRateLimiterBlocking);
}

/** Creates a rate limiter, enforcing a timeout on how long creation can take. */
private RateLimiter createRateLimiterBlocking(String key) {
try {
return config
.getRateLimiterFactory()
.createRateLimiter(key, CLOCK)
.get(config.getConstructionTimeoutMillis(), TimeUnit.MILLISECONDS);
return perRealmLimiters.computeIfAbsent(realm, (key) -> config
.getRateLimiterFactory()
.createRateLimiter(key)).get(config.getConstructionTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return getDefaultRateLimiterOnConstructionFailed(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
#

org.apache.polaris.service.ratelimiter.DefaultRateLimiterFactory
org.apache.polaris.service.ratelimiter.NoOpRateLimiterFactory
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class MockRateLimiterFactory implements RateLimiterFactory {
public boolean neverFinishConstruction;

@Override
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key) {
if (neverFinishConstruction) {
// This future will never finish
return new CompletableFuture<>();
Expand Down