Skip to content

Commit

Permalink
Allow Rules to trigger the running of source process/function in resp…
Browse files Browse the repository at this point in the history
…onse to an event (for example running a source process that loads historical data as an event for the Rule)
  • Loading branch information
lucasnetau committed Jan 25, 2024
1 parent d7982d5 commit 677d0f8
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/CorrelationEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use DateInterval;
use EdgeTelemetrics\EventCorrelation\Rule\UndefinedRule;
use EdgeTelemetrics\EventCorrelation\Scheduler\Messages\ExecuteSource;
use EdgeTelemetrics\EventCorrelation\StateMachine\AEventProcessor;
use EdgeTelemetrics\EventCorrelation\StateMachine\IEventMatcher;
use EdgeTelemetrics\EventCorrelation\StateMachine\IEventGenerator;
Expand Down Expand Up @@ -94,6 +95,7 @@ class CorrelationEngine implements EventEmitterInterface {
protected array $emitMapping = [
'event' => Event::class,
'action' => Action::class,
'source' => ExecuteSource::class,
];

/**
Expand Down
45 changes: 38 additions & 7 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,20 @@ public function __construct(array $rules)
* @param string|null $wd Working directory for the process, defaults to current PHP working directory of the Scheduler
* @param array $env Key-value pair describing Environmental variables to pass to the process
* @param bool $essential If true, Scheduler will shut everything down if this input process exit with no errorCode or doesn't exist
* @param bool $autostart Whether to start the process automatically on startup or when requested
*/
public function register_input_process(string|int $id, string|array|SourceFunction $cmd, ?string $wd = null, array $env = [], bool $essential = false) : void
public function register_input_process(string|int $id, string|array|SourceFunction $cmd, ?string $wd = null, array $env = [], bool $essential = false, bool $autostart = true) : void
{
$this->input_processes_config[$id] = ['cmd' => $cmd, 'wd' => $wd, 'env' => $env, 'essential' => $essential];
$this->input_processes_config[$id] = ['cmd' => $cmd, 'wd' => $wd, 'env' => $env, 'essential' => $essential, 'autostart' => $autostart];
}

/**
* @param string|int $id
* @return void
*/
public function unregister_input_process(string|int $id) : void
{
unset($this->input_processes_config[$id]);
}

/**
Expand All @@ -297,11 +307,9 @@ public function register_input_process(string|int $id, string|array|SourceFuncti
public function initialise_input_processes() : void {
$this->logger->debug('Initialising input processes');
foreach ($this->input_processes_config as $id => $config) {
if ($config['cmd'] instanceof SourceFunction) {
$this->setup_source_function($id);
} else {
if ($config['autostart'] === true) {
try {
$this->start_input_process($id);
$this->initialise_input_process($id);
} catch (RuntimeException $ex) {
$this->logger->emergency("An input process failed to start during initialisation.", ['exception' => $ex]);
exit(1);
Expand All @@ -311,7 +319,16 @@ public function initialise_input_processes() : void {
$this->state = new State(State::RUNNING);
}

public function setup_source_function(int|string $id): void
public function initialise_input_process(int|string $id) : Process|SourceFunction {
$config = $this->input_processes_config[$id];
if ($config['cmd'] instanceof SourceFunction) {
return $this->setup_source_function($id);
} else {
return $this->start_input_process($id);
}
}

public function setup_source_function(int|string $id): SourceFunction
{
$config = $this->input_processes_config[$id];
$cmd = clone $config['cmd'];
Expand Down Expand Up @@ -346,6 +363,7 @@ public function setup_source_function(int|string $id): void
$this->loop->futureTick(static function() use ($cmd, $env) {
$cmd->start($env);
});
return $cmd;
}

/**
Expand Down Expand Up @@ -920,6 +938,19 @@ public function run() : void
}
});

/** Handle request to run an on demand source */
$this->engine->on('source', function(Scheduler\Messages\ExecuteSource $execute) {
$config = $this->input_processes_config[$execute->getCmd()];
$rndid = $execute->getCmd() . '_' . bin2hex(random_bytes(4)); //Generate unique ID for the on demand run
$env = ($config['env'] ?? []) + $execute->getVars();
//Register the on demand source
$this->register_input_process($rndid, $config['cmd'], $config['wd'], $env, false, false);
$process = $this->initialise_input_process($rndid);
$process->on('exit', function() use ($rndid) {
$this->unregister_input_process($rndid); //Remove config once process exits
});
});

/** If we have any errored actions then we replay them and attempt recovery. In normal state we initialise the input processes */
if (count($this->erroredActionCommands)) {
$this->logger->notice('Beginning failed action recovery process');
Expand Down
74 changes: 74 additions & 0 deletions src/Scheduler/Messages/ExecuteSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php declare(strict_types=1);

/*
* This file is part of the PHP Event Correlation package.
*
* (c) James Lucas <james@lucas.net.au>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace EdgeTelemetrics\EventCorrelation\Scheduler\Messages;

use EdgeTelemetrics\EventCorrelation\Event;
use JsonSerializable;

/**
* Class Action
* @package EdgeTelemetrics\EventCorrelation
*/
class ExecuteSource implements JsonSerializable {

/**
* @var string
*/
protected string $cmd;

/**
* @var array
*/
protected array $vars;

/**
* Action constructor.
* @param string $cmd
* @param array|Event $vars
*/
public function __construct(string $cmd, Event|array $vars)
{
$this->cmd = $cmd;
if ($vars instanceof Event) {
$this->vars = $vars->toArray();
} else {
$this->vars = $vars;
}
}

/**
* @return array
*/
public function jsonSerialize() : array
{
return [
'cmd' => $this->cmd,
'vars' => $this->vars
];
}

/**
* @return string
*/
public function getCmd() : string
{
return $this->cmd;
}

/**
* @return array
*/
public function getVars() : array
{
return $this->vars;
}
}

0 comments on commit 677d0f8

Please sign in to comment.