Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Add ThrottlesExceptionsWithRedis job middleware #36518

Merged
merged 1 commit into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Redis\Limiters\DurationLimiter;
use Illuminate\Support\InteractsWithTime;
use Throwable;

class ThrottlesExceptionsWithRedis extends ThrottlesExceptions
{
use InteractsWithTime;

/**
* The Redis factory implementation.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;

/**
* The rate limiter instance.
*
* @var \Illuminate\Redis\Limiters\DurationLimiter
*/
protected $limiter;

/**
* Process the job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
$this->redis = Container::getInstance()->make(Redis::class);

$this->limiter = new DurationLimiter(
$this->redis, $this->getKey($job), $this->maxAttempts, $this->decayMinutes * 60
);

if ($this->limiter->tooManyAttempts()) {
return $job->release($this->limiter->decaysAt - $this->currentTime());
}

try {
$next($job);

$this->limiter->clear();
} catch (Throwable $throwable) {
if ($this->whenCallback && ! call_user_func($this->whenCallback, $throwable)) {
throw $throwable;
}

$this->limiter->acquire();

return $job->release($this->retryAfterMinutes * 60);
}
}
}
54 changes: 54 additions & 0 deletions src/Illuminate/Redis/Limiters/DurationLimiter.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ public function acquire()
return (bool) $results[0];
}

/**
* Determine if the key has been "accessed" too many times.
*
* @return bool
*/
public function tooManyAttempts()
{
[$this->decaysAt, $this->remaining] = $this->redis->eval(
$this->tooManyAttemptsScript(), 1, $this->name, microtime(true), time(), $this->decay, $this->maxLocks
);

return $this->remaining <= 0;
}

/**
* Clear the limiter.
*
* @return void
*/
public function clear()
{
$this->redis->del($this->name);
}

/**
* Get the Lua script for acquiring a lock.
*
Expand Down Expand Up @@ -143,6 +167,36 @@ protected function luaScript()
end

return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
LUA;
}

/**
* Get the Lua script to determine if the key has been "accessed" too many times.
*
* KEYS[1] - The limiter name
* ARGV[1] - Current time in microseconds
* ARGV[2] - Current time in seconds
* ARGV[3] - Duration of the bucket
* ARGV[4] - Allowed number of tasks
*
* @return string
*/
protected function tooManyAttemptsScript()
{
return <<<'LUA'

if redis.call('EXISTS', KEYS[1]) == 0 then
return {0, ARGV[2] + ARGV[3]}
end

if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then
return {
redis.call('HGET', KEYS[1], 'end'),
ARGV[4] - redis.call('HGET', KEYS[1], 'count')
}
end

return {0, ARGV[2] + ARGV[3]}
LUA;
}
}
167 changes: 167 additions & 0 deletions tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Exception;
use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
use Illuminate\Support\Str;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class ThrottlesExceptionsWithRedisTest extends TestCase
{
use InteractsWithRedis;

protected function setUp(): void
{
parent::setUp();

$this->setUpRedis();
}

protected function tearDown(): void
{
parent::tearDown();

$this->tearDownRedis();

m::close();
}

public function testCircuitIsOpenedForJobErrors()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random());
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key);
}

public function testCircuitStaysClosedForSuccessfulJobs()
{
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key = Str::random());
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
}

public function testCircuitResetsAfterSuccess()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random());
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key);
}

protected function assertJobWasReleasedImmediately($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->with(0)->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertTrue($class::$handled);
}

protected function assertJobWasReleasedWithDelay($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->withArgs(function ($delay) {
return $delay >= 600;
})->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertFalse($class::$handled);
}

protected function assertJobRanSuccessfully($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertTrue($class::$handled);
}
}

class CircuitBreakerWithRedisTestJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function __construct($key)
{
$this->key = $key;
}

public function handle()
{
static::$handled = true;

throw new Exception;
}

public function middleware()
{
return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)];
}
}

class CircuitBreakerWithRedisSuccessfulJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function __construct($key)
{
$this->key = $key;
}

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)];
}
}