Skip to content

Commit

Permalink
Close stdin to action processes before sending the SIGTERM signal to …
Browse files Browse the repository at this point in the history
…flush any remaining buffered action calls

Extra debugging logs around shutdown handling
  • Loading branch information
lucasnetau committed May 11, 2023
1 parent 8c8ddd7 commit 3e8450b
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use RuntimeException;

use function array_filter;
use function array_key_exists;
use function array_key_first;
use function array_keys;
use function array_map;
Expand Down Expand Up @@ -325,6 +326,9 @@ public function start_input_process(int|string $id) : Process {
if ($process->isRunning()) {
$this->logger->info("Started input process $id");
$this->input_processes[$id] = $process;
if (function_exists('\posix_setpgid')) {
\posix_setpgid($process->getPid(), 0);
}
return $process;
}
throw new RuntimeException('Input process ' . $id . ' failed to start');
Expand Down Expand Up @@ -912,11 +916,13 @@ public function shutdown() : void {

while ($task = array_pop($this->scheduledTasks)) {
$this->loop->cancelTimer($task);
$task = null;
}

if (count($this->input_processes) > 0) {
$this->logger->debug( "Shutting down running input processes");
foreach ($this->input_processes as $processKey => $process) {
$this->logger->debug( "Sending SIGTERM to input process $processKey");
if (false === $process->terminate(SIGTERM)) {
$this->logger->error( "Unable to send SIGTERM to input process $processKey");
}
Expand All @@ -933,7 +939,6 @@ public function shutdown() : void {
} else {
$this->stop();
}

}

/**
Expand All @@ -948,13 +953,20 @@ public function stop() : void
/**
* Notify any rules listening for a Stop event that we are stopping
*/
$this->logger->debug( "Notify CorrelationEngine that we are stopping");
$this->engine->handle(new Event(['event' => static::CONTROL_MSG_STOP]));

/** Check if we have any running action commands, if we do then some actions may not have completed and/or need to flush+complete tasks. Send them a SIGTERM to complete their shutdown */
if (count($this->runningActions) > 0) {
foreach ($this->runningActions as $processKey => $process) {
/** End the stdin for the process to ensure we flush any pending actions */
$process->stdin->end();
}

$this->loop->futureTick(function() {
$this->logger->debug("Shutting down running action processes");
foreach ($this->runningActions as $processKey => $process) {
$this->logger->debug( "Sending SIGTERM to action process $processKey");
if (false === $process->terminate(SIGTERM)) {
$this->logger->error("Unable to send SIGTERM to action process $processKey");
}
Expand Down Expand Up @@ -986,6 +998,7 @@ public function exit() : void {
$this->loop->stop();
$this->logger->debug("Event Loop stopped");
$this->saveStateHandler->saveStateSync($this->buildState()); //Loop is stopped. Do a blocking synchronous save of current state prior to exit.
unset($this->saveStateHandler);
});
}

Expand Down Expand Up @@ -1120,7 +1133,7 @@ protected function checkMemoryPressure() : void
/**
* Run memory reclaim
*/
protected function memoryReclaim() {
protected function memoryReclaim() : void {
$mark = hrtime(true);
$memCurr = memory_get_usage();
$cycles = gc_collect_cycles();
Expand Down

0 comments on commit 3e8450b

Please sign in to comment.