diff --git a/x-pack/plugins/alerting/README.md b/x-pack/plugins/alerting/README.md index fcb31977806c9d2..e01228d5cddc6bd 100644 --- a/x-pack/plugins/alerting/README.md +++ b/x-pack/plugins/alerting/README.md @@ -115,6 +115,9 @@ This is the primary function for a rule type. Whenever the rule needs to execute |services.savedObjectsClient|This is an instance of the saved objects client. This provides the ability to perform CRUD operations on any saved object that lives in the same space as the rule.

The scope of the saved objects client is tied to the user who created the rule (only when security is enabled).| |services.alertInstanceFactory(id)|This [alert factory](#alert-factory) creates alerts and must be used in order to execute actions. The id you give to the alert factory is a unique identifier for the alert.| |services.log(tags, [data], [timestamp])|Use this to create server logs. (This is the same function as server.log)| +|services.shouldWriteAlerts()|This returns a boolean indicating whether the executor should write out alerts as data. This is determined by whether rule execution has been cancelled due to timeout AND whether both the Kibana `cancelAlertsOnRuleTimeout` flag and the rule type `cancelAlertsOnRuleTimeout` flag are set to `true`.| +|services.shouldStopExecution()|This returns a boolean indicating whether rule execution has been cancelled due to timeout.| +|services.search|This provides an implementation of the Elasticsearch client `search` function (available as `services.search.asCurrentUser.search` and `services.search.asInternalUser.search`) that aborts searches if rule execution is cancelled mid-search.| |startedAt|The date and time the rule type started execution.| |previousStartedAt|The previous date and time the rule type started a successful execution.| |params|Parameters for the execution. This is where the parameters you require will be passed in. (e.g. threshold). Use rule type validation to ensure values are set before execution.| @@ -287,9 +290,20 @@ const myRuleType: RuleType< // Let's assume params is { server: 'server_1', threshold: 0.8 } const { server, threshold } = params; + // Query Elasticsearch using a cancellable search + // If rule execution is cancelled mid-search, the search request will be aborted + // and an error will be thrown. 
+ const esClient = services.search.asCurrentUser; + await esClient.search(esQuery); + // Call a function to get the server's current CPU usage const currentCpuUsage = await getCpuUsage(server); + // Periodically check that execution should continue + if (services.shouldStopExecution()) { + throw new Error('short circuiting rule execution!'); + } + // Only execute if CPU usage is greater than threshold if (currentCpuUsage > threshold) { // The first argument is a unique identifier for the alert. In this diff --git a/x-pack/plugins/alerting/server/index.ts b/x-pack/plugins/alerting/server/index.ts index 90bda8b1e09d478..5d4bd0b52eddb59 100644 --- a/x-pack/plugins/alerting/server/index.ts +++ b/x-pack/plugins/alerting/server/index.ts @@ -35,6 +35,10 @@ export type { FindResult } from './rules_client'; export type { PublicAlertInstance as AlertInstance } from './alert_instance'; export { parseDuration } from './lib'; export { getEsErrorMessage } from './lib/errors'; +export type { + IAbortableEsClient, + IAbortableClusterClient, +} from './lib/create_abortable_es_client_factory'; export { ReadOperations, AlertingAuthorizationFilterType, diff --git a/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.test.ts b/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.test.ts new file mode 100644 index 000000000000000..ed334e262a3a2b3 --- /dev/null +++ b/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.test.ts @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { elasticsearchServiceMock } from '../../../../../src/core/server/mocks'; +import { createAbortableEsClientFactory } from './create_abortable_es_client_factory'; + +const esQuery = { + body: { query: { bool: { filter: { range: { '@timestamp': { gte: 0 } } } } } }, +}; + +describe('createAbortableEsClientFactory', () => { + beforeAll(() => { + jest.useFakeTimers(); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + test('searches with asInternalUser when specified', async () => { + const abortController = new AbortController(); + const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + const abortableSearchClient = createAbortableEsClientFactory({ + scopedClusterClient, + abortController, + }); + + await abortableSearchClient.asInternalUser.search(esQuery); + expect(scopedClusterClient.asInternalUser.search).toHaveBeenCalledWith(esQuery, { + signal: abortController.signal, + }); + expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled(); + }); + + test('searches with asCurrentUser when specified', async () => { + const abortController = new AbortController(); + const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + const abortableSearchClient = createAbortableEsClientFactory({ + scopedClusterClient, + abortController, + }); + + await abortableSearchClient.asCurrentUser.search(esQuery); + expect(scopedClusterClient.asCurrentUser.search).toHaveBeenCalledWith(esQuery, { + signal: abortController.signal, + }); + expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled(); + }); + + test('uses search options when specified', async () => { + const abortController = new AbortController(); + const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + const abortableSearchClient = createAbortableEsClientFactory({ + scopedClusterClient, + abortController, + }); + + await abortableSearchClient.asInternalUser.search(esQuery, { ignore: [404] }); + 
expect(scopedClusterClient.asInternalUser.search).toHaveBeenCalledWith(esQuery, { + ignore: [404], + signal: abortController.signal, + }); + expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled(); + }); + + test('re-throws error when search throws error', async () => { + const abortController = new AbortController(); + const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + scopedClusterClient.asInternalUser.search.mockRejectedValueOnce( + new Error('something went wrong!') + ); + const abortableSearchClient = createAbortableEsClientFactory({ + scopedClusterClient, + abortController, + }); + + await expect( + abortableSearchClient.asInternalUser.search + ).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`); + }); + + test('throws error when search throws abort error', async () => { + const abortController = new AbortController(); + abortController.abort(); + const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient(); + scopedClusterClient.asInternalUser.search.mockRejectedValueOnce( + new Error('Request has been aborted by the user') + ); + const abortableSearchClient = createAbortableEsClientFactory({ + scopedClusterClient, + abortController, + }); + + await expect( + abortableSearchClient.asInternalUser.search + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"Search has been aborted due to cancelled execution"` + ); + }); +}); diff --git a/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.ts b/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.ts new file mode 100644 index 000000000000000..9f47440ec86ddd9 --- /dev/null +++ b/x-pack/plugins/alerting/server/lib/create_abortable_es_client_factory.ts @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TransportRequestOptions, TransportResult } from '@elastic/elasticsearch'; +import { SearchResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { IScopedClusterClient } from 'src/core/server'; +import type { ESSearchRequest } from 'src/core/types/elasticsearch'; + +export interface IAbortableEsClient { + search: ( + query: ESSearchRequest, + options?: TransportRequestOptions + ) => Promise<TransportResult<SearchResponse<unknown>, unknown>>; +} + +export interface IAbortableClusterClient { + readonly asInternalUser: IAbortableEsClient; + readonly asCurrentUser: IAbortableEsClient; +} +export interface CreateAbortableEsClientFactoryOpts { + scopedClusterClient: IScopedClusterClient; + abortController: AbortController; +} + +export function createAbortableEsClientFactory(opts: CreateAbortableEsClientFactoryOpts) { + const { scopedClusterClient, abortController } = opts; + return { + asInternalUser: { + search: async (query: ESSearchRequest, options?: TransportRequestOptions) => { + try { + const searchOptions = options ?? 
{}; + return await scopedClusterClient.asCurrentUser.search(query, { + ...searchOptions, + signal: abortController.signal, + }); + } catch (e) { + if (abortController.signal.aborted) { + throw new Error('Search has been aborted due to cancelled execution'); + } + throw e; + } + }, + }, + }; +} diff --git a/x-pack/plugins/alerting/server/mocks.ts b/x-pack/plugins/alerting/server/mocks.ts index f23dbf05449adb6..c4702f796ad8e1b 100644 --- a/x-pack/plugins/alerting/server/mocks.ts +++ b/x-pack/plugins/alerting/server/mocks.ts @@ -63,6 +63,21 @@ const createAlertInstanceFactoryMock = < return mock as unknown as AlertInstanceMock; }; +const createAbortableSearchClientMock = () => { + const mock = { + search: jest.fn(), + }; + + return mock; +}; + +const createAbortableSearchServiceMock = () => { + return { + asInternalUser: createAbortableSearchClientMock(), + asCurrentUser: createAbortableSearchClientMock(), + }; +}; + const createAlertServicesMock = < InstanceState extends AlertInstanceState = AlertInstanceState, InstanceContext extends AlertInstanceContext = AlertInstanceContext @@ -75,6 +90,8 @@ const createAlertServicesMock = < savedObjectsClient: savedObjectsClientMock.create(), scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(), shouldWriteAlerts: () => true, + shouldStopExecution: () => true, + search: createAbortableSearchServiceMock(), }; }; export type AlertServicesMock = ReturnType; diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 91c9683b948a0a8..9f7d71143152a47 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -55,6 +55,7 @@ import { createAlertEventLogRecordObject, Event, } from '../lib/create_alert_event_log_record_object'; +import { createAbortableEsClientFactory } from '../lib/create_abortable_es_client_factory'; const FALLBACK_RETRY_INTERVAL = '5m'; @@ 
-93,6 +94,7 @@ export class TaskRunner< RecoveryActionGroupId >; private readonly ruleTypeRegistry: RuleTypeRegistry; + private searchAbortController: AbortController; private cancelled: boolean; constructor( @@ -114,6 +116,7 @@ export class TaskRunner< this.ruleName = null; this.taskInstance = taskInstanceToAlertTaskInstance(taskInstance); this.ruleTypeRegistry = context.ruleTypeRegistry; + this.searchAbortController = new AbortController(); this.cancelled = false; } @@ -321,6 +324,11 @@ export class TaskRunner< WithoutReservedActionGroups >(alerts), shouldWriteAlerts: () => this.shouldLogAndScheduleActionsForAlerts(), + shouldStopExecution: () => this.cancelled, + search: createAbortableEsClientFactory({ + scopedClusterClient: services.scopedClusterClient, + abortController: this.searchAbortController, + }), }, params, state: alertTypeState as State, @@ -726,6 +734,11 @@ export class TaskRunner< `Cancelling rule type ${this.ruleType.id} with id ${ruleId} - execution exceeded rule type timeout of ${this.ruleType.ruleTaskTimeout}` ); + this.logger.debug( + `Aborting any in-progress ES searches for rule type ${this.ruleType.id} with id ${ruleId}` + ); + this.searchAbortController.abort(); + const eventLogger = this.context.eventLogger; const event: IEvent = { event: { diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts b/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts index 1f5730395e79d8f..3e3c3351a8e6755 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts @@ -191,6 +191,12 @@ describe('Task Runner Cancel', () => { await taskRunner.cancel(); await promise; + const logger = taskRunnerFactoryInitializerParams.logger; + expect(logger.debug).toHaveBeenNthCalledWith( + 3, + `Aborting any in-progress ES searches for rule type test with id 1` + ); + const eventLogger = 
taskRunnerFactoryInitializerParams.eventLogger; // execute-start event, timeout event and then an execute event because rule executors are not cancelling anything yet expect(eventLogger.logEvent).toHaveBeenCalledTimes(3); @@ -394,7 +400,7 @@ describe('Task Runner Cancel', () => { await promise; const logger = taskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(6); + expect(logger.debug).toHaveBeenCalledTimes(7); expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -402,18 +408,22 @@ describe('Task Runner Cancel', () => { ); expect(logger.debug).nthCalledWith( 3, - `Updating rule task for test rule with id 1 - execution error due to timeout` + 'Aborting any in-progress ES searches for rule type test with id 1' ); expect(logger.debug).nthCalledWith( 4, - `rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` + `Updating rule task for test rule with id 1 - execution error due to timeout` ); expect(logger.debug).nthCalledWith( 5, - `no scheduling of actions for rule test:1: 'rule-name': rule execution has been cancelled.` + `rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` ); expect(logger.debug).nthCalledWith( 6, + `no scheduling of actions for rule test:1: 'rule-name': rule execution has been cancelled.` + ); + expect(logger.debug).nthCalledWith( + 7, 'ruleExecutionStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}' ); @@ -511,7 +521,7 @@ describe('Task Runner Cancel', () => { function testActionsExecute() { const logger = taskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(5); + expect(logger.debug).toHaveBeenCalledTimes(6); expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -519,14 +529,18 @@ 
describe('Task Runner Cancel', () => { ); expect(logger.debug).nthCalledWith( 3, - `Updating rule task for test rule with id 1 - execution error due to timeout` + 'Aborting any in-progress ES searches for rule type test with id 1' ); expect(logger.debug).nthCalledWith( 4, - `rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` + `Updating rule task for test rule with id 1 - execution error due to timeout` ); expect(logger.debug).nthCalledWith( 5, + `rule test:1: 'rule-name' has 1 active alerts: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` + ); + expect(logger.debug).nthCalledWith( + 6, 'ruleExecutionStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}' ); diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index 6671810a0b73815..f87a2889fe6fe46 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -35,6 +35,7 @@ import { SanitizedRuleConfig, } from '../common'; import { LicenseType } from '../../licensing/server'; +import { IAbortableClusterClient } from './lib/create_abortable_es_client_factory'; export type WithoutQueryAndParams = Pick>; export type GetServicesFunction = (request: KibanaRequest) => Services; @@ -76,6 +77,8 @@ export interface AlertServices< id: string ) => PublicAlertInstance; shouldWriteAlerts: () => boolean; + shouldStopExecution: () => boolean; + search: IAbortableClusterClient; } export interface AlertExecutorOptions< diff --git a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts index 7aa7dcd9620fed6..f2fb2a803ec1494 100644 --- a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts +++ b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts @@ -111,6 +111,8 @@ function createRule(shouldWriteAlerts: boolean = true) 
{ savedObjectsClient: {} as any, scopedClusterClient: {} as any, shouldWriteAlerts: () => shouldWriteAlerts, + shouldStopExecution: () => false, + search: {} as any, }, spaceId: 'spaceId', state, diff --git a/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts b/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts index 288830f4b3804ea..10cce043fe3fdad 100644 --- a/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts +++ b/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts @@ -72,6 +72,8 @@ export const createDefaultAlertExecutorOptions = < savedObjectsClient: savedObjectsClientMock.create(), scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(), shouldWriteAlerts: () => shouldWriteAlerts, + shouldStopExecution: () => false, + search: alertsMock.createAlertServices().search, }, state, updatedBy: null, diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts index 00475655646f812..033a5ecd84b9bba 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts @@ -171,7 +171,10 @@ export const previewRulesRoute = async ( rule, services: { shouldWriteAlerts, + shouldStopExecution: () => false, alertInstanceFactory, + // Just use es client always for preview + search: context.core.elasticsearch.client, savedObjectsClient: context.core.savedObjects.client, scopedClusterClient: context.core.elasticsearch.client, }, diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts index b9e523efbaa29de..1a93ee375d9b60b 100644 --- 
a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts @@ -511,6 +511,64 @@ function getLongRunningPatternRuleType(cancelAlertsOnRuleTimeout: boolean = true return result; } +function getCancellableRuleType() { + const paramsSchema = schema.object({ + doLongSearch: schema.boolean(), + doLongPostProcessing: schema.boolean(), + }); + type ParamsType = TypeOf; + const result: RuleType = { + id: 'test.cancellableRule', + name: 'Test: Rule That Implements Cancellation', + actionGroups: [{ id: 'default', name: 'Default' }], + producer: 'alertsFixture', + defaultActionGroupId: 'default', + minimumLicenseRequired: 'basic', + isExportable: true, + ruleTaskTimeout: '3s', + async executor(ruleExecutorOptions) { + const { services, params } = ruleExecutorOptions; + const doLongSearch = params.doLongSearch; + const doLongPostProcessing = params.doLongPostProcessing; + + const aggs = doLongSearch + ? { + delay: { + shard_delay: { + value: '10s', + }, + }, + } + : {}; + + const query = { + index: ES_TEST_INDEX_NAME, + body: { + query: { + bool: { + filter: { + match_all: {}, + }, + }, + }, + ...(aggs ? 
{ aggs } : {}), + }, + }; + + await services.search.asCurrentUser.search(query as any); + + if (doLongPostProcessing) { + await new Promise((resolve) => setTimeout(resolve, 10000)); + } + + if (services.shouldStopExecution()) { + throw new Error('execution short circuited!'); + } + }, + }; + return result; +} + export function defineAlertTypes( core: CoreSetup, { alerting }: Pick @@ -626,4 +684,5 @@ export function defineAlertTypes( alerting.registerType(exampleAlwaysFiringAlertType); alerting.registerType(getLongRunningPatternRuleType()); alerting.registerType(getLongRunningPatternRuleType(false)); + alerting.registerType(getCancellableRuleType()); } diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/index.ts new file mode 100644 index 000000000000000..a0b500ef6902408 --- /dev/null +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/index.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { FtrProviderContext } from '../../../../../common/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function alertingTests({ loadTestFile }: FtrProviderContext) { + describe('cancellable_rule', () => { + loadTestFile(require.resolve('./rule')); + }); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/rule.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/rule.ts new file mode 100644 index 000000000000000..836e115352624e3 --- /dev/null +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/cancellable/rule.ts @@ -0,0 +1,229 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; + +import { Spaces } from '../../../../scenarios'; +import { FtrProviderContext } from '../../../../../common/ftr_provider_context'; +import { + ESTestIndexTool, + getUrlPrefix, + ObjectRemover, + getEventLog, +} from '../../../../../common/lib'; +import { createEsDocuments } from '../lib/create_test_data'; + +const RULE_INTERVAL_SECONDS = 6; +const RULE_INTERVALS_TO_WRITE = 5; +const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000; +const ES_GROUPS_TO_WRITE = 3; + +// eslint-disable-next-line import/no-default-export +export default function ruleTests({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const retry = getService('retry'); + const es = getService('es'); + const log = getService('log'); + const esTestIndexTool = new ESTestIndexTool(es, retry); + + describe('rule that implements cancellation services', () => { + let endDate: string; + const objectRemover = new ObjectRemover(supertest); + + 
beforeEach(async () => { + await esTestIndexTool.destroy(); + await esTestIndexTool.setup(); + + // write documents in the future, figure out the end date + const endDateMillis = Date.now() + (RULE_INTERVALS_TO_WRITE - 1) * RULE_INTERVAL_MILLIS; + endDate = new Date(endDateMillis).toISOString(); + }); + + afterEach(async () => { + await objectRemover.removeAll(); + }); + + before(async function () { + const body = await es.info(); + if (!body.version.number.includes('SNAPSHOT')) { + log.debug('Skipping because this build does not have the required shard_delay agg'); + this.skip(); + return; + } + }); + + it('runs successfully if it does not timeout', async () => { + await createEsDocuments( + es, + esTestIndexTool, + endDate, + RULE_INTERVALS_TO_WRITE, + RULE_INTERVAL_MILLIS, + ES_GROUPS_TO_WRITE + ); + const ruleId = await createRule({ + name: 'normal rule', + ruleTypeId: 'test.cancellableRule', + doLongSearch: false, + doLongPostProcessing: false, + }); + + // get the events we're expecting + const events = await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', + actions: new Map([['execute', { gte: 1 }]]), + }); + }); + + // execute event should have error status + events.filter((event) => event?.event?.action === 'execute'); + expect(events[0]?.event?.outcome).to.eql('success'); + expect(events[0]?.kibana?.alerting?.status).to.eql('ok'); + expect(events[0]?.error?.message).to.be(undefined); + + // rule execution status should be in error with reason timeout + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('ok'); + expect(rule.execution_status.error).to.be(undefined); + }); + + it('throws an error if search runs longer than rule timeout', async () => { + await createEsDocuments( + es, + esTestIndexTool, + 
endDate, + RULE_INTERVALS_TO_WRITE, + RULE_INTERVAL_MILLIS, + ES_GROUPS_TO_WRITE + ); + const ruleId = await createRule({ + name: 'rule that takes a long time to query ES', + ruleTypeId: 'test.cancellableRule', + doLongSearch: true, + doLongPostProcessing: false, + }); + + // get the events we're expecting + const events = await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', + actions: new Map([['execute', { gte: 1 }]]), + }); + }); + + // execute event should have error status + events.filter((event) => event?.event?.action === 'execute'); + expect(events[0]?.event?.outcome).to.eql('failure'); + expect(events[0]?.kibana?.alerting?.status).to.eql('error'); + expect(events[0]?.error?.message).to.eql( + 'Search has been aborted due to cancelled execution' + ); + + // rule execution status should be in error with reason timeout + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('error'); + expect(rule.execution_status.error.message).to.eql( + `test.cancellableRule:${ruleId}: execution cancelled due to timeout - exceeded rule type timeout of 3s` + ); + expect(rule.execution_status.error.reason).to.eql('timeout'); + }); + + it('throws an error if execution is short circuited', async () => { + await createEsDocuments( + es, + esTestIndexTool, + endDate, + RULE_INTERVALS_TO_WRITE, + RULE_INTERVAL_MILLIS, + ES_GROUPS_TO_WRITE + ); + const ruleId = await createRule({ + name: 'rule that takes a long time to post process', + ruleTypeId: 'test.cancellableRule', + doLongSearch: false, + doLongPostProcessing: true, + }); + + // get the events we're expecting + const events = await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', 
+ actions: new Map([['execute', { gte: 1 }]]), + }); + }); + + // execute event should have error status + events.filter((event) => event?.event?.action === 'execute'); + expect(events[0]?.event?.outcome).to.eql('failure'); + expect(events[0]?.kibana?.alerting?.status).to.eql('error'); + expect(events[0]?.error?.message).to.eql('execution short circuited!'); + + // rule execution status should be in error with reason timeout + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('error'); + expect(rule.execution_status.error.message).to.eql( + `test.cancellableRule:${ruleId}: execution cancelled due to timeout - exceeded rule type timeout of 3s` + ); + expect(rule.execution_status.error.reason).to.eql('timeout'); + }); + + interface CreateRuleParams { + name: string; + ruleTypeId: string; + doLongSearch: boolean; + doLongPostProcessing: boolean; + } + + async function createRule(params: CreateRuleParams): Promise { + const { status, body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send({ + name: params.name, + consumer: 'alerts', + enabled: true, + rule_type_id: params.ruleTypeId, + schedule: { interval: `${RULE_INTERVAL_SECONDS}s` }, + actions: [], + notify_when: 'onActiveAlert', + params: { + doLongSearch: params.doLongSearch, + doLongPostProcessing: params.doLongPostProcessing, + }, + }); + + expect(status).to.be(200); + + const ruleId = createdRule.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + return ruleId; + } + }); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/alert.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/alert.ts index bd2742ca137be64..2a2fdbcf9e4d23a 100644 --- 
a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/alert.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/alert.ts @@ -15,7 +15,7 @@ import { getUrlPrefix, ObjectRemover, } from '../../../../../common/lib'; -import { createEsDocuments } from './create_test_data'; +import { createEsDocuments } from '../lib/create_test_data'; const ALERT_TYPE_ID = '.es-query'; const ACTION_TYPE_ID = '.index'; diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts index 14748737f0d53ba..90d8f2a9957981c 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts @@ -13,5 +13,6 @@ export default function alertingTests({ loadTestFile }: FtrProviderContext) { loadTestFile(require.resolve('./index_threshold')); loadTestFile(require.resolve('./es_query')); loadTestFile(require.resolve('./long_running')); + loadTestFile(require.resolve('./cancellable')); }); } diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/create_test_data.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/lib/create_test_data.ts similarity index 100% rename from x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/es_query/create_test_data.ts rename to x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/lib/create_test_data.ts