Skip to content

Commit

Permalink
[Alerting] Adding rule executor service functions to support executio…
Browse files Browse the repository at this point in the history
…n cancellation (elastic#120506)

* Adding abortable es client factory

* Starting to add tests

* Tests

* Fixing types

* Fixing types

* Fixing types

* Adding functional tests

* Reverting changes to rule type

* Handling the correct error message

* Cleanup

* Fixing preview rules route

* Updating README

* Removing unneeded observable

* Cleanup

* Fixing types

* Using abort signal

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
2 people authored and gbamparop committed Jan 12, 2022
1 parent d103f78 commit cb6955a
Show file tree
Hide file tree
Showing 17 changed files with 553 additions and 8 deletions.
14 changes: 14 additions & 0 deletions x-pack/plugins/alerting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ This is the primary function for a rule type. Whenever the rule needs to execute
|services.savedObjectsClient|This is an instance of the saved objects client. This provides the ability to perform CRUD operations on any saved object that lives in the same space as the rule.<br><br>The scope of the saved objects client is tied to the user who created the rule (only when security is enabled).|
|services.alertInstanceFactory(id)|This [alert factory](#alert-factory) creates alerts and must be used in order to execute actions. The id you give to the alert factory is a unique identifier for the alert.|
|services.log(tags, [data], [timestamp])|Use this to create server logs. (This is the same function as server.log)|
|services.shouldWriteAlerts()|This returns a boolean indicating whether the executor should write out alerts as data. This is determined by whether rule execution has been cancelled due to timeout AND whether both the Kibana `cancelAlertsOnRuleTimeout` flag and the rule type `cancelAlertsOnRuleTimeout` are set to `true`.|
|services.shouldStopExecution()|This returns a boolean indicating whether rule execution has been cancelled due to timeout.|
|services.search|This provides an implementation of Elasticsearch client `search` function that aborts searches if rule execution is cancelled mid-search.|
|startedAt|The date and time the rule type started execution.|
|previousStartedAt|The previous date and time the rule type started a successful execution.|
|params|Parameters for the execution. This is where the parameters you require will be passed in. (e.g. threshold). Use rule type validation to ensure values are set before execution.|
Expand Down Expand Up @@ -287,9 +290,20 @@ const myRuleType: RuleType<
// Let's assume params is { server: 'server_1', threshold: 0.8 }
const { server, threshold } = params;

// Query Elasticsearch using a cancellable search
// If rule execution is cancelled mid-search, the search request will be aborted
// and an error will be thrown.
const esClient = services.search.asCurrentUser;
await esClient.search(esQuery);

// Call a function to get the server's current CPU usage
const currentCpuUsage = await getCpuUsage(server);

// Periodically check that execution should continue
if (services.shouldStopExecution()) {
throw new Error('short circuiting rule execution!');
}

// Only execute if CPU usage is greater than threshold
if (currentCpuUsage > threshold) {
// The first argument is a unique identifier for the alert. In this
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/alerting/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ export type { FindResult } from './rules_client';
export type { PublicAlertInstance as AlertInstance } from './alert_instance';
export { parseDuration } from './lib';
export { getEsErrorMessage } from './lib/errors';
export type {
IAbortableEsClient,
IAbortableClusterClient,
} from './lib/create_abortable_es_client_factory';
export {
ReadOperations,
AlertingAuthorizationFilterType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { elasticsearchServiceMock } from '../../../../../src/core/server/mocks';
import { createAbortableEsClientFactory } from './create_abortable_es_client_factory';

const esQuery = {
body: { query: { bool: { filter: { range: { '@timestamp': { gte: 0 } } } } } },
};

describe('createAbortableEsClientFactory', () => {
beforeAll(() => {
jest.useFakeTimers();
});

afterAll(() => {
jest.useRealTimers();
});

test('searches with asInternalUser when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const abortableSearchClient = createAbortableEsClientFactory({
scopedClusterClient,
abortController,
});

await abortableSearchClient.asInternalUser.search(esQuery);
expect(scopedClusterClient.asInternalUser.search).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});

test('searches with asCurrentUser when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const abortableSearchClient = createAbortableEsClientFactory({
scopedClusterClient,
abortController,
});

await abortableSearchClient.asCurrentUser.search(esQuery);
expect(scopedClusterClient.asCurrentUser.search).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
});

test('uses search options when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const abortableSearchClient = createAbortableEsClientFactory({
scopedClusterClient,
abortController,
});

await abortableSearchClient.asInternalUser.search(esQuery, { ignore: [404] });
expect(scopedClusterClient.asInternalUser.search).toHaveBeenCalledWith(esQuery, {
ignore: [404],
signal: abortController.signal,
});
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});

test('re-throws error when search throws error', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
scopedClusterClient.asInternalUser.search.mockRejectedValueOnce(
new Error('something went wrong!')
);
const abortableSearchClient = createAbortableEsClientFactory({
scopedClusterClient,
abortController,
});

await expect(
abortableSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});

test('throws error when search throws abort error', async () => {
const abortController = new AbortController();
abortController.abort();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
scopedClusterClient.asInternalUser.search.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);
const abortableSearchClient = createAbortableEsClientFactory({
scopedClusterClient,
abortController,
});

await expect(
abortableSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Search has been aborted due to cancelled execution"`
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { TransportRequestOptions, TransportResult } from '@elastic/elasticsearch';
import { SearchResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { IScopedClusterClient } from 'src/core/server';
import type { ESSearchRequest } from 'src/core/types/elasticsearch';

export interface IAbortableEsClient {
search: (
query: ESSearchRequest,
options?: TransportRequestOptions
) => Promise<TransportResult<SearchResponse<unknown>, unknown>>;
}

export interface IAbortableClusterClient {
readonly asInternalUser: IAbortableEsClient;
readonly asCurrentUser: IAbortableEsClient;
}
export interface CreateAbortableEsClientFactoryOpts {
scopedClusterClient: IScopedClusterClient;
abortController: AbortController;
}

export function createAbortableEsClientFactory(opts: CreateAbortableEsClientFactoryOpts) {
const { scopedClusterClient, abortController } = opts;
return {
asInternalUser: {
search: async (query: ESSearchRequest, options?: TransportRequestOptions) => {
try {
const searchOptions = options ?? {};
return await scopedClusterClient.asInternalUser.search(query, {
...searchOptions,
signal: abortController.signal,
});
} catch (e) {
if (abortController.signal.aborted) {
throw new Error('Search has been aborted due to cancelled execution');
}
throw e;
}
},
},
asCurrentUser: {
search: async (query: ESSearchRequest, options?: TransportRequestOptions) => {
try {
const searchOptions = options ?? {};
return await scopedClusterClient.asCurrentUser.search(query, {
...searchOptions,
signal: abortController.signal,
});
} catch (e) {
if (abortController.signal.aborted) {
throw new Error('Search has been aborted due to cancelled execution');
}
throw e;
}
},
},
};
}
17 changes: 17 additions & 0 deletions x-pack/plugins/alerting/server/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ const createAlertInstanceFactoryMock = <
return mock as unknown as AlertInstanceMock<InstanceState, InstanceContext>;
};

const createAbortableSearchClientMock = () => {
const mock = {
search: jest.fn(),
};

return mock;
};

const createAbortableSearchServiceMock = () => {
return {
asInternalUser: createAbortableSearchClientMock(),
asCurrentUser: createAbortableSearchClientMock(),
};
};

const createAlertServicesMock = <
InstanceState extends AlertInstanceState = AlertInstanceState,
InstanceContext extends AlertInstanceContext = AlertInstanceContext
Expand All @@ -75,6 +90,8 @@ const createAlertServicesMock = <
savedObjectsClient: savedObjectsClientMock.create(),
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
shouldWriteAlerts: () => true,
shouldStopExecution: () => true,
search: createAbortableSearchServiceMock(),
};
};
export type AlertServicesMock = ReturnType<typeof createAlertServicesMock>;
Expand Down
13 changes: 13 additions & 0 deletions x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
createAlertEventLogRecordObject,
Event,
} from '../lib/create_alert_event_log_record_object';
import { createAbortableEsClientFactory } from '../lib/create_abortable_es_client_factory';

const FALLBACK_RETRY_INTERVAL = '5m';

Expand Down Expand Up @@ -93,6 +94,7 @@ export class TaskRunner<
RecoveryActionGroupId
>;
private readonly ruleTypeRegistry: RuleTypeRegistry;
private searchAbortController: AbortController;
private cancelled: boolean;

constructor(
Expand All @@ -114,6 +116,7 @@ export class TaskRunner<
this.ruleName = null;
this.taskInstance = taskInstanceToAlertTaskInstance(taskInstance);
this.ruleTypeRegistry = context.ruleTypeRegistry;
this.searchAbortController = new AbortController();
this.cancelled = false;
}

Expand Down Expand Up @@ -321,6 +324,11 @@ export class TaskRunner<
WithoutReservedActionGroups<ActionGroupIds, RecoveryActionGroupId>
>(alerts),
shouldWriteAlerts: () => this.shouldLogAndScheduleActionsForAlerts(),
shouldStopExecution: () => this.cancelled,
search: createAbortableEsClientFactory({
scopedClusterClient: services.scopedClusterClient,
abortController: this.searchAbortController,
}),
},
params,
state: alertTypeState as State,
Expand Down Expand Up @@ -726,6 +734,11 @@ export class TaskRunner<
`Cancelling rule type ${this.ruleType.id} with id ${ruleId} - execution exceeded rule type timeout of ${this.ruleType.ruleTaskTimeout}`
);

this.logger.debug(
`Aborting any in-progress ES searches for rule type ${this.ruleType.id} with id ${ruleId}`
);
this.searchAbortController.abort();

const eventLogger = this.context.eventLogger;
const event: IEvent = {
event: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ describe('Task Runner Cancel', () => {
await taskRunner.cancel();
await promise;

const logger = taskRunnerFactoryInitializerParams.logger;
expect(logger.debug).toHaveBeenNthCalledWith(
3,
`Aborting any in-progress ES searches for rule type test with id 1`
);

const eventLogger = taskRunnerFactoryInitializerParams.eventLogger;
// execute-start event, timeout event and then an execute event because rule executors are not cancelling anything yet
expect(eventLogger.logEvent).toHaveBeenCalledTimes(3);
Expand Down Expand Up @@ -394,26 +400,30 @@ describe('Task Runner Cancel', () => {
await promise;

const logger = taskRunnerFactoryInitializerParams.logger;
expect(logger.debug).toHaveBeenCalledTimes(6);
expect(logger.debug).toHaveBeenCalledTimes(7);
expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z');
expect(logger.debug).nthCalledWith(
2,
`Cancelling rule type test with id 1 - execution exceeded rule type timeout of 5m`
);
expect(logger.debug).nthCalledWith(
3,
`Updating rule task for test rule with id 1 - execution error due to timeout`
'Aborting any in-progress ES searches for rule type test with id 1'
);
expect(logger.debug).nthCalledWith(
4,
`rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]`
`Updating rule task for test rule with id 1 - execution error due to timeout`
);
expect(logger.debug).nthCalledWith(
5,
`no scheduling of actions for rule test:1: 'rule-name': rule execution has been cancelled.`
`rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]`
);
expect(logger.debug).nthCalledWith(
6,
`no scheduling of actions for rule test:1: 'rule-name': rule execution has been cancelled.`
);
expect(logger.debug).nthCalledWith(
7,
'ruleExecutionStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}'
);

Expand Down Expand Up @@ -511,22 +521,26 @@ describe('Task Runner Cancel', () => {

function testActionsExecute() {
const logger = taskRunnerFactoryInitializerParams.logger;
expect(logger.debug).toHaveBeenCalledTimes(5);
expect(logger.debug).toHaveBeenCalledTimes(6);
expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z');
expect(logger.debug).nthCalledWith(
2,
`Cancelling rule type test with id 1 - execution exceeded rule type timeout of 5m`
);
expect(logger.debug).nthCalledWith(
3,
`Updating rule task for test rule with id 1 - execution error due to timeout`
'Aborting any in-progress ES searches for rule type test with id 1'
);
expect(logger.debug).nthCalledWith(
4,
`rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]`
`Updating rule task for test rule with id 1 - execution error due to timeout`
);
expect(logger.debug).nthCalledWith(
5,
`rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]`
);
expect(logger.debug).nthCalledWith(
6,
'ruleExecutionStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}'
);

Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/alerting/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
SanitizedRuleConfig,
} from '../common';
import { LicenseType } from '../../licensing/server';
import { IAbortableClusterClient } from './lib/create_abortable_es_client_factory';

export type WithoutQueryAndParams<T> = Pick<T, Exclude<keyof T, 'query' | 'params'>>;
export type GetServicesFunction = (request: KibanaRequest) => Services;
Expand Down Expand Up @@ -76,6 +77,8 @@ export interface AlertServices<
id: string
) => PublicAlertInstance<InstanceState, InstanceContext, ActionGroupIds>;
shouldWriteAlerts: () => boolean;
shouldStopExecution: () => boolean;
search: IAbortableClusterClient;
}

export interface AlertExecutorOptions<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ function createRule(shouldWriteAlerts: boolean = true) {
savedObjectsClient: {} as any,
scopedClusterClient: {} as any,
shouldWriteAlerts: () => shouldWriteAlerts,
shouldStopExecution: () => false,
search: {} as any,
},
spaceId: 'spaceId',
state,
Expand Down
Loading

0 comments on commit cb6955a

Please sign in to comment.