Introduced geLimitWhenIncrementedReactive to Redis implementation
mokies committed Jul 27, 2017
1 parent 6974575 commit 82dcada
Showing 8 changed files with 148 additions and 18 deletions.
1 change: 1 addition & 0 deletions NOTES.md
@@ -16,4 +16,5 @@ Enhancements
- Improve test coverage
- Improve performance testing
- Improve documentation
- Caching over-limit reactive wrapper that reduces load on the backing implementation when under heavy load

4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
#Thu Jul 27 20:04:44 AEST 2017
#Thu Jul 27 20:27:54 AEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.2-all.zip
@@ -12,5 +12,9 @@ public interface ReactiveRequestRateLimiter {

    Mono<Boolean> overLimitWhenIncrementedReactive(String key, int weight);

    Mono<Boolean> geLimitWhenIncrementedReactive(String key);

    Mono<Boolean> geLimitWhenIncrementedReactive(String key, int weight);

    Mono<Boolean> resetLimitReactive(String key);
}
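
For orientation, a minimal usage sketch of the new reactive methods. The class and method names below, the limiter variable, and the key value are illustrative only; how the Redis-backed implementation is constructed is outside this diff. The distinction the interface draws: overLimit* signals once the incremented count strictly exceeds the configured limit, while geLimit* signals as soon as the count reaches it.

import es.moki.ratelimitj.core.limiter.request.ReactiveRequestRateLimiter;
import reactor.core.publisher.Mono;

// Hypothetical helper showing how a caller might use the new methods.
public class GeLimitUsageSketch {

    // Strict check: true only when the incremented count is greater than the limit.
    static Mono<Boolean> strictlyOver(ReactiveRequestRateLimiter limiter, String key) {
        return limiter.overLimitWhenIncrementedReactive(key);
    }

    // Greater-or-equal check: true as soon as the incremented count reaches the limit,
    // i.e. one request earlier than the overLimit* variant.
    static Mono<Boolean> atOrOverLimit(ReactiveRequestRateLimiter limiter, String key) {
        return limiter.geLimitWhenIncrementedReactive(key, 1);
    }
}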
@@ -0,0 +1,83 @@
-- Credit: http://www.dr-josiah.com/2014/11/introduction-to-rate-limiting-with_26.html

local limits = cjson.decode(ARGV[1])
local now = tonumber(ARGV[2])
local weight = tonumber(ARGV[3] or '1')
local strictly_greater = tonumber(ARGV[4] or '1') == 1
local longest_duration = limits[1][1] or 0
local saved_keys = {}
local ge_limit = '0'

-- handle cleanup and limit checks
for i, limit in ipairs(limits) do
    local duration = limit[1]
    longest_duration = math.max(longest_duration, duration)
    local precision = limit[3] or duration
    precision = math.min(precision, duration)
    local blocks = math.ceil(duration / precision)
    local saved = {}
    table.insert(saved_keys, saved)
    saved.block_id = math.floor(now / precision)
    saved.trim_before = saved.block_id - blocks + 1
    saved.count_key = duration .. ':' .. precision .. ':'
    saved.ts_key = saved.count_key .. 'o'

    for j, key in ipairs(KEYS) do
        local old_ts = redis.call('HGET', key, saved.ts_key)
        old_ts = old_ts and tonumber(old_ts) or saved.trim_before
        if old_ts > now then
            -- don't write in the past
            return '1'
        end
        -- discover what needs to be cleaned up
        local decr = 0
        local dele = {}
        local trim = math.min(saved.trim_before, old_ts + blocks)
        for old_block = old_ts, trim - 1 do
            local bkey = saved.count_key .. old_block
            local bcount = redis.call('HGET', key, bkey)
            if bcount then
                decr = decr + tonumber(bcount)
                table.insert(dele, bkey)
            end
        end
        -- handle cleanup
        local cur
        if #dele > 0 then
            redis.call('HDEL', key, unpack(dele))
            -- Guard against "-0" => "ERR value is not an integer or out of range"
            if decr ~= 0 then
                cur = redis.call('HINCRBY', key, saved.count_key, -decr)
            end
        else
            cur = redis.call('HGET', key, saved.count_key)
        end
        -- check our limits
        local count = tonumber(cur or '0') + weight
        if count > limit[2] then
            return '1' -- over limit, don't record request
        elseif count == limit[2] and not strictly_greater then
            ge_limit = '1' -- at limit, do record request
        end
    end
end

-- there are enough resources, update the counts IFF needed
for i, limit in ipairs(limits) do
    local saved = saved_keys[i]
    for j, key in ipairs(KEYS) do
        -- update the current timestamp, count, and bucket count
        redis.call('HSET', key, saved.ts_key, saved.trim_before)
        redis.call('HINCRBY', key, saved.count_key, weight)
        redis.call('HINCRBY', key, saved.count_key .. saved.block_id, weight)
    end
end

-- We calculated the longest-duration limit so we can EXPIRE
-- the whole HASH for quick and easy idle-time cleanup :)
if longest_duration > 0 then
    for _, key in ipairs(KEYS) do
        redis.call('EXPIRE', key, longest_duration)
    end
end
return ge_limit
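
For reference, a sketch of the KEYS/ARGV layout the script above consumes; the class name and every concrete value below are illustrative assumptions, not part of this commit.

// Hypothetical illustration of the argument layout expected by the sliding-window script above.
public class SlidingWindowScriptArgs {

    public static void main(String[] args) {
        // KEYS[1..n]: one or more Redis hash keys identifying the client, e.g. per IP.
        String[] keys = {"ip:127.0.1.2"};

        // ARGV[1]: JSON array of rules, each [duration, limit, optional precision], with duration and
        // precision in the same time unit as ARGV[2]; here: 5 per 10s, plus 100 per 60s in 10s blocks.
        String limitsJson = "[[10,5],[60,100,10]]";

        // ARGV[2]: the current time, as supplied by the limiter's TimeSupplier.
        String now = "1501151274";

        // ARGV[3]: weight of this request; the script defaults it to 1.
        String weight = "1";

        // ARGV[4]: 1 = strictly greater (overLimit* semantics), 0 = greater-or-equal (geLimit* semantics).
        String strictlyGreater = "0";

        // A Redis client would pass these as KEYS and ARGV to EVAL/EVALSHA and treat the
        // returned '1'/'0' as "limited" / "not limited".
        System.out.printf("KEYS=%s ARGV=[%s, %s, %s, %s]%n",
                String.join(",", keys), limitsJson, now, weight, strictlyGreater);
    }
}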
17 changes: 17 additions & 0 deletions ratelimitj-redis/out/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="es.moki" level="debug"
            additivity="false">
        <appender-ref ref="STDOUT"/>
    </logger>

    <root level="warn">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
@@ -97,14 +97,6 @@ public boolean resetLimit(String key) {
        return toBlocking(resetLimitAsync(key));
    }

    private boolean toBlocking(CompletionStage<Boolean> completionStage) {
        try {
            return completionStage.toCompletableFuture().get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to complete operation", e);
        }
    }

    @Override
    public Mono<Boolean> overLimitWhenIncrementedReactive(String key) {
        return Mono.fromFuture(overLimitAsync(key).toCompletableFuture());
@@ -115,6 +107,16 @@ public Mono<Boolean> overLimitWhenIncrementedReactive(String key, int weight) {
        return Mono.fromFuture(overLimitAsync(key, weight).toCompletableFuture());
    }

    @Override
    public Mono<Boolean> geLimitWhenIncrementedReactive(String key) {
        return Mono.fromFuture(eqOrGeLimitAsync(key, 1, false).toCompletableFuture());
    }

    @Override
    public Mono<Boolean> geLimitWhenIncrementedReactive(String key, int weight) {
        return Mono.fromFuture(eqOrGeLimitAsync(key, weight, false).toCompletableFuture());
    }

    @Override
    public Mono<Boolean> resetLimitReactive(String key) {
        return Mono.fromFuture(resetLimitAsync(key).toCompletableFuture());
@@ -137,4 +139,12 @@ private CompletionStage<Boolean> eqOrGeLimitAsync(String key, int weight, boolea

        // TODO handle scenario where script is not loaded, flush scripts and test scenario
    }

    private boolean toBlocking(CompletionStage<Boolean> completionStage) {
        try {
            return completionStage.toCompletableFuture().get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to complete operation", e);
        }
    }
}
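
The reactive methods above are thin bridges over the existing async CompletionStage-based calls, and the relocated toBlocking helper is their blocking counterpart. A standalone sketch of that bridging pattern follows; the class and method names are hypothetical stand-ins, not the class's actual members.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;

// Hypothetical, self-contained illustration of the async-to-reactive/blocking bridging used above.
public class AsyncBridgeSketch {

    // Stand-in for an async limiter call such as eqOrGeLimitAsync(key, weight, strictlyGreater).
    static CompletionStage<Boolean> limitCheckAsync() {
        return CompletableFuture.supplyAsync(() -> Boolean.FALSE);
    }

    // Reactive view: expose the future as a Mono without blocking the caller.
    static Mono<Boolean> limitCheckReactive() {
        return Mono.fromFuture(limitCheckAsync().toCompletableFuture());
    }

    // Blocking view: wait up to 10 seconds, mirroring the toBlocking(...) helper above.
    static boolean limitCheckBlocking() {
        try {
            return limitCheckAsync().toCompletableFuture().get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to complete operation", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reactive: " + limitCheckReactive().block());
        System.out.println("blocking: " + limitCheckBlocking());
    }
}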
@@ -1,10 +1,10 @@
package es.moki.ratelimitj.test.limiter.request;

import com.google.common.collect.ImmutableSet;
import es.moki.ratelimitj.core.limiter.request.RequestLimitRule;
import es.moki.ratelimitj.core.limiter.request.ReactiveRequestRateLimiter;
import es.moki.ratelimitj.test.time.TimeBanditSupplier;
import es.moki.ratelimitj.core.limiter.request.RequestLimitRule;
import es.moki.ratelimitj.core.time.TimeSupplier;
import es.moki.ratelimitj.test.time.TimeBanditSupplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

@@ -20,7 +20,7 @@ public abstract class AbstractReactiveRequestRateLimiterTest {
    protected abstract ReactiveRequestRateLimiter getRateLimiter(Set<RequestLimitRule> rules, TimeSupplier timeSupplier);

    @Test
    public void shouldLimitSingleWindowSync() throws Exception {
    public void shouldLimitSingleWindowReactive() throws Exception {

        ImmutableSet<RequestLimitRule> rules = ImmutableSet.of(RequestLimitRule.of(10, TimeUnit.SECONDS, 5));
        ReactiveRequestRateLimiter rateLimiter = getRateLimiter(rules, timeBandit);
@@ -39,7 +39,26 @@ public void shouldLimitSingleWindowSync() throws Exception {
    }

    @Test
    public void shouldLimitDualWindowAsync() throws Exception {
    public void shouldGeLimitSingleWindowReactive() throws Exception {

        ImmutableSet<RequestLimitRule> rules = ImmutableSet.of(RequestLimitRule.of(10, TimeUnit.SECONDS, 5));
        ReactiveRequestRateLimiter rateLimiter = getRateLimiter(rules, timeBandit);

        Flux<Boolean> geLimitFlux = Flux
                .just("ip:127.0.1.2")
                .repeat(3)
                .flatMap(key -> {
                    timeBandit.addUnixTimeMilliSeconds(100);
                    return rateLimiter.geLimitWhenIncrementedReactive(key);
                });

        geLimitFlux.toStream().forEach(result -> assertThat(result).isFalse());

        assertThat(rateLimiter.geLimitWhenIncrementedReactive("ip:127.0.1.2").block()).isTrue();
    }

    @Test
    public void shouldLimitDualWindowReactive() throws Exception {

        ImmutableSet<RequestLimitRule> rules = ImmutableSet.of(RequestLimitRule.of(1, TimeUnit.SECONDS, 5), RequestLimitRule.of(10, TimeUnit.SECONDS, 10));
        ReactiveRequestRateLimiter rateLimiter = getRateLimiter(rules, timeBandit);
@@ -31,9 +31,7 @@ public void shouldLimitSingleWindowSync() throws Exception {
            assertThat(requestRateLimiter.overLimitWhenIncremented("ip:127.0.1.1")).isFalse();
        });

        // assertThat(requestRateLimiter.isOverLimit("ip:127.0.1.1")).isFalse();
        assertThat(requestRateLimiter.overLimitWhenIncremented("ip:127.0.1.1")).isTrue();
        // assertThat(requestRateLimiter.isOverLimit("ip:127.0.1.1")).isTrue();
    }

    @Test
@@ -47,9 +45,7 @@ public void shouldGeLimitSingleWindowSync() throws Exception {
            assertThat(requestRateLimiter.geLimitWhenIncremented("ip:127.0.1.2")).isFalse();
        });

        // assertThat(requestRateLimiter.isGeLimit("ip:127.0.1.1")).isFalse();
        assertThat(requestRateLimiter.geLimitWhenIncremented("ip:127.0.1.2")).isTrue();
        // assertThat(requestRateLimiter.isGeLimit("ip:127.0.1.1")).isTrue();
    }

    @Test
