Skip to content

Commit

Permalink
Small cleanup in code and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasnetau committed Dec 7, 2023
1 parent cd31bfe commit f07ef7c
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions src/CorrelationEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,13 @@ public function handle(Event $event) : void
/** Record that we have seen an event of this type */
$this->incrStat('seen', (string)$event->event);

/** If the event stream is live we want to make sure the event timestamp is within
* 10 minutes of the current time, otherwise we will set it to the server time.
/**
* If the event stream is live we want to make sure the event timestamp is within
* MAX_TIME_VARIANCE seconds of the current time, otherwise we will set it to the server time.
*/
if (true === $this->eventstream_live)
{
if (true === $this->eventstream_live) {
$now = new DateTimeImmutable();
if (abs($now->getTimestamp() - $event->datetime->getTimestamp()) > (self::MAX_TIME_VARIANCE))
{
if (abs($now->getTimestamp() - $event->datetime->getTimestamp()) > (self::MAX_TIME_VARIANCE)) {
echo "Correcting received time to {$now->format('c')}\n";
$event->setReceivedTime($now);
}
Expand All @@ -158,15 +157,15 @@ public function handle(Event $event) : void
* @TODO This might not be the best place to call this where multiple stream interlace with different timestamps.
* @TODO This should be moved to checking if a matcher would handle the event then to call it's timeout check first and other items.
*/
if (false === $this->eventstream_live)
{
if (false === $this->eventstream_live) {
$this->checkTimeouts($event->datetime->modify('-1 second'));
}

/**
* Check existing state machines first to see if the event can be handled
*/
$awaitingMatchers = array_merge(($this->waitingForNextEvent[$event->event] ?? []), ($this->waitingForNextEvent[IEventMatcher::EVENT_MATCH_ANY] ?? []));
$awaitingMatchers = array_merge(($this->waitingForNextEvent[$event->event] ?? []),
($this->waitingForNextEvent[IEventMatcher::EVENT_MATCH_ANY] ?? []));

foreach ($awaitingMatchers as $matcher) {
/* @var $matcher IEventMatcher */
Expand Down Expand Up @@ -204,15 +203,14 @@ public function handle(Event $event) : void
* A new state machine will not be created if an existing state machine suppressed the event
* or if a state machine of the same class handled the event
*/
if (false === $suppress)
{
$awaitingMatchers = array_merge(($this->initialEventLookup[$event->event] ?? []), ($this->initialEventLookup[IEventMatcher::EVENT_MATCH_ANY] ?? []));
if (false === $suppress) {
$awaitingMatchers = array_diff(
array_merge(($this->initialEventLookup[$event->event] ?? []),
($this->initialEventLookup[IEventMatcher::EVENT_MATCH_ANY] ?? [])),
$skipMatchers /** If this className has already handled this event, don't create another **/
);

foreach ($awaitingMatchers as $class) {
/** If this className has already handled this event, don't create another **/
if (in_array($class, $skipMatchers, true)) {
continue;
}
$matcher = $this->constructMatcher($class);
$result = $matcher->handle($event);
if ($this->isFlagSet($result, $matcher::EVENT_HANDLED)) {
Expand All @@ -239,20 +237,17 @@ public function handle(Event $event) : void
}
}

if (count($handledMatchers) == 0)
{
if (count($handledMatchers) == 0) {
$this->incrStat('unhandled', (string)$event->event);
}

/** For any matchers that processed this event fire any actions, then update timeout or destroy if complete **/
$stateChanged = (count($handledMatchers) + count($timedOutMatchers)) > 0;
foreach($handledMatchers as $matcher)
{
foreach ($handledMatchers as $matcher) {
$matcher->fire();
$this->addTimeout($matcher);

if ($matcher->complete())
{
if ($matcher->complete()) {
/** Record stat of matcher completing */
$this->incrStat('completed_matcher', get_class($matcher));
$this->removeMatcher($matcher);
Expand All @@ -261,8 +256,7 @@ public function handle(Event $event) : void
}

/** Fire any action and destroy any timed out matchers **/
foreach($timedOutMatchers as $matcher)
{
foreach ($timedOutMatchers as $matcher) {
$this->removeTimeout($matcher);
$matcher->fire();
/** Record stat of matcher timeout */
Expand Down

0 comments on commit f07ef7c

Please sign in to comment.