diff --git a/package.json b/package.json index 560afc70188402..c7fe81284ea19e 100644 --- a/package.json +++ b/package.json @@ -903,6 +903,7 @@ "getopts": "^2.2.5", "getos": "^3.1.0", "globby": "^11.1.0", + "gpt-tokenizer": "^2.1.2", "handlebars": "4.7.7", "he": "^1.2.0", "history": "^4.9.0", diff --git a/packages/kbn-search-connectors/types/connectors.ts b/packages/kbn-search-connectors/types/connectors.ts index 8f649c15f348f6..4df4a0394546e3 100644 --- a/packages/kbn-search-connectors/types/connectors.ts +++ b/packages/kbn-search-connectors/types/connectors.ts @@ -68,7 +68,7 @@ export type ConnectorConfiguration = Record< > & { extract_full_html?: { label: string; value: boolean }; // This only exists for Crawler use_document_level_security?: ConnectorConfigProperties; - use_text_extraction_service?: ConnectorConfigProperties; // This only exists for SharePoint Online + use_text_extraction_service?: ConnectorConfigProperties; }; export interface ConnectorScheduling { diff --git a/packages/kbn-search-connectors/types/native_connectors.ts b/packages/kbn-search-connectors/types/native_connectors.ts index 399ce74892de53..c618aa8aa70d42 100644 --- a/packages/kbn-search-connectors/types/native_connectors.ts +++ b/packages/kbn-search-connectors/types/native_connectors.ts @@ -444,6 +444,21 @@ export const NATIVE_CONNECTOR_DEFINITIONS: Record void; setIsPopoverOpen: (flag: boolean) => void; onErrorClick: (error: MonacoError) => void; }) { @@ -89,7 +87,6 @@ export function ErrorsWarningsPopover({ } `} onClick={() => { - refreshErrors(); setIsPopoverOpen(!isPopoverOpen); }} > @@ -184,7 +181,6 @@ export const EditorFooter = memo(function EditorFooter({ isPopoverOpen={isPopoverOpen} items={errors} type="error" - refreshErrors={refreshErrors} setIsPopoverOpen={setIsPopoverOpen} onErrorClick={onErrorClick} /> @@ -194,7 +190,6 @@ export const EditorFooter = memo(function EditorFooter({ isPopoverOpen={isPopoverOpen} items={warning} type="warning" - refreshErrors={refreshErrors} setIsPopoverOpen={setIsPopoverOpen} onErrorClick={onErrorClick} /> diff --git a/src/plugins/data/server/search/strategies/esql_search/esql_search_strategy.ts b/src/plugins/data/server/search/strategies/esql_search/esql_search_strategy.ts index 7f3f6f521853d4..2af032826189fb 100644 --- a/src/plugins/data/server/search/strategies/esql_search/esql_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/esql_search/esql_search_strategy.ts @@ -11,6 +11,8 @@ import type { Logger } from '@kbn/core/server'; import { getKbnServerError, KbnServerError } from '@kbn/kibana-utils-plugin/server'; import type { ISearchStrategy } from '../../types'; +const ES_TIMEOUT_IN_MS = 120000; + export const esqlSearchStrategyProvider = ( logger: Logger, useInternalUser: boolean = false @@ -23,6 +25,17 @@ export const esqlSearchStrategyProvider = ( * @returns `Observable>` */ search: (request, { abortSignal, ...options }, { esClient, uiSettingsClient }) => { + const abortController = new AbortController(); + // We found out that there are cases where we are not aborting correctly + // For this reasons we want to manually cancel he abort signal after 2 mins + + abortSignal?.addEventListener('abort', () => { + abortController.abort(); + }); + + // Also abort after two mins + setTimeout(() => abortController.abort(), ES_TIMEOUT_IN_MS); + // Only default index pattern type is supported here. // See ese for other type support. if (request.indexType) { @@ -41,8 +54,10 @@ export const esqlSearchStrategyProvider = ( }, }, { - signal: abortSignal, + signal: abortController.signal, meta: true, + // we don't want the ES client to retry (default value is 3) + maxRetries: 0, } ); return { diff --git a/src/plugins/unified_histogram/public/chart/histogram.tsx b/src/plugins/unified_histogram/public/chart/histogram.tsx index 1d91b2a505174c..956f4ef86f2a5f 100644 --- a/src/plugins/unified_histogram/public/chart/histogram.tsx +++ b/src/plugins/unified_histogram/public/chart/histogram.tsx @@ -107,6 +107,7 @@ export function Histogram({ timeRange: getTimeRange(), timeInterval, isPlainRecord, + timeField: dataView.timeFieldName, }); const chartRef = useRef(null); const { height: containerHeight, width: containerWidth } = useResizeObserver(chartRef.current); diff --git a/src/plugins/unified_histogram/public/chart/hooks/use_time_range.test.tsx b/src/plugins/unified_histogram/public/chart/hooks/use_time_range.test.tsx index 176d69d984290e..1e86bf5d9614e7 100644 --- a/src/plugins/unified_histogram/public/chart/hooks/use_time_range.test.tsx +++ b/src/plugins/unified_histogram/public/chart/hooks/use_time_range.test.tsx @@ -245,6 +245,7 @@ describe('useTimeRange', () => { timeRange, timeInterval, isPlainRecord: true, + timeField: '@timestamp', }) ); expect(result.current.timeRangeDisplay).toMatchInlineSnapshot(` @@ -267,4 +268,17 @@ describe('useTimeRange', () => { `); }); + + it('should not render a text for text based languages when not timeField is provided', () => { + const { result } = renderHook(() => + useTimeRange({ + uiSettings, + bucketInterval, + timeRange, + timeInterval, + isPlainRecord: true, + }) + ); + expect(result.current.timeRangeDisplay).toBeNull(); + }); }); diff --git a/src/plugins/unified_histogram/public/chart/hooks/use_time_range.tsx b/src/plugins/unified_histogram/public/chart/hooks/use_time_range.tsx index 089df5124b60a7..791d332a3a89fb 100644 --- a/src/plugins/unified_histogram/public/chart/hooks/use_time_range.tsx +++ b/src/plugins/unified_histogram/public/chart/hooks/use_time_range.tsx @@ -21,12 +21,14 @@ export const useTimeRange = ({ timeRange: { from, to }, timeInterval, isPlainRecord, + timeField, }: { uiSettings: IUiSettingsClient; bucketInterval?: UnifiedHistogramBucketInterval; timeRange: TimeRange; timeInterval?: string; isPlainRecord?: boolean; + timeField?: string; }) => { const dateFormat = useMemo(() => uiSettings.get('dateFormat'), [uiSettings]); @@ -44,6 +46,10 @@ export const useTimeRange = ({ ); const timeRangeText = useMemo(() => { + if (!timeField && isPlainRecord) { + return ''; + } + const timeRange = { from: dateMath.parse(from), to: dateMath.parse(to, { roundUp: true }), @@ -70,18 +76,18 @@ export const useTimeRange = ({ }); return `${toMoment(timeRange.from)} - ${toMoment(timeRange.to)} ${intervalText}`; - }, [bucketInterval?.description, from, isPlainRecord, timeInterval, to, toMoment]); + }, [bucketInterval?.description, from, isPlainRecord, timeField, timeInterval, to, toMoment]); const { euiTheme } = useEuiTheme(); const timeRangeCss = css` padding: 0 ${euiTheme.size.s} 0 ${euiTheme.size.s}; `; - let timeRangeDisplay = ( + let timeRangeDisplay = timeRangeText ? ( {timeRangeText} - ); + ) : null; if (bucketInterval?.scaled) { const toolTipTitle = i18n.translate('unifiedHistogram.timeIntervalWithValueWarning', { diff --git a/x-pack/plugins/actions/server/lib/action_executor.test.ts b/x-pack/plugins/actions/server/lib/action_executor.test.ts index a966605bd3a4f9..9b2fd5e987f6ca 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.test.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.test.ts @@ -20,6 +20,8 @@ import { asSavedObjectExecutionSource, } from './action_execution_source'; import { securityMock } from '@kbn/security-plugin/server/mocks'; +import { finished } from 'stream/promises'; +import { PassThrough } from 'stream'; const actionExecutor = new ActionExecutor({ isESOCanEncrypt: true }); const services = actionsMock.createServices(); @@ -1837,6 +1839,102 @@ test('writes usage data to event log for OpenAI events', async () => { }); }); +test('writes usage data to event log for streaming OpenAI events', async () => { + const executorMock = setupActionExecutorMock('.gen-ai', { + params: { schema: schema.any() }, + config: { schema: schema.any() }, + secrets: { schema: schema.any() }, + }); + + const stream = new PassThrough(); + + executorMock.mockResolvedValue({ + actionId: '1', + status: 'ok', + // @ts-ignore + data: stream, + }); + + await actionExecutor.execute({ + ...executeParams, + params: { + subActionParams: { + body: JSON.stringify({ + messages: [ + { + role: 'system', + content: 'System message', + }, + { + role: 'user', + content: 'User message', + }, + ], + }), + }, + }, + }); + + expect(eventLogger.logEvent).toHaveBeenCalledTimes(1); + stream.write( + `data: ${JSON.stringify({ + object: 'chat.completion.chunk', + choices: [{ delta: { content: 'Single' } }], + })}\n` + ); + stream.write(`data: [DONE]`); + + stream.end(); + + await finished(stream); + + await new Promise(process.nextTick); + + expect(eventLogger.logEvent).toHaveBeenCalledTimes(2); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(2, { + event: { + action: 'execute', + kind: 'action', + outcome: 'success', + }, + kibana: { + action: { + execution: { + uuid: '2', + gen_ai: { + usage: { + completion_tokens: 5, + prompt_tokens: 30, + total_tokens: 35, + }, + }, + }, + name: 'action-1', + id: '1', + }, + alert: { + rule: { + execution: { + uuid: '123abc', + }, + }, + }, + saved_objects: [ + { + id: '1', + namespace: 'some-namespace', + rel: 'primary', + type: 'action', + type_id: '.gen-ai', + }, + ], + space_ids: ['some-namespace'], + }, + message: 'action executed: .gen-ai:1: action-1', + user: { name: 'coolguy', id: '123' }, + }); +}); + test('does not fetches actionInfo if passed as param', async () => { const actionType: jest.Mocked = { id: 'test', @@ -1898,13 +1996,16 @@ test('does not fetches actionInfo if passed as param', async () => { ); }); -function setupActionExecutorMock(actionTypeId = 'test') { +function setupActionExecutorMock( + actionTypeId = 'test', + validationOverride?: ActionType['validate'] +) { const actionType: jest.Mocked = { id: 'test', name: 'Test', minimumLicenseRequired: 'basic', supportedFeatureIds: ['alerting'], - validate: { + validate: validationOverride || { config: { schema: schema.object({ bar: schema.boolean() }) }, secrets: { schema: schema.object({ baz: schema.boolean() }) }, params: { schema: schema.object({ foo: schema.boolean() }) }, diff --git a/x-pack/plugins/actions/server/lib/action_executor.ts b/x-pack/plugins/actions/server/lib/action_executor.ts index d3d57de4cda4ec..9cd70d4c7bf910 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.ts @@ -13,6 +13,7 @@ import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin import { SpacesServiceStart } from '@kbn/spaces-plugin/server'; import { IEventLogger, SAVED_OBJECT_REL_PRIMARY } from '@kbn/event-log-plugin/server'; import { SecurityPluginStart } from '@kbn/security-plugin/server'; +import { PassThrough, Readable } from 'stream'; import { validateParams, validateConfig, @@ -37,6 +38,7 @@ import { RelatedSavedObjects } from './related_saved_objects'; import { createActionEventLogRecordObject } from './create_action_event_log_record_object'; import { ActionExecutionError, ActionExecutionErrorReason } from './errors/action_execution_error'; import type { ActionsAuthorization } from '../authorization/actions_authorization'; +import { getTokenCountFromOpenAIStream } from './get_token_count_from_openai_stream'; // 1,000,000 nanoseconds in 1 millisecond const Millis2Nanos = 1000 * 1000; @@ -276,8 +278,6 @@ export class ActionExecutor { } } - eventLogger.stopTiming(event); - // allow null-ish return to indicate success const result = rawResult || { actionId, @@ -286,6 +286,48 @@ export class ActionExecutor { event.event = event.event || {}; + const { error, ...resultWithoutError } = result; + + function completeEventLogging() { + eventLogger.stopTiming(event); + + const currentUser = security?.authc.getCurrentUser(request); + + event.user = event.user || {}; + event.user.name = currentUser?.username; + event.user.id = currentUser?.profile_uid; + + if (result.status === 'ok') { + span?.setOutcome('success'); + event.event!.outcome = 'success'; + event.message = `action executed: ${actionLabel}`; + } else if (result.status === 'error') { + span?.setOutcome('failure'); + event.event!.outcome = 'failure'; + event.message = `action execution failure: ${actionLabel}`; + event.error = event.error || {}; + event.error.message = actionErrorToMessage(result); + if (result.error) { + logger.error(result.error, { + tags: [actionTypeId, actionId, 'action-run-failed'], + error: { stack_trace: result.error.stack }, + }); + } + logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`); + } else { + span?.setOutcome('failure'); + event.event!.outcome = 'failure'; + event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`; + event.error = event.error || {}; + event.error.message = 'action execution returned unexpected result'; + logger.warn( + `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"` + ); + } + + eventLogger.logEvent(event); + } + // start openai extension // add event.kibana.action.execution.openai to event log when OpenAI Connector is executed if (result.status === 'ok' && actionTypeId === '.gen-ai') { @@ -310,45 +352,34 @@ export class ActionExecutor { }, }, }; - } - // end openai extension - const currentUser = security?.authc.getCurrentUser(request); - - event.user = event.user || {}; - event.user.name = currentUser?.username; - event.user.id = currentUser?.profile_uid; - - if (result.status === 'ok') { - span?.setOutcome('success'); - event.event.outcome = 'success'; - event.message = `action executed: ${actionLabel}`; - } else if (result.status === 'error') { - span?.setOutcome('failure'); - event.event.outcome = 'failure'; - event.message = `action execution failure: ${actionLabel}`; - event.error = event.error || {}; - event.error.message = actionErrorToMessage(result); - if (result.error) { - logger.error(result.error, { - tags: [actionTypeId, actionId, 'action-run-failed'], - error: { stack_trace: result.error.stack }, - }); + if (result.data instanceof Readable) { + getTokenCountFromOpenAIStream({ + responseStream: result.data.pipe(new PassThrough()), + body: (validatedParams as { subActionParams: { body: string } }).subActionParams.body, + }) + .then(({ total, prompt, completion }) => { + event.kibana!.action!.execution!.gen_ai!.usage = { + total_tokens: total, + prompt_tokens: prompt, + completion_tokens: completion, + }; + }) + .catch((err) => { + logger.error('Failed to calculate tokens from streaming response'); + logger.error(err); + }) + .finally(() => { + completeEventLogging(); + }); + + return resultWithoutError; } - logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`); - } else { - span?.setOutcome('failure'); - event.event.outcome = 'failure'; - event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`; - event.error = event.error || {}; - event.error.message = 'action execution returned unexpected result'; - logger.warn( - `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"` - ); } + // end openai extension + + completeEventLogging(); - eventLogger.logEvent(event); - const { error, ...resultWithoutError } = result; return resultWithoutError; } ); diff --git a/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.test.ts b/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.test.ts new file mode 100644 index 00000000000000..080b7cb5f972ff --- /dev/null +++ b/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.test.ts @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { Transform } from 'stream'; +import { getTokenCountFromOpenAIStream } from './get_token_count_from_openai_stream'; + +interface StreamMock { + write: (data: string) => void; + fail: () => void; + complete: () => void; + transform: Transform; +} + +function createStreamMock(): StreamMock { + const transform: Transform = new Transform({}); + + return { + write: (data: string) => { + transform.push(`${data}\n`); + }, + fail: () => { + transform.emit('error', new Error('Stream failed')); + transform.end(); + }, + transform, + complete: () => { + transform.end(); + }, + }; +} + +describe('getTokenCountFromOpenAIStream', () => { + let tokens: Awaited>; + let stream: StreamMock; + const body = { + messages: [ + { + role: 'system', + content: 'This is a system message', + }, + { + role: 'user', + content: 'This is a user message', + }, + ], + }; + + const chunk = { + object: 'chat.completion.chunk', + choices: [ + { + delta: { + content: 'Single', + }, + }, + ], + }; + + const PROMPT_TOKEN_COUNT = 36; + const COMPLETION_TOKEN_COUNT = 5; + + beforeEach(() => { + stream = createStreamMock(); + stream.write(`data: ${JSON.stringify(chunk)}`); + }); + + describe('when a stream completes', () => { + beforeEach(async () => { + stream.write('data: [DONE]'); + stream.complete(); + }); + + describe('without function tokens', () => { + beforeEach(async () => { + tokens = await getTokenCountFromOpenAIStream({ + responseStream: stream.transform, + body: JSON.stringify(body), + }); + }); + + it('counts the prompt tokens', () => { + expect(tokens.prompt).toBe(PROMPT_TOKEN_COUNT); + expect(tokens.completion).toBe(COMPLETION_TOKEN_COUNT); + expect(tokens.total).toBe(PROMPT_TOKEN_COUNT + COMPLETION_TOKEN_COUNT); + }); + }); + + describe('with function tokens', () => { + beforeEach(async () => { + tokens = await getTokenCountFromOpenAIStream({ + responseStream: stream.transform, + body: JSON.stringify({ + ...body, + functions: [ + { + name: 'my_function', + description: 'My function description', + parameters: { + type: 'object', + properties: { + my_property: { + type: 'boolean', + description: 'My function property', + }, + }, + }, + }, + ], + }), + }); + }); + + it('counts the function tokens', () => { + expect(tokens.prompt).toBeGreaterThan(PROMPT_TOKEN_COUNT); + }); + }); + }); + + describe('when a stream fails', () => { + it('resolves the promise with the correct prompt tokens', async () => { + const tokenPromise = getTokenCountFromOpenAIStream({ + responseStream: stream.transform, + body: JSON.stringify(body), + }); + + stream.fail(); + + await expect(tokenPromise).resolves.toEqual({ + prompt: PROMPT_TOKEN_COUNT, + total: PROMPT_TOKEN_COUNT + COMPLETION_TOKEN_COUNT, + completion: COMPLETION_TOKEN_COUNT, + }); + }); + }); +}); diff --git a/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.ts b/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.ts new file mode 100644 index 00000000000000..74c89f716171e4 --- /dev/null +++ b/x-pack/plugins/actions/server/lib/get_token_count_from_openai_stream.ts @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { encode } from 'gpt-tokenizer'; +import { isEmpty, omitBy } from 'lodash'; +import { Readable } from 'stream'; +import { finished } from 'stream/promises'; +import { CreateChatCompletionRequest } from 'openai'; + +export async function getTokenCountFromOpenAIStream({ + responseStream, + body, +}: { + responseStream: Readable; + body: string; +}): Promise<{ + total: number; + prompt: number; + completion: number; +}> { + const chatCompletionRequest = JSON.parse(body) as CreateChatCompletionRequest; + + // per https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb + const tokensFromMessages = encode( + chatCompletionRequest.messages + .map( + (msg) => + `<|start|>${msg.role}\n${msg.content}\n${ + msg.name + ? msg.name + : msg.function_call + ? msg.function_call.name + '\n' + msg.function_call.arguments + : '' + }<|end|>` + ) + .join('\n') + ).length; + + // this is an approximation. OpenAI cuts off a function schema + // at a certain level of nesting, so their token count might + // be lower than what we are calculating here. + + const tokensFromFunctions = chatCompletionRequest.functions + ? encode( + chatCompletionRequest.functions + ?.map( + (fn) => + `<|start|>${fn.name}\n${fn.description}\n${JSON.stringify(fn.parameters)}<|end|>` + ) + .join('\n') + ).length + : 0; + + const promptTokens = tokensFromMessages + tokensFromFunctions; + + let responseBody: string = ''; + + responseStream.on('data', (chunk: string) => { + responseBody += chunk.toString(); + }); + + try { + await finished(responseStream); + } catch { + // no need to handle this explicitly + } + + const response = responseBody + .split('\n') + .filter((line) => { + return line.startsWith('data: ') && !line.endsWith('[DONE]'); + }) + .map((line) => { + return JSON.parse(line.replace('data: ', '')); + }) + .filter( + ( + line + ): line is { + choices: Array<{ + delta: { content?: string; function_call?: { name?: string; arguments: string } }; + }>; + } => { + return 'object' in line && line.object === 'chat.completion.chunk'; + } + ) + .reduce( + (prev, line) => { + const msg = line.choices[0].delta!; + prev.content += msg.content || ''; + prev.function_call.name += msg.function_call?.name || ''; + prev.function_call.arguments += msg.function_call?.arguments || ''; + return prev; + }, + { content: '', function_call: { name: '', arguments: '' } } + ); + + const completionTokens = encode( + JSON.stringify( + omitBy( + { + content: response.content || undefined, + function_call: response.function_call.name ? response.function_call : undefined, + }, + isEmpty + ) + ) + ).length; + + return { + prompt: promptTokens, + completion: completionTokens, + total: promptTokens + completionTokens, + }; +} diff --git a/x-pack/plugins/actions/server/sub_action_framework/sub_action_connector.ts b/x-pack/plugins/actions/server/sub_action_framework/sub_action_connector.ts index e421083fd6177d..7d3c6e51e844e5 100644 --- a/x-pack/plugins/actions/server/sub_action_framework/sub_action_connector.ts +++ b/x-pack/plugins/actions/server/sub_action_framework/sub_action_connector.ts @@ -20,6 +20,7 @@ import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { finished } from 'stream/promises'; import { IncomingMessage } from 'http'; +import { PassThrough } from 'stream'; import { assertURL } from './helpers/validators'; import { ActionsConfigurationUtilities } from '../actions_config'; import { SubAction, SubActionRequestParams } from './types'; @@ -158,11 +159,13 @@ export abstract class SubActionConnector { try { const incomingMessage = error.response.data as IncomingMessage; - incomingMessage.on('data', (chunk) => { + const pt = incomingMessage.pipe(new PassThrough()); + + pt.on('data', (chunk) => { responseBody += chunk.toString(); }); - await finished(incomingMessage); + await finished(pt); error.response.data = JSON.parse(responseBody); } catch { diff --git a/x-pack/plugins/observability_ai_assistant/server/routes/chat/route.ts b/x-pack/plugins/observability_ai_assistant/server/routes/chat/route.ts index 19ebdcbaedc95e..90620156acf370 100644 --- a/x-pack/plugins/observability_ai_assistant/server/routes/chat/route.ts +++ b/x-pack/plugins/observability_ai_assistant/server/routes/chat/route.ts @@ -5,10 +5,10 @@ * 2.0. */ import { notImplemented } from '@hapi/boom'; -import { IncomingMessage } from 'http'; import * as t from 'io-ts'; import { toBooleanRt } from '@kbn/io-ts-utils'; import type { CreateChatCompletionResponse } from 'openai'; +import { Readable } from 'stream'; import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route'; import { messageRt } from '../runtime_types'; @@ -38,7 +38,7 @@ const chatRoute = createObservabilityAIAssistantServerRoute({ }), t.partial({ query: t.type({ stream: toBooleanRt }) }), ]), - handler: async (resources): Promise => { + handler: async (resources): Promise => { const { request, params, service } = resources; const client = await service.getClient({ request }); diff --git a/x-pack/plugins/observability_ai_assistant/server/service/client/index.ts b/x-pack/plugins/observability_ai_assistant/server/service/client/index.ts index 3a99e293cd5e2e..f0df01d87168d5 100644 --- a/x-pack/plugins/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/plugins/observability_ai_assistant/server/service/client/index.ts @@ -10,7 +10,6 @@ import type { ActionsClient } from '@kbn/actions-plugin/server'; import type { ElasticsearchClient } from '@kbn/core/server'; import type { Logger } from '@kbn/logging'; import type { PublicMethodsOf } from '@kbn/utility-types'; -import type { IncomingMessage } from 'http'; import { compact, isEmpty, merge, omit } from 'lodash'; import type { ChatCompletionFunctions, @@ -18,10 +17,11 @@ import type { CreateChatCompletionRequest, CreateChatCompletionResponse, } from 'openai'; +import { PassThrough, Readable } from 'stream'; import { v4 } from 'uuid'; import { - type CompatibleJSONSchema, MessageRole, + type CompatibleJSONSchema, type Conversation, type ConversationCreateRequest, type ConversationUpdateRequest, @@ -116,7 +116,7 @@ export class ObservabilityAIAssistantClient { functions?: Array<{ name: string; description: string; parameters: CompatibleJSONSchema }>; functionCall?: string; stream?: TStream; - }): Promise => { + }): Promise => { const messagesForOpenAI: ChatCompletionRequestMessage[] = compact( messages .filter((message) => message.message.content || message.message.function_call?.name) @@ -195,7 +195,11 @@ export class ObservabilityAIAssistantClient { throw internal(`${executeResult?.message} - ${executeResult?.serviceMessage}`); } - return executeResult.data as any; + const response = stream + ? ((executeResult.data as Readable).pipe(new PassThrough()) as Readable) + : (executeResult.data as CreateChatCompletionResponse); + + return response as any; }; find = async (options?: { query?: string }): Promise<{ conversations: Conversation[] }> => { diff --git a/x-pack/plugins/synthetics/e2e/synthetics/journeys/data_retention.journey.ts b/x-pack/plugins/synthetics/e2e/synthetics/journeys/data_retention.journey.ts index 9487fb62ccf950..1e0c00f42a92fd 100644 --- a/x-pack/plugins/synthetics/e2e/synthetics/journeys/data_retention.journey.ts +++ b/x-pack/plugins/synthetics/e2e/synthetics/journeys/data_retention.journey.ts @@ -52,7 +52,7 @@ journey(`DataRetentionPage`, async ({ page, params }) => { const screenshotChecks = await page.textContent( `tr:has-text("Browser Screenshots") span:has-text("KB")` ); - expect(Number(allChecks?.split('KB')[0])).toBeGreaterThan(450); + expect(Number(allChecks?.split('KB')[0])).toBeGreaterThan(400); expect(Number(browserChecks?.split('KB')[0])).toBeGreaterThan(50); expect(Number(networkChecks?.split('KB')[0])).toBeGreaterThan(300); expect(Number(screenshotChecks?.split('KB')[0])).toBeGreaterThan(25); diff --git a/x-pack/plugins/synthetics/e2e/synthetics/journeys/index.ts b/x-pack/plugins/synthetics/e2e/synthetics/journeys/index.ts index 40418a55e8d8eb..e38219a78ec95d 100644 --- a/x-pack/plugins/synthetics/e2e/synthetics/journeys/index.ts +++ b/x-pack/plugins/synthetics/e2e/synthetics/journeys/index.ts @@ -5,6 +5,7 @@ * 2.0. */ +export * from './data_retention.journey'; export * from './project_api_keys.journey'; export * from './getting_started.journey'; export * from './add_monitor.journey'; @@ -19,7 +20,6 @@ export * from './global_parameters.journey'; export * from './detail_flyout'; // export * from './alert_rules/default_status_alert.journey'; export * from './test_now_mode.journey'; -// export * from './data_retention.journey'; export * from './monitor_details_page/monitor_summary.journey'; export * from './test_run_details.journey'; export * from './step_details.journey'; diff --git a/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts b/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts index 02779526c84213..e30b9a18872a9f 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts @@ -18,7 +18,12 @@ import { DataStreamConfig, } from './formatters/public_formatters/convert_to_data_stream'; import { sendErrorTelemetryEvents } from '../routes/telemetry/monitor_upgrade_sender'; -import { MonitorFields, PublicLocations, ServiceLocationErrors } from '../../common/runtime_types'; +import { + MonitorFields, + PublicLocations, + ServiceLocation, + ServiceLocationErrors, +} from '../../common/runtime_types'; import { ServiceConfig } from '../../common/config'; const TEST_SERVICE_USERNAME = 'localKibanaIntegrationTestsUser'; @@ -32,6 +37,7 @@ export interface ServiceData { endpoint?: 'monitors' | 'runOnce' | 'sync'; isEdit?: boolean; license: LicenseGetLicenseInformation; + location?: ServiceLocation; } export interface ServicePayload { @@ -161,10 +167,14 @@ export class ServiceAPIClient { } async syncMonitors(data: ServiceData) { - return (await this.callAPI('PUT', { ...data, endpoint: 'sync' })).pushErrors; + try { + return (await this.callAPI('PUT', { ...data, endpoint: 'sync' })).pushErrors; + } catch (e) { + this.logger.error(e); + } } - processServiceData({ monitors, ...restOfData }: ServiceData) { + processServiceData({ monitors, location, ...restOfData }: ServiceData) { // group monitors by location const monitorsByLocation: Array<{ location: { id: string; url: string }; @@ -172,12 +182,14 @@ export class ServiceAPIClient { data: ServicePayload; }> = []; this.locations.forEach(({ id, url }) => { - const locMonitors = monitors.filter(({ locations }) => - locations?.find((loc) => loc.id === id && loc.isServiceManaged) - ); - if (locMonitors.length > 0) { - const data = this.getRequestData({ ...restOfData, monitors: locMonitors }); - monitorsByLocation.push({ location: { id, url }, monitors: locMonitors, data }); + if (!location || location.id === id) { + const locMonitors = monitors.filter(({ locations }) => + locations?.find((loc) => loc.id === id && loc.isServiceManaged) + ); + if (locMonitors.length > 0) { + const data = this.getRequestData({ ...restOfData, monitors: locMonitors }); + monitorsByLocation.push({ location: { id, url }, monitors: locMonitors, data }); + } } }); return monitorsByLocation; @@ -275,7 +287,9 @@ export class ServiceAPIClient { result: AxiosResponse | ServicePayload ) { if ('status' in result || 'request' in result) { - this.logger.debug(result.data); + if (result.data) { + this.logger.debug(result.data); + } this.logger.debug( `Successfully called service location ${url}${result.request?.path} with method ${method} with ${numMonitors} monitors` ); diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts index e98508a5e41165..66c25a8bb5ca19 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts @@ -23,22 +23,24 @@ const taskManagerSetup = taskManagerMock.createSetup(); const mockCoreStart = coreMock.createStart() as CoreStart; -mockCoreStart.elasticsearch.client.asInternalUser.license.get = jest.fn().mockResolvedValue({ - license: { - status: 'active', - uid: 'c5788419-1c6f-424a-9217-da7a0a9151a0', - type: 'platinum', - issue_date: '2022-11-29T00:00:00.000Z', - issue_date_in_millis: 1669680000000, - expiry_date: '2024-12-31T23:59:59.999Z', - expiry_date_in_millis: 1735689599999, - max_nodes: 100, - max_resource_units: null, - issued_to: 'Elastic - INTERNAL (development environments)', - issuer: 'API', - start_date_in_millis: 1669680000000, - }, -}); +const mockLicense = () => { + mockCoreStart.elasticsearch.client.asInternalUser.license.get = jest.fn().mockResolvedValue({ + license: { + status: 'active', + uid: 'c5788419-1c6f-424a-9217-da7a0a9151a0', + type: 'platinum', + issue_date: '2022-11-29T00:00:00.000Z', + issue_date_in_millis: 1669680000000, + expiry_date: '2024-12-31T23:59:59.999Z', + expiry_date_in_millis: 1735689599999, + max_nodes: 100, + max_resource_units: null, + issued_to: 'Elastic - INTERNAL (development environments)', + issuer: 'API', + start_date_in_millis: 1669680000000, + }, + }); +}; const getFakePayload = (locations: HeartbeatConfig['locations']) => { return { @@ -87,6 +89,16 @@ describe('SyntheticsService', () => { savedObjectsClient: savedObjectsClientMock.create()!, } as unknown as SyntheticsServerSetup; + const mockConfig = { + service: { + devUrl: 'http://localhost', + manifestUrl: 'https://test-manifest.com', + }, + enabled: true, + }; + + mockLicense(); + const getMockedService = (locationsNum: number = 1) => { const locations = times(locationsNum).map((n) => { return { @@ -101,13 +113,7 @@ describe('SyntheticsService', () => { status: LocationStatus.GA, }; }); - serverMock.config = { - service: { - devUrl: 'http://localhost', - manifestUrl: 'https://test-manifest.com', - }, - enabled: true, - }; + serverMock.config = mockConfig; if (serverMock.savedObjectsClient) { serverMock.savedObjectsClient.find = jest.fn().mockResolvedValue({ saved_objects: [ @@ -133,8 +139,10 @@ describe('SyntheticsService', () => { const service = new SyntheticsService(serverMock); service.apiClient.locations = locations; + service.locations = locations; jest.spyOn(service, 'getOutput').mockResolvedValue({ hosts: ['es'], api_key: 'i:k' }); + jest.spyOn(service, 'getSyntheticsParams').mockResolvedValue({}); return { service, locations }; }; @@ -222,7 +230,7 @@ describe('SyntheticsService', () => { const { service } = getMockedService(); jest.spyOn(service, 'getOutput').mockRestore(); - serverMock.encryptedSavedObjects = mockEncryptedSO(null) as any; + serverMock.encryptedSavedObjects = mockEncryptedSO(); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -240,8 +248,12 @@ describe('SyntheticsService', () => { jest.spyOn(service, 'getOutput').mockRestore(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: [ + { + attributes: getFakePayload([locations[0]]), + }, + ], + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -309,8 +321,12 @@ describe('SyntheticsService', () => { const { service, locations } = getMockedService(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: [ + { + attributes: getFakePayload([locations[0]]), + }, + ], + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -357,8 +373,10 @@ describe('SyntheticsService', () => { }); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: { + attributes: getFakePayload([locations[0]]), + }, + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -370,13 +388,18 @@ describe('SyntheticsService', () => { describe('getSyntheticsParams', () => { it('returns the params for all spaces', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: { key: 'username', value: 'elastic' }, - namespaces: ['*'], - }) as any; + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['*'], + }, + ], + }); const params = await service.getSyntheticsParams(); @@ -389,6 +412,16 @@ describe('SyntheticsService', () => { it('returns the params for specific space', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); + + serverMock.encryptedSavedObjects = mockEncryptedSO({ + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['*'], + }, + ], + }); const params = await service.getSyntheticsParams({ spaceId: 'default' }); @@ -403,11 +436,16 @@ describe('SyntheticsService', () => { }); it('returns the space limited params', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: { key: 'username', value: 'elastic' }, - namespaces: ['default'], - }) as any; + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['default'], + }, + ], + }); const params = await service.getSyntheticsParams({ spaceId: 'default' }); @@ -418,4 +456,62 @@ describe('SyntheticsService', () => { }); }); }); + + describe('pagination', () => { + const service = new SyntheticsService(serverMock); + + const locations = times(5).map((n) => { + return { + id: `loc-${n}`, + label: `Location ${n}`, + url: `https://example.com/${n}`, + geo: { + lat: 0, + lon: 0, + }, + isServiceManaged: true, + status: LocationStatus.GA, + }; + }); + service.apiClient.locations = locations; + service.locations = locations; + jest.spyOn(service, 'getOutput').mockResolvedValue({ hosts: ['es'], api_key: 'i:k' }); + jest.spyOn(service, 'getSyntheticsParams').mockResolvedValue({}); + + it('paginates the results', async () => { + serverMock.config = mockConfig; + + mockLicense(); + + const syncSpy = jest.spyOn(service.apiClient, 'syncMonitors'); + + let num = -1; + const data = times(10000).map((n) => { + if (num === 4) { + num = -1; + } + num++; + if (locations?.[num + 1]) { + return { + attributes: getFakePayload([locations[num], locations[num + 1]]), + }; + } + return { + attributes: getFakePayload([locations[num]]), + }; + }); + + serverMock.encryptedSavedObjects = mockEncryptedSO({ monitors: data }); + + (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); + + await service.pushConfigs(); + + expect(syncSpy).toHaveBeenCalledTimes(72); + expect(axios).toHaveBeenCalledTimes(72); + expect(logger.debug).toHaveBeenCalledTimes(112); + expect(logger.info).toHaveBeenCalledTimes(0); + expect(logger.error).toHaveBeenCalledTimes(0); + }); + }); }); diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts index da0434201eaa24..4f5739e933e0a1 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts @@ -7,18 +7,16 @@ /* eslint-disable max-classes-per-file */ -import { Logger, SavedObject, ElasticsearchClient } from '@kbn/core/server'; +import { ElasticsearchClient, Logger, SavedObject } from '@kbn/core/server'; import { ConcreteTaskInstance, TaskInstance, TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; -import { concatMap, Subject } from 'rxjs'; -import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; -import pMap from 'p-map'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common'; import { ALL_SPACES_ID } from '@kbn/spaces-plugin/common/constants'; +import pMap from 'p-map'; import { registerCleanUpTask } from './private_location/clean_up_task'; import { SyntheticsServerSetup } from '../types'; import { syntheticsMonitorType, syntheticsParamType } from '../../common/types/saved_objects'; @@ -31,11 +29,9 @@ import { ServiceAPIClient, ServiceData } from './service_api_client'; import { ConfigKey, - EncryptedSyntheticsMonitorAttributes, MonitorFields, ServiceLocationErrors, ServiceLocations, - SyntheticsMonitorWithId, SyntheticsMonitorWithSecretsAttributes, SyntheticsParams, ThrottlingOptions, @@ -279,6 +275,18 @@ export class SyntheticsService { return license; } + private async getSOClientFinder({ pageSize }: { pageSize: number }) { + const encryptedClient = this.server.encryptedSavedObjects.getClient(); + + return await encryptedClient.createPointInTimeFinderDecryptedAsInternalUser( + { + type: syntheticsMonitorType, + perPage: pageSize, + namespaces: [ALL_SPACES_ID], + } + ); + } + private getESClient() { if (!this.server.coreStart) { return; @@ -373,55 +381,95 @@ export class SyntheticsService { async pushConfigs() { const license = await this.getLicense(); const service = this; - const subject = new Subject(); + + const PER_PAGE = 250; + service.syncErrors = []; let output: ServiceData['output'] | null = null; - subject - .pipe( - concatMap(async (monitors) => { - try { - if (monitors.length === 0 || !this.config.manifestUrl) { - return; - } + const paramsBySpace = await this.getSyntheticsParams(); + const finder = await this.getSOClientFinder({ pageSize: PER_PAGE }); - if (!output) { - output = await this.getOutput(); + const bucketsByLocation: Record = {}; + this.locations.forEach((location) => { + bucketsByLocation[location.id] = []; + }); - if (!output) { - sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'API key is not valid.', - message: 'Failed to push configs. API key is not valid.', - type: 'invalidApiKey', - stackVersion: service.server.stackVersion, - }); - return; - } - } + const syncAllLocations = async (perBucket = 0) => { + await pMap( + this.locations, + async (location) => { + if (bucketsByLocation[location.id].length > perBucket && output) { + const locMonitors = bucketsByLocation[location.id].splice(0, PER_PAGE); - this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); + this.logger.debug( + `${locMonitors.length} monitors will be pushed to synthetics service for location ${location.id}.` + ); - service.syncErrors = await this.apiClient.syncMonitors({ - monitors, + const syncErrors = await this.apiClient.syncMonitors({ + monitors: locMonitors, output, license, + location, }); - } catch (e) { + + this.syncErrors = [...(this.syncErrors ?? []), ...(syncErrors ?? [])]; + } + }, + { + stopOnError: false, + } + ); + }; + + for await (const result of finder.find()) { + try { + if (!output) { + output = await this.getOutput(); + if (!output) { sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'Failed to push configs to service', - message: e?.message, - type: 'pushConfigsError', - code: e?.code, - status: e.status, + reason: 'API key is not valid.', + message: 'Failed to push configs. API key is not valid.', + type: 'invalidApiKey', stackVersion: service.server.stackVersion, }); - this.logger.error(e); + return; } - }) - ) - .subscribe(); + } + + const monitors = result.saved_objects.filter(({ error }) => !error); + const formattedConfigs = this.normalizeConfigs(monitors, paramsBySpace); + + this.logger.debug( + `${formattedConfigs.length} monitors will be pushed to synthetics service.` + ); + + formattedConfigs.forEach((monitor) => { + monitor.locations.forEach((location) => { + if (location.isServiceManaged) { + bucketsByLocation[location.id]?.push(monitor); + } + }); + }); + + await syncAllLocations(PER_PAGE); + } catch (e) { + sendErrorTelemetryEvents(service.logger, service.server.telemetry, { + reason: 'Failed to push configs to service', + message: e?.message, + type: 'pushConfigsError', + code: e?.code, + status: e.status, + stackVersion: service.server.stackVersion, + }); + this.logger.error(e); + } + } - await this.getMonitorConfigs(subject); + // execute the remaining monitors + await syncAllLocations(); + + await finder.close(); } async runOnceConfigs(configs?: ConfigData) { @@ -481,123 +529,28 @@ export class SyntheticsService { async deleteAllConfigs() { const license = await this.getLicense(); - const subject = new Subject(); - - subject - .pipe( - concatMap(async (monitors) => { - const hasPublicLocations = monitors.some((config) => - config.locations.some(({ isServiceManaged }) => isServiceManaged) - ); - - if (hasPublicLocations) { - const output = await this.getOutput(); - if (!output) { - return; - } - - const data = { - output, - monitors, - license, - }; - return await this.apiClient.delete(data); - } - }) - ) - .subscribe(); - - await this.getMonitorConfigs(subject); - } - - async getMonitorConfigs(subject: Subject) { - const soClient = this.server.savedObjectsClient; - const encryptedClient = this.server.encryptedSavedObjects.getClient(); - - if (!soClient?.find) { - return [] as SyntheticsMonitorWithId[]; - } - const paramsBySpace = await this.getSyntheticsParams(); - - const finder = soClient.createPointInTimeFinder({ - type: syntheticsMonitorType, - perPage: 100, - namespaces: [ALL_SPACES_ID], - }); + const finder = await this.getSOClientFinder({ pageSize: 100 }); + const output = await this.getOutput(); + if (!output) { + return; + } for await (const result of finder.find()) { - const monitors = await this.decryptMonitors(result.saved_objects, encryptedClient); - - const configDataList: ConfigData[] = (monitors ?? []).map((monitor) => { - const attributes = monitor.attributes as unknown as MonitorFields; - const monitorSpace = monitor.namespaces?.[0] ?? DEFAULT_SPACE_ID; - - const params = paramsBySpace[monitorSpace]; + const monitors = this.normalizeConfigs(result.saved_objects, paramsBySpace); + const hasPublicLocations = monitors.some((config) => + config.locations.some(({ isServiceManaged }) => isServiceManaged) + ); - return { - params: { ...params, ...(paramsBySpace?.[ALL_SPACES_ID] ?? {}) }, - monitor: normalizeSecrets(monitor).attributes, - configId: monitor.id, - heartbeatId: attributes[ConfigKey.MONITOR_QUERY_ID], + if (hasPublicLocations) { + const data = { + output, + monitors, + license, }; - }); - - const formattedConfigs = this.formatConfigs(configDataList); - - subject.next(formattedConfigs as MonitorFields[]); + return await this.apiClient.delete(data); + } } - - await finder.close(); - } - - async decryptMonitors( - monitors: Array>, - encryptedClient: EncryptedSavedObjectsClient - ) { - const start = performance.now(); - - const decryptedMonitors = await pMap( - monitors, - (monitor) => - new Promise((resolve) => { - encryptedClient - .getDecryptedAsInternalUser( - syntheticsMonitorType, - monitor.id, - { - namespace: monitor.namespaces?.[0], - } - ) - .then((decryptedMonitor) => resolve(decryptedMonitor)) - .catch((e) => { - this.logger.error(e); - sendErrorTelemetryEvents(this.logger, this.server.telemetry, { - reason: 'Failed to decrypt monitor', - message: e?.message, - type: 'runTaskError', - code: e?.code, - status: e.status, - stackVersion: this.server.stackVersion, - }); - resolve(null); - }); - }) - ); - - const end = performance.now(); - const duration = end - start; - - this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, { - event: { - duration, - }, - monitors: monitors.length, - }); - - return decryptedMonitors.filter((monitor) => monitor !== null) as Array< - SavedObject - >; } async getSyntheticsParams({ @@ -671,6 +624,27 @@ export class SyntheticsService { ); }); } + + normalizeConfigs( + monitors: Array>, + paramsBySpace: Record> + ) { + const configDataList = (monitors ?? []).map((monitor) => { + const attributes = monitor.attributes as unknown as MonitorFields; + const monitorSpace = monitor.namespaces?.[0] ?? DEFAULT_SPACE_ID; + + const params = paramsBySpace[monitorSpace]; + + return { + params: { ...params, ...(paramsBySpace?.[ALL_SPACES_ID] ?? {}) }, + monitor: normalizeSecrets(monitor).attributes, + configId: monitor.id, + heartbeatId: attributes[ConfigKey.MONITOR_QUERY_ID], + }; + }); + + return this.formatConfigs(configDataList) as MonitorFields[]; + } } class IndexTemplateInstallationError extends Error { diff --git a/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts b/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts index f881d242aeb52e..b9d6b54961f591 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts @@ -6,21 +6,54 @@ */ import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; +import { cloneDeep } from 'lodash'; +import { syntheticsParamType } from '../../../common/types/saved_objects'; -export const mockEncryptedSO = ( - data: any = { attributes: { key: 'username', value: 'elastic' }, namespaces: ['*'] } -) => ({ - getClient: jest.fn().mockReturnValue({ - getDecryptedAsInternalUser: jest.fn().mockResolvedValue(data), - createPointInTimeFinderDecryptedAsInternalUser: jest.fn().mockImplementation(() => ({ - close: jest.fn(), - find: jest.fn().mockReturnValue({ - async *[Symbol.asyncIterator]() { - yield { - saved_objects: data === null ? [] : [data], - }; - }, - }), - })), - } as jest.Mocked), -}); +export const mockEncryptedSO = ({ + monitors = null, + params, +}: { monitors?: any; params?: any } = {}) => { + const result = cloneDeep(monitors); + const mockParams = params ?? [ + { attributes: { key: 'username', value: 'elastic' }, namespaces: ['*'] }, + ]; + return { + isEncryptionError: jest.fn(), + getClient: jest.fn().mockReturnValue({ + getDecryptedAsInternalUser: jest.fn().mockResolvedValue(monitors), + createPointInTimeFinderDecryptedAsInternalUser: jest + .fn() + .mockImplementation(({ perPage, type: soType }) => ({ + close: jest.fn(), + find: jest.fn().mockReturnValue({ + async *[Symbol.asyncIterator]() { + if (soType === syntheticsParamType) { + yield { + saved_objects: mockParams, + }; + return; + } + if (!perPage) { + yield { + saved_objects: result, + }; + return; + } + if (monitors === null) { + return; + } + do { + const currentPage = result.splice(0, perPage); + if (currentPage.length === 0) { + return; + } + yield { + saved_objects: currentPage, + }; + } while (result.length > 0); + }, + }), + })), + } as jest.Mocked), + }; +}; diff --git a/x-pack/test/osquery_cypress/artifact_manager.ts b/x-pack/test/osquery_cypress/artifact_manager.ts index 54b9a70d37affb..0f5ca38d74978c 100644 --- a/x-pack/test/osquery_cypress/artifact_manager.ts +++ b/x-pack/test/osquery_cypress/artifact_manager.ts @@ -6,5 +6,6 @@ */ export async function getLatestVersion(): Promise { - return '8.11.0-SNAPSHOT'; + // temporary solution until newer agents work fine with Docker + return '8.10.4'; } diff --git a/yarn.lock b/yarn.lock index 64fb4dc4095892..ad54652c86130f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -17861,6 +17861,13 @@ got@^9.6.0: to-readable-stream "^1.0.0" url-parse-lax "^3.0.0" +gpt-tokenizer@^2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/gpt-tokenizer/-/gpt-tokenizer-2.1.2.tgz#14f7ce424cf2309fb5be66e112d1836080c2791a" + integrity sha512-HSuI5d6uey+c7x/VzQlPfCoGrfLyAc28vxWofKbjR9PJHm0AjQGSWkKw/OJnb+8S1g7nzgRsf0WH3dK+NNWYbg== + dependencies: + rfc4648 "^1.5.2" + graceful-fs@4.X, graceful-fs@^4.1.11, graceful-fs@^4.1.15, graceful-fs@^4.1.2, graceful-fs@^4.1.6, graceful-fs@^4.1.9, graceful-fs@^4.2.0, graceful-fs@^4.2.10, graceful-fs@^4.2.11, graceful-fs@^4.2.2, graceful-fs@^4.2.4, graceful-fs@^4.2.6, graceful-fs@^4.2.8, graceful-fs@^4.2.9: version "4.2.11" resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.11.tgz#4183e4e8bf08bb6e05bbb2f7d2e0c8f712ca40e3" @@ -26765,6 +26772,11 @@ reusify@^1.0.4: resolved "https://registry.yarnpkg.com/reusify/-/reusify-1.0.4.tgz#90da382b1e126efc02146e90845a88db12925d76" integrity sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw== +rfc4648@^1.5.2: + version "1.5.2" + resolved "https://registry.yarnpkg.com/rfc4648/-/rfc4648-1.5.2.tgz#cf5dac417dd83e7f4debf52e3797a723c1373383" + integrity sha512-tLOizhR6YGovrEBLatX1sdcuhoSCXddw3mqNVAcKxGJ+J0hFeJ+SjeWCv5UPA/WU3YzWPPuCVYgXBKZUPGpKtg== + rfdc@^1.2.0: version "1.3.0" resolved "https://registry.yarnpkg.com/rfdc/-/rfdc-1.3.0.tgz#d0b7c441ab2720d05dc4cf26e01c89631d9da08b"