Skip to content

Commit

Permalink
Having non JSON-RPC being written to STDOUT will cause the source or …
Browse files Browse the repository at this point in the history
…action processes to be terminated. Increase error handling for these to catch this before it happens and ensure it is logged correctly

function setupErrorHandling() -> Ensure we don't write errors to STDOUT, Initialise shutdown (errors) and exception handlers for source and action processes
function wrap_source_php_cmd() -> When a source process is wrapped with this function instead of php_cmd, the file source_prepend.php is configured as the auto_prepend_file directive. The source_prepend.php does some boilerplate setup with loading composer autoload, setting up error handling and disabling output buffering
  • Loading branch information
lucasnetau committed Oct 28, 2021
1 parent fa4912d commit d214d81
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 20 deletions.
20 changes: 2 additions & 18 deletions src/Library/Actions/ActionHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@
use React\Stream\ReadableResourceStream;
use React\Stream\WritableResourceStream;

use Throwable;
use function EdgeTelemetrics\EventCorrelation\disableOutputBuffering;
use function EdgeTelemetrics\EventCorrelation\rpcLogMessage;
use function error_get_last;
use function fwrite;
use function json_encode;
use function register_shutdown_function;
use function set_exception_handler;
use function EdgeTelemetrics\EventCorrelation\setupErrorHandling;

/**
* Class Action Helper
Expand All @@ -51,20 +46,9 @@ class ActionHelper extends EventEmitter {
const ACTION_SHUTDOWN = 'shutdown';

public function __construct() {
setupErrorHandling(true);
disableOutputBuffering();
$this->loop = Loop::get();
//Register these after the ReactPHP event loop is initialised via Loop::get() to ensure out shutdown function is always processed after the one registered there
register_shutdown_function(function() {
$last_error = error_get_last();
if (($last_error['type'] ?? 0) & (E_ERROR | E_CORE_ERROR | E_COMPILE_ERROR | E_USER_ERROR | E_RECOVERABLE_ERROR)) {
fwrite(STDOUT, json_encode(rpcLogMessage(LogLevel::EMERGENCY, "Fatal Error ({$last_error['file']}:{$last_error['line']}): {$last_error["message"]}")) . "\n");
}
});
//If any unhandled exception occur then log them to STDOUT (skip the and WritableStreamInterface $output) then terminate the Loop
set_exception_handler(function (Throwable $exception) {
fwrite(STDOUT, json_encode(rpcLogMessage(LogLevel::EMERGENCY, "Action terminating on uncaught exception. " . $exception->getMessage() . "\n" . $exception->getTraceAsString())) . "\n");
Loop::stop();
});

$this->input = new Decoder( new ReadableResourceStream(STDIN));
$this->output = new Encoder( new WritableResourceStream(STDOUT));
Expand Down
7 changes: 7 additions & 0 deletions src/Library/Source/source_prepend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php declare(strict_types=1);

use function EdgeTelemetrics\EventCorrelation\initialiseSourceProcess;

require __DIR__ . '/../../../vendor/autoload.php';

initialiseSourceProcess(true);
55 changes: 54 additions & 1 deletion src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
namespace EdgeTelemetrics\EventCorrelation;

use EdgeTelemetrics\JSON_RPC\Notification as JsonRpcNotification;
use Psr\Log\LogLevel;
use React\EventLoop\Loop;
use RuntimeException;

use Throwable;
use function error_get_last;
use function escapeshellarg;
use function function_exists;
use function fwrite;
use function json_encode;
use function register_shutdown_function;
use function set_exception_handler;
use const PHP_BINARY;

if (! function_exists('EdgeTelemetrics\EventCorrelation\disableOutputBuffering')) {

function disableOutputBuffering()
{
/** Disable all output buffering */
ini_set('zlib.output_compression', '0');
@ini_set('zlib.output_compression', '0');
ini_set('output_buffering', '0');
ini_set('implicit_flush', '1');
if (PHP_VERSION_ID < 80000) {
Expand Down Expand Up @@ -84,6 +93,14 @@ function php_cmd(string $filename): string
}
}

if (! function_exists('EdgeTelemetrics\EventCorrelation\wrap_source_php_cmd')) {
function wrap_source_php_cmd(string $filename): string
{
$prepend_file = __DIR__ . '/Library/Source/source_prepend.php';
return escapeshellarg(PHP_BINARY) . " -d auto_prepend_file=" . escapeshellarg($prepend_file) . " -f " . escapeshellarg($filename) . " --";
}
}

/**
* @param string $unknownClassName
*/
Expand All @@ -107,3 +124,39 @@ function rpcLogMessage(string $level, string $message): JsonRpcNotification {
return new JsonRpcNotification(Scheduler::RPC_PROCESS_LOG, ['logLevel' => $level, 'message' => $message ]);
}
}

if (! function_exists('EdgeTelemetrics\EventCorrelation\setupErrorHandling')) {
function setupErrorHandling(bool $usingEventLoop)
{
//Errors should be written to STDERR and not STDOUT. Disable log errors to prevent duplicate messages to STDERR
ini_set('display_errors', 'stderr');
ini_set('log_errors', 'no');
if ($usingEventLoop) {
Loop::get();
}
//Register these after the ReactPHP event loop is initialised via Loop::get() to ensure out shutdown function is always processed after the one registered there
register_shutdown_function(function () {
$last_error = error_get_last();
if (($last_error['type'] ?? 0) & (E_ERROR | E_CORE_ERROR | E_COMPILE_ERROR | E_USER_ERROR | E_RECOVERABLE_ERROR)) {
fwrite(STDOUT, json_encode(rpcLogMessage(LogLevel::EMERGENCY,
"Fatal Error ({$last_error['file']}:{$last_error['line']}): {$last_error["message"]}")) . "\n");
}
});
//If any unhandled exception occur then log them to STDOUT (skip the and WritableStreamInterface $output) then terminate the Loop
set_exception_handler(function (Throwable $exception) use ($usingEventLoop) {
fwrite(STDOUT, json_encode(rpcLogMessage(LogLevel::EMERGENCY,
"Process terminating on uncaught exception. " . $exception->getMessage() . "\n" . $exception->getTraceAsString())) . "\n");
if ($usingEventLoop) {
Loop::stop();
}
});
}
}

if (! function_exists('EdgeTelemetrics\EventCorrelation\initialiseSourceProcess')) {
function initialiseSourceProcess(bool $usingEventLoop)
{
disableOutputBuffering();
setupErrorHandling($usingEventLoop);
}
}
2 changes: 1 addition & 1 deletion tests/Actions/log.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use function EdgeTelemetrics\EventCorrelation\disableOutputBuffering;

require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/../../vendor/autoload.php';

disableOutputBuffering();

Expand Down
11 changes: 11 additions & 0 deletions tests/Sources/source_with_error.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php declare(strict_types=1);

use EdgeTelemetrics\EventCorrelation\Event;
use EdgeTelemetrics\EventCorrelation\Scheduler;
use EdgeTelemetrics\JSON_RPC\Notification as JsonRpcNotification;

$event = new Event(['event' => 'Test:Event:1','datetime' => (new DateTimeImmutable())->format('c')]);
$rpc = new JsonRpcNotification(Scheduler::INPUT_ACTION_HANDLE, ['event' => $event]);
fwrite(STDOUT, json_encode($rpc) . "\n");

trigger_error('Test error handling of a triggered error');
11 changes: 11 additions & 0 deletions tests/Sources/source_with_exception.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php declare(strict_types=1);

use EdgeTelemetrics\EventCorrelation\Event;
use EdgeTelemetrics\EventCorrelation\Scheduler;
use EdgeTelemetrics\JSON_RPC\Notification as JsonRpcNotification;

$event = new Event(['event' => 'Test:Event:1','datetime' => (new DateTimeImmutable())->format('c')]);
$rpc = new JsonRpcNotification(Scheduler::INPUT_ACTION_HANDLE, ['event' => $event]);
fwrite(STDOUT, json_encode($rpc) . "\n");

throw new Exception('Test error handling of an un-caught exception');
47 changes: 47 additions & 0 deletions tests/test_source_fatal_exception.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php declare(strict_types=1);

/**
* Test: Unhandled exception thrown in action
*/
use Bref\Logger\StderrLogger;
use EdgeTelemetrics\EventCorrelation\Scheduler;
use EdgeTelemetrics\EventCorrelation\tests\Rules\LogEverything;
use Psr\Log\LogLevel;

use function EdgeTelemetrics\EventCorrelation\php_cmd;
use function EdgeTelemetrics\EventCorrelation\wrap_source_php_cmd;

require __DIR__ . "/../vendor/autoload.php";

$rules = [
LogEverything::class,
];

define('STATE_FILE', tempnam(sys_get_temp_dir(), 'php-ec-test.'));

(new class($rules) extends Scheduler {
public function __construct(array $rules)
{
parent::__construct($rules);
set_exception_handler([$this, "handle_exception"]);
$this->setLogger(new StderrLogger(LogLevel::DEBUG));

$this->register_input_process('source_with_exception', wrap_source_php_cmd(__DIR__ . "/Sources/source_with_exception.php"));
$this->register_input_process('source_with_exception', wrap_source_php_cmd(__DIR__ . "/Sources/source_with_error.php"));

$this->register_action('log', php_cmd(__DIR__ . "/actions/log.php"), null, false, ['LOG_FILENAME' => '/tmp/php-ec-test.txt']);

$this->setSavefileName(STATE_FILE);
$this->setSaveStateInterval(1);
}

public function handle_exception(Throwable $exception) {
$this->logger->emergency("Fatal", ['exception' => $exception,]);
}
})->run();

$state = json_decode(file_get_contents(STATE_FILE), true);
echo json_encode($state, JSON_PRETTY_PRINT);

unlink(STATE_FILE);
unlink('/tmp/php-ec-test.txt');

0 comments on commit d214d81

Please sign in to comment.