Skip to content

Commit

Permalink
Remove boolean flags for error recovery and shutting down and replace…
Browse files Browse the repository at this point in the history
… with a State object. This will be replaced by a ENUM in future versions
  • Loading branch information
lucasnetau committed Dec 7, 2023
1 parent ed640dd commit d0d51af
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/Management/Actions/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public function __invoke(ServerRequestInterface $request): Response
<body>
<h1>PHP Event Engine</h1>
<dl>
<dt>State</dt>
<dd>{$this->fn(ucfirst($state['scheduler']['state']))}</dd>
<dt>Command</dt>
<dd>{$this->fn(\cli_get_process_title() ?: implode(" ", $argv))}</dd>
<dt>Running Mode</dt>
Expand Down
45 changes: 23 additions & 22 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
use EdgeTelemetrics\EventCorrelation\SaveHandler\FileAdapter;
use EdgeTelemetrics\EventCorrelation\SaveHandler\SaveHandlerInterface;
use EdgeTelemetrics\EventCorrelation\Scheduler\ClosureActionWrapper;
use EdgeTelemetrics\EventCorrelation\Scheduler\FunctionSourceWrapper;
use EdgeTelemetrics\EventCorrelation\Scheduler\SourceFunction;
use EdgeTelemetrics\EventCorrelation\Scheduler\State;
use EdgeTelemetrics\EventCorrelation\StateMachine\IEventMatcher;
use Exception;
use Psr\Log\LoggerAwareInterface;
Expand Down Expand Up @@ -85,6 +85,9 @@ class Scheduler implements LoggerAwareInterface {
/** @var float hrtime for when the scheduler is started */
protected float $schedulerStartTime;

/** @var State The current state of the scheduler */
protected State $state;

/**
* @var LoopInterface
*/
Expand Down Expand Up @@ -245,14 +248,6 @@ class Scheduler implements LoggerAwareInterface {
/** @var int Counter of how many times we have hit the memory HIGH WATERMARK during execution, a high number suggested that memory resources or limit should be increased */
protected int $pausedOnMemoryPressureCount = 0;

/**
* @var bool Flag if we are replaying failed actions
*/
protected bool $errorRecovery = false;

/** @var bool Flag to track if we are shutting down the engine */
protected bool $shuttingDown = false;

/** @var TimerInterface|null */
protected ?TimerInterface $shutdownTimer = null;

Expand All @@ -275,6 +270,7 @@ class Scheduler implements LoggerAwareInterface {
*/
public function __construct(array $rules)
{
$this->state = new State(State::STARTING);
$this->loop = Loop::get();
$this->schedulerStartTime = hrtime(true);
$this->rules = $rules;
Expand Down Expand Up @@ -312,6 +308,7 @@ public function initialise_input_processes() : void {
}
}
}
$this->state = new State(State::RUNNING);
}

public function setup_source_function(int|string $id): void
Expand All @@ -334,7 +331,7 @@ public function setup_source_function(int|string $id): void
$this->logger->log($level,"Input Process {id} exited with code: {code}", ['id' => $id, 'code' => $code,]);

/** Restart the input process if it exits with an error code */
if (false === $this->shuttingDown && $code === 0 && $this->input_processes_config[$id]['essential']) {
if (!$this->state->isStopping() && $code === 0 && $this->input_processes_config[$id]['essential']) {
$this->logger->info("Essential input $id has stopped cleanly. Shutting down");
$this->shutdown();
return;
Expand Down Expand Up @@ -448,7 +445,7 @@ public function setup_input_process(Process $process, int|string $id): void
* Input processes STDOUT has closed, if we are not in the process of shutting down then we need to terminate the process since it can no longer communicate with us
*/
$process->stdout->on('close', function() use ($id, $process) {
if (!$this->shuttingDown && $process->isRunning()) {
if (!$this->state->isStopping() && $process->isRunning()) {
$this->logger->critical("{id} STDOUT closed unexpectedly, terminating process", ['id' => $id,]);
$process->terminate(SIGTERM);
}
Expand Down Expand Up @@ -480,7 +477,7 @@ public function setup_input_process(Process $process, int|string $id): void
$this->logger->critical("Input process $id exit was due to fatal PHP error");
}
/** Restart the input process if it exits with an error code EXCEPT if it is exit code 127 - Command Not Found */
if (false === $this->shuttingDown) {
if (!$this->state->isStopping()) {
if (!in_array($code, [0, 127], true)) {
$this->logger->debug("Restarting process $id");
$this->start_input_process($id);
Expand Down Expand Up @@ -571,7 +568,7 @@ public function start_action(string $actionName): Process

$this->logger->error($rpc->getError()->getMessage() . " : " . json_encode($rpc->getError()->getData()));

if ($this->errorRecovery === true) {
if ($this->state->state() === State::RECOVERY) {
$this->logger->critical('An action process failed again during recovery');
$this->shutdown();
}
Expand All @@ -580,9 +577,9 @@ public function start_action(string $actionName): Process
if (count($this->inflightActionCommands) === 0) {
$this->inflightActionCommands = [];

if ($this->errorRecovery === true && count($this->erroredActionCommands) === 0) {
if ($this->state->state() === State::RECOVERY && count($this->erroredActionCommands) === 0) {
$this->logger->info('Replay of errored actions completed successfully. Resuming normal operations');
$this->errorRecovery = false;
$this->state = new State(State::STARTING);
$this->initialise_input_processes();
}
}
Expand Down Expand Up @@ -626,7 +623,7 @@ public function start_action(string $actionName): Process
}
}

if ($this->errorRecovery === true) {
if ($this->state->state() === State::RECOVERY) {
$this->logger->critical('An action process failed again during recovery');
$this->shutdown();
}
Expand All @@ -640,7 +637,7 @@ public function start_action(string $actionName): Process
*/
$this->runningActions = [];

if (true === $this->shuttingDown && count($this->input_processes) === 0) {
if ($this->state->isStopping() && count($this->input_processes) === 0) {
/** If we are shutting down and all input processes have stopped then continue the shutdown process straight away instead of waiting for the timers */
$this->exit();
}
Expand Down Expand Up @@ -674,7 +671,7 @@ public function setup_save_state() : void
*/
$this->saveStateHandler->on('save:failed', function($exception) {
$this->dirty = true;
if ($this->shuttingDown === false) { //Failure is expected if the save handler is running when the scheduler starts shutting down. A sync save state will be run at end of shutdown
if (!$this->state->isStopping()) { //Failure is expected if the save handler is running when the scheduler starts shutting down. A sync save state will be run at end of shutdown
$this->logger->critical("Save state async failed.", ['exception' => $exception]);
}
});
Expand Down Expand Up @@ -753,7 +750,7 @@ protected function scheduleNextTimeout() : void
}

//Do not schedule any timeout if we are in the process of shutting down
if (true === $this->shuttingDown) {
if ($this->state->isStopping()) {
return;
}

Expand Down Expand Up @@ -913,7 +910,7 @@ public function run() : void
/** If we have any errored actions then we replay them and attempt recovery. In normal state we initialise the input processes */
if (count($this->erroredActionCommands)) {
$this->logger->notice('Beginning failed action recovery process');
$this->errorRecovery = true;
$this->state = new State(State::RECOVERY);
while(count($this->erroredActionCommands) > 0) {
$errored = array_shift($this->erroredActionCommands);
$action = new Action($errored['action']['cmd'], $errored['action']['vars']);
Expand Down Expand Up @@ -980,7 +977,7 @@ public function run() : void
* Initialise shutdown by stopping processes and timers
*/
public function shutdown() : void {
$this->shuttingDown = true;
$this->state = new State(State::STOPPING);
if (null !== $this->nextTimer) {
$this->loop->cancelTimer($this->nextTimer);
$this->nextTimer = null;
Expand Down Expand Up @@ -1064,10 +1061,13 @@ public function exit() : void {
$this->shutdownTimer = null;
}
$this->loop->futureTick(function() {
$this->loop->stop();
if (count($this->inflightActionCommands) > 0 ) {
$this->logger->error("There were still inflight action commands at shutdown.");
$this->state = new State(State::STOPPED_UNCLEAN);
} else {
$this->state = new State(State::STOPPED);
}
$this->loop->stop();
$this->logger->debug("Event Loop stopped");
if (isset($this->saveStateHandler)) {
$this->saveStateHandler->saveStateSync($this->buildState()); //Loop is stopped. Do a blocking synchronous save of current state prior to exit.
Expand All @@ -1082,6 +1082,7 @@ public function exit() : void {
protected function getState() : array
{
$state = [];
$state['state'] = $this->state->state();
$state['uptime_msec'] = (int)round((hrtime(true) - $this->schedulerStartTime)/1e+6);
$state['input']['running'] = array_keys($this->input_processes);
$state['input']['checkpoints'] = $this->input_processes_checkpoints;
Expand Down
49 changes: 49 additions & 0 deletions src/Scheduler/State.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php declare(strict_types=1);

/*
* This file is part of the PHP Event Correlation package.
*
* (c) James Lucas <james@lucas.net.au>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace EdgeTelemetrics\EventCorrelation\Scheduler;

//enum State implements \BackedEnum {

class State {

public const STARTING = 'starting';
public const RECOVERY = 'recovery';
public const RUNNING = 'running';
public const STOPPING = 'stopping';

public const STOPPED = 'stopped';

public const STOPPED_UNCLEAN = 'unclean shutdown';

public const VALID_STATES = [self::STARTING, self::RECOVERY, self::RUNNING, self::STOPPING, self::STOPPED, self::STOPPED_UNCLEAN];

private string $state;

public function __construct(string $state) {
if (in_array($state, self::VALID_STATES, true)) {
$this->state = $state;
} else {
throw new \RuntimeException('Invalid state ' . $state);
}
}

public function state() : string {
return $this->state;
}

public function isStopping() : bool {
return match($this->state) {
self::STOPPING => true,
default => false
};
}
}

0 comments on commit d0d51af

Please sign in to comment.