Skip to content

Commit

Permalink
Support defining code based input sources and actions.
Browse files Browse the repository at this point in the history
 * Input Source functions are defined via extending the SourceFunction class
 * Action functions

 @note: There is no protection against blocking. These functions are more useful for testing engines or simple input/action processing.

Additional Changes:
 * Loop is initialised in the Scheduler constructors instead of delaying till run()
 * Input and Action processes can be defined using an array of parameters rather than an escaped string
  • Loading branch information
lucasnetau committed Jun 29, 2023
1 parent ac01dee commit 41b7f5b
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 32 deletions.
69 changes: 69 additions & 0 deletions examples/quickstart/function_based.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php declare(strict_types=1);

use Bref\Logger\StderrLogger;
use EdgeTelemetrics\EventCorrelation\Event;
use EdgeTelemetrics\EventCorrelation\Rule;
use EdgeTelemetrics\EventCorrelation\Scheduler;
use Psr\Log\LogLevel;

require __DIR__ . '/../../vendor/autoload.php';

class SendCountToStdOut extends Rule\MatchSingle {
const EVENTS = [['SampleValueEvent']];

public function onComplete(): void
{
$this->emit('data', [New \EdgeTelemetrics\EventCorrelation\Action('echo', ['value' => $this->getFirstEvent()->value])]);
}
}

$rules = [
SendCountToStdOut::class,
];

$scheduler = new \EdgeTelemetrics\EventCorrelation\Scheduler($rules);
$scheduler->setLogger(new StderrLogger(LogLevel::DEBUG));

$scheduler->register_action('echo', function($vars) {
echo 'Next Value: ' . $vars['value'] . PHP_EOL;
});

$numberGenClass = new class() extends Scheduler\SourceFunction {
protected \React\EventLoop\TimerInterface $timer;

function functionStart(): void
{
$this->timer = $this->loop->addPeriodicTimer(1.0, function () {
static $count = 1;
try {
$event = new Event(['event' => 'SampleValueEvent', 'value' => $count++]);
$this->emit('data', [$event]);

if ($count > 10) {
$this->exit();
}
} catch (Throwable $exception) {
$this->emit('error', [$exception]);
$this->exit(255);
return;
}
});
}

function exit(int $code = 0): void
{
$this->running = false;
$this->loop->cancelTimer($this->timer);
unset($this->timer);
$this->emit('exit', [$code]);
}

function functionStop(): void
{
$this->exit();
}
};

$scheduler->register_input_process('generator', $numberGenClass, null, [], false);

$scheduler->run();
138 changes: 106 additions & 32 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@

namespace EdgeTelemetrics\EventCorrelation;

use Closure;
use DateTimeImmutable;
use EdgeTelemetrics\EventCorrelation\Management\Server;
use EdgeTelemetrics\EventCorrelation\SaveHandler\FileAdapter;
use EdgeTelemetrics\EventCorrelation\SaveHandler\SaveHandlerInterface;
use EdgeTelemetrics\EventCorrelation\Scheduler\ClosureActionWrapper;
use EdgeTelemetrics\EventCorrelation\Scheduler\FunctionSourceWrapper;
use EdgeTelemetrics\EventCorrelation\Scheduler\SourceFunction;
use EdgeTelemetrics\EventCorrelation\StateMachine\IEventMatcher;
use Exception;
use Psr\Log\LoggerAwareInterface;
Expand All @@ -32,6 +36,7 @@
use EdgeTelemetrics\JSON_RPC\React\Decoder as JsonRpcDecoder;
use RuntimeException;

use Throwable;
use function array_filter;
use function array_key_exists;
use function array_key_first;
Expand Down Expand Up @@ -270,6 +275,7 @@ class Scheduler implements LoggerAwareInterface {
*/
public function __construct(array $rules)
{
$this->loop = Loop::get();
$this->schedulerStartTime = hrtime(true);
$this->rules = $rules;
$this->logger = new NullLogger();
Expand All @@ -279,12 +285,12 @@ public function __construct(array $rules)

/**
* @param string|int $id Descriptive and unique key of the input process
* @param string $cmd Command to execute //@TODO PHP7.4 allows command to be provided as an array
* @param string|array|SourceFunction $cmd Command to execute
* @param string|null $wd Working directory for the process, defaults to current PHP working directory of the Scheduler
* @param array $env Key value pair of Environmental variables to pass to the process
* @param array $env Key-value pair describing Environmental variables to pass to the process
* @param bool $essential If true, Scheduler will shut everything down if this input process exit with no errorCode or doesn't exist
*/
public function register_input_process($id, string $cmd, ?string $wd = null, array $env = [], bool $essential = false) : void
public function register_input_process(string|int $id, string|array|SourceFunction $cmd, ?string $wd = null, array $env = [], bool $essential = false) : void
{
$this->input_processes_config[$id] = ['cmd' => $cmd, 'wd' => $wd, 'env' => $env, 'essential' => $essential];
}
Expand All @@ -294,16 +300,56 @@ public function register_input_process($id, string $cmd, ?string $wd = null, arr
*/
public function initialise_input_processes() : void {
$this->logger->debug('Initialising input processes');
foreach (array_keys($this->input_processes_config) as $id) {
try {
$this->start_input_process($id);
} catch (RuntimeException $ex) {
$this->logger->emergency("An input process failed to start during initialisation.", ['exception' => $ex]);
exit(1);
foreach ($this->input_processes_config as $id => $config) {
if ($config['cmd'] instanceof SourceFunction) {
$this->setup_source_function($id);
} else {
try {
$this->start_input_process($id);
} catch (RuntimeException $ex) {
$this->logger->emergency("An input process failed to start during initialisation.", ['exception' => $ex]);
exit(1);
}
}
}
}

public function setup_source_function(int|string $id): void
{
$config = $this->input_processes_config[$id];
$cmd = clone $config['cmd'];

$this->input_processes[$id] = $cmd;
/** Log any errors received. Wrapper will call exit after error */
$cmd->on('data', function(Event $event) {
$this->engine->handle($event);
});
$cmd->on('error', function(Throwable $error) use ($id) {
$this->logger->error("{id}", ['id'=> $id, 'exception' => $error,]);
});
$cmd->on('exit', function($code) use($id) {
/** Remove from process table */
unset($this->input_processes[$id]);
$level = ($code === 0) ? LogLevel::INFO : LogLevel::ERROR;
$this->logger->log($level,"Input Process {id} exited with code: {code}", ['id' => $id, 'code' => $code,]);

/** Restart the input process if it exits with an error code */
if (false === $this->shuttingDown && $code === 0 && $this->input_processes_config[$id]['essential']) {
$this->logger->info("Essential input $id has stopped cleanly. Shutting down");
$this->shutdown();
return;
}
/** We stop processing if there are no input processes available **/
if (0 === count($this->input_processes)) {
$this->logger->info("No more input processes running. Shutting down");
$this->shutdown();
}
});
$this->loop->futureTick(static function() use ($cmd) {
$cmd->start();
});
}

/**
* @param int|string $id
* @return Process
Expand All @@ -320,8 +366,13 @@ public function start_input_process(int|string $id) : Process {
{
$env = array_merge([static::CHECKPOINT_VARNAME => json_encode($this->input_processes_checkpoints[$id])], $env);
}
//Use exec to ensure process receives our signals and not the bash wrapper
$process = new Process('exec ' . $config['cmd'], $config['wd'], $env);
if (is_string($config['cmd'])) {
/** Use exec to ensure process receives our signals and not the bash wrapper */
$process = new Process('exec ' . $config['cmd'], $config['wd'], $env);
} else {
/** When passed an array, PHP will open the command directly without going through a shell */
$process = new Process($config['cmd'], $config['wd'], $env);
}
$this->setup_input_process($process, $id);
if ($process->isRunning()) {
$this->logger->info("Started input process $id");
Expand All @@ -338,7 +389,8 @@ public function start_input_process(int|string $id) : Process {
* @param Process $process
* @param int|string $id
*/
public function setup_input_process(Process $process, int|string $id) {
public function setup_input_process(Process $process, int|string $id): void
{
try {
$process->start($this->loop);
} catch (RuntimeException $exception) {
Expand All @@ -358,7 +410,7 @@ public function setup_input_process(Process $process, int|string $id) {
$this->logger->critical("Input process did not give us an event array to handle. Received type: {type}, value: {value} ", ['type' => gettype($eventData), 'value' => json_encode($eventData),]);
return;
}
$event = new Event($rpc->getParam('event'));
$event = new Event($eventData);
/**
* Pass the event to the engine to be handled
*/
Expand Down Expand Up @@ -462,12 +514,12 @@ public function setNewEventAction(?string $actionName) : void

/**
* @param string $name
* @param string $cmd
* @param string|array|Closure $cmd
* @param string|null $wd
* @param bool|null $singleShot
* @param array $env
*/
public function register_action(string $name, string $cmd, ?string $wd = null, ?bool $singleShot = false, array $env = []) : void
public function register_action(string $name, string|array|Closure $cmd, ?string $wd = null, ?bool $singleShot = false, array $env = []) : void
{
$this->actionConfig[$name] = ['cmd' => $cmd, 'wd' => $wd, 'env' => $env, 'singleShot' => $singleShot];
}
Expand All @@ -481,8 +533,14 @@ public function start_action(string $actionName): Process
$actionConfig = $this->actionConfig[$actionName];
/** Handle singleShot processes true === $actionConfig['singleShot'] || */
if (!isset($this->runningActions[$actionName])) {
/** If there is no running action then we initialise the process, we call exec to ensure actions can receive our signals and not the default bash wrapper */
$process = new Process('exec ' . $actionConfig['cmd'], $actionConfig['wd'], $actionConfig['env'] ?? []);
/** If there is no running action then we initialise the process **/
if (is_string($actionConfig['cmd'])) {
/** we call exec to ensure actions can receive our signals and not the default bash wrapper */
$process = new Process('exec ' . $actionConfig['cmd'], $actionConfig['wd'], $actionConfig['env'] ?? []);
} else {
/** When passed an array PHP will call command directly without going through a shell */
$process = new Process($actionConfig['cmd'], $actionConfig['wd'], $actionConfig['env'] ?? []);
}
$process->start($this->loop);

$this->logger->info("Started action process $actionName");
Expand Down Expand Up @@ -787,7 +845,6 @@ public function run() : void
$this->logger->info("Garbage collection enabled at runtime");
}

$this->loop = Loop::get();
$this->logger->debug("Using event loop implementation: {class}", ['class' => get_class($this->loop)]);

/** Initialise the management server early in the startup */
Expand Down Expand Up @@ -819,18 +876,33 @@ public function run() : void
$actionName = $action->getCmd();
if (isset($this->actionConfig[$actionName]))
{
$process = $this->start_action($actionName);
/** Once the process is up and running we then write out our data via it's STDIN, encoded as a JSON RPC call */
do {
$uniqid = round(hrtime(true)/1e+3) . '.' . bin2hex(random_bytes(4));
} while (array_key_exists($uniqid, $this->inflightActionCommands));
$rpc_request = new JsonRpcRequest(self::ACTION_RUN_METHOD, $action->getVars(), $uniqid);
$this->inflightActionCommands[$uniqid] = [
'action' => $action,
'pid' => $process->getPid(),
];
$this->dirty = true;
$process->stdin->write(json_encode($rpc_request) . "\n");
$config = $this->actionConfig[$actionName];
if ($config['cmd'] instanceof Closure) {
$cmd = new ClosureActionWrapper($config['cmd'], $this->logger);
try {
$cmd->run($action->getVars());
} catch (Throwable $exception) {
$this->logger->critical('Callable Action ' . $actionName . ' threw.', ['exception' => $exception]);
$error = [
'error' => $exception->getMessage(),
'action' => $action,
];
$this->erroredActionCommands[] = $error;
}
} else {
$process = $this->start_action($actionName);
/** Once the process is up and running we then write out our data via it's STDIN, encoded as a JSON RPC call */
do {
$uniqid = round(hrtime(true)/1e+3) . '.' . bin2hex(random_bytes(4));
} while (array_key_exists($uniqid, $this->inflightActionCommands));
$rpc_request = new JsonRpcRequest(self::ACTION_RUN_METHOD, $action->getVars(), $uniqid);
$this->inflightActionCommands[$uniqid] = [
'action' => $action,
'pid' => $process->getPid(),
];
$this->dirty = true;
$process->stdin->write(json_encode($rpc_request) . "\n");
}
}
else
{
Expand Down Expand Up @@ -997,8 +1069,10 @@ public function exit() : void {
}
$this->loop->stop();
$this->logger->debug("Event Loop stopped");
$this->saveStateHandler->saveStateSync($this->buildState()); //Loop is stopped. Do a blocking synchronous save of current state prior to exit.
unset($this->saveStateHandler);
if (isset($this->saveStateHandler)) {
$this->saveStateHandler->saveStateSync($this->buildState()); //Loop is stopped. Do a blocking synchronous save of current state prior to exit.
unset($this->saveStateHandler);
}
});
}

Expand Down
31 changes: 31 additions & 0 deletions src/Scheduler/ClosureActionWrapper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php declare(strict_types=1);

/*
* This file is part of the PHP Event Correlation package.
*
* (c) James Lucas <james@lucas.net.au>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace EdgeTelemetrics\EventCorrelation\Scheduler;

use Closure;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;

class ClosureActionWrapper implements LoggerAwareInterface {
/** PSR3 logger provides $this->logger */
use LoggerAwareTrait;

public function __construct(private Closure $closure, LoggerInterface $logger) {
$this->setLogger($logger);
}

public function run(array $args): void
{
$this->closure->call($this, $args);
}
}
Loading

0 comments on commit 41b7f5b

Please sign in to comment.