From 35071ab6ec3c906c0f26d153e743770e8df86079 Mon Sep 17 00:00:00 2001 From: Paras Malhotra Date: Tue, 9 Mar 2021 12:50:29 +0530 Subject: [PATCH] Add ThrottlesExceptionsWithRedis job middleware --- .../ThrottlesExceptionsWithRedis.php | 62 +++++++ .../Redis/Limiters/DurationLimiter.php | 54 ++++++ .../ThrottlesExceptionsWithRedisTest.php | 167 ++++++++++++++++++ 3 files changed, 283 insertions(+) create mode 100644 src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php create mode 100644 tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php diff --git a/src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php b/src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php new file mode 100644 index 000000000000..38790e353e2d --- /dev/null +++ b/src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php @@ -0,0 +1,62 @@ +redis = Container::getInstance()->make(Redis::class); + + $this->limiter = new DurationLimiter( + $this->redis, $this->getKey($job), $this->maxAttempts, $this->decayMinutes * 60 + ); + + if ($this->limiter->tooManyAttempts()) { + return $job->release($this->limiter->decaysAt - $this->currentTime()); + } + + try { + $next($job); + + $this->limiter->clear(); + } catch (Throwable $throwable) { + if ($this->whenCallback && ! call_user_func($this->whenCallback, $throwable)) { + throw $throwable; + } + + $this->limiter->acquire(); + + return $job->release($this->retryAfterMinutes * 60); + } + } +} diff --git a/src/Illuminate/Redis/Limiters/DurationLimiter.php b/src/Illuminate/Redis/Limiters/DurationLimiter.php index 9aa594fb41f4..d4e503ada980 100644 --- a/src/Illuminate/Redis/Limiters/DurationLimiter.php +++ b/src/Illuminate/Redis/Limiters/DurationLimiter.php @@ -111,6 +111,30 @@ public function acquire() return (bool) $results[0]; } + /** + * Determine if the key has been "accessed" too many times. + * + * @return bool + */ + public function tooManyAttempts() + { + [$this->decaysAt, $this->remaining] = $this->redis->eval( + $this->tooManyAttemptsScript(), 1, $this->name, microtime(true), time(), $this->decay, $this->maxLocks + ); + + return $this->remaining <= 0; + } + + /** + * Clear the limiter. + * + * @return void + */ + public function clear() + { + $this->redis->del($this->name); + } + /** * Get the Lua script for acquiring a lock. * @@ -143,6 +167,36 @@ protected function luaScript() end return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1} +LUA; + } + + /** + * Get the Lua script to determine if the key has been "accessed" too many times. + * + * KEYS[1] - The limiter name + * ARGV[1] - Current time in microseconds + * ARGV[2] - Current time in seconds + * ARGV[3] - Duration of the bucket + * ARGV[4] - Allowed number of tasks + * + * @return string + */ + protected function tooManyAttemptsScript() + { + return <<<'LUA' + +if redis.call('EXISTS', KEYS[1]) == 0 then + return {0, ARGV[2] + ARGV[3]} +end + +if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then + return { + redis.call('HGET', KEYS[1], 'end'), + ARGV[4] - redis.call('HGET', KEYS[1], 'count') + } +end + +return {0, ARGV[2] + ARGV[3]} LUA; } } diff --git a/tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php b/tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php new file mode 100644 index 000000000000..35d9255eb751 --- /dev/null +++ b/tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php @@ -0,0 +1,167 @@ +setUpRedis(); + } + + protected function tearDown(): void + { + parent::tearDown(); + + $this->tearDownRedis(); + + m::close(); + } + + public function testCircuitIsOpenedForJobErrors() + { + $this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random()); + $this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key); + $this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key); + } + + public function testCircuitStaysClosedForSuccessfulJobs() + { + $this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key = Str::random()); + $this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key); + $this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key); + } + + public function testCircuitResetsAfterSuccess() + { + $this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random()); + $this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key); + $this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key); + $this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key); + $this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key); + } + + protected function assertJobWasReleasedImmediately($class, $key) + { + $class::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('release')->with(0)->once(); + $job->shouldReceive('isReleased')->andReturn(true); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); + + $instance->call($job, [ + 'command' => serialize($command = new $class($key)), + ]); + + $this->assertTrue($class::$handled); + } + + protected function assertJobWasReleasedWithDelay($class, $key) + { + $class::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('release')->withArgs(function ($delay) { + return $delay >= 600; + })->once(); + $job->shouldReceive('isReleased')->andReturn(true); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); + + $instance->call($job, [ + 'command' => serialize($command = new $class($key)), + ]); + + $this->assertFalse($class::$handled); + } + + protected function assertJobRanSuccessfully($class, $key) + { + $class::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('isReleased')->andReturn(false); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); + $job->shouldReceive('delete')->once(); + + $instance->call($job, [ + 'command' => serialize($command = new $class($key)), + ]); + + $this->assertTrue($class::$handled); + } +} + +class CircuitBreakerWithRedisTestJob +{ + use InteractsWithQueue, Queueable; + + public static $handled = false; + + public function __construct($key) + { + $this->key = $key; + } + + public function handle() + { + static::$handled = true; + + throw new Exception; + } + + public function middleware() + { + return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)]; + } +} + +class CircuitBreakerWithRedisSuccessfulJob +{ + use InteractsWithQueue, Queueable; + + public static $handled = false; + + public function __construct($key) + { + $this->key = $key; + } + + public function handle() + { + static::$handled = true; + } + + public function middleware() + { + return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)]; + } +}