diff --git a/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts b/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts new file mode 100644 index 00000000000000..ce826e78c454da --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { xpackMocks } from '../../../../../../x-pack/mocks'; +import { httpServerMock } from 'src/core/server/mocks'; +import { PostIngestSetupResponse } from '../../../common'; +import { RegistryError } from '../../errors'; +import { createAppContextStartContractMock } from '../../mocks'; +import { ingestManagerSetupHandler } from './handlers'; +import { appContextService } from '../../services/app_context'; +import { setupIngestManager } from '../../services/setup'; + +jest.mock('../../services/setup', () => { + return { + setupIngestManager: jest.fn(), + }; +}); + +const mockSetupIngestManager = setupIngestManager as jest.MockedFunction; + +describe('ingestManagerSetupHandler', () => { + let context: ReturnType; + let response: ReturnType; + let request: ReturnType; + + beforeEach(async () => { + context = xpackMocks.createRequestHandlerContext(); + response = httpServerMock.createResponseFactory(); + request = httpServerMock.createKibanaRequest({ + method: 'post', + path: '/api/ingest_manager/setup', + }); + // prevents `Logger not set.` and other appContext errors + appContextService.start(createAppContextStartContractMock()); + }); + + afterEach(async () => { + jest.clearAllMocks(); + appContextService.stop(); + }); + + it('POST /setup succeeds w/200 and body of resolved value', async () => { + mockSetupIngestManager.mockImplementation(() => Promise.resolve({ isIntialized: true })); + await ingestManagerSetupHandler(context, request, response); + + const expectedBody: PostIngestSetupResponse = { isInitialized: true }; + expect(response.customError).toHaveBeenCalledTimes(0); + expect(response.ok).toHaveBeenCalledWith({ body: expectedBody }); + }); + + it('POST /setup fails w/500 on custom error', async () => { + mockSetupIngestManager.mockImplementation(() => + Promise.reject(new Error('SO method mocked to throw')) + ); + await ingestManagerSetupHandler(context, request, response); + + expect(response.customError).toHaveBeenCalledTimes(1); + expect(response.customError).toHaveBeenCalledWith({ + statusCode: 500, + body: { + message: 'SO method mocked to throw', + }, + }); + }); + + it('POST /setup fails w/502 on RegistryError', async () => { + mockSetupIngestManager.mockImplementation(() => + Promise.reject(new RegistryError('Registry method mocked to throw')) + ); + + await ingestManagerSetupHandler(context, request, response); + expect(response.customError).toHaveBeenCalledTimes(1); + expect(response.customError).toHaveBeenCalledWith({ + statusCode: 502, + body: { + message: 'Registry method mocked to throw', + }, + }); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/routes/setup/index.ts b/x-pack/plugins/ingest_manager/server/routes/setup/index.ts index 1d1e7a2d721c99..fe51abec45b238 100644 --- a/x-pack/plugins/ingest_manager/server/routes/setup/index.ts +++ b/x-pack/plugins/ingest_manager/server/routes/setup/index.ts @@ -14,8 +14,7 @@ import { } from './handlers'; import { PostFleetSetupRequestSchema } from '../../types'; -export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) => { - // Ingest manager setup +export const registerIngestManagerSetupRoute = (router: IRouter) => { router.post( { path: SETUP_API_ROUTE, @@ -26,12 +25,20 @@ export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) }, ingestManagerSetupHandler ); +}; - if (!config.fleet.enabled) { - return; - } +export const registerCreateFleetSetupRoute = (router: IRouter) => { + router.post( + { + path: FLEET_SETUP_API_ROUTES.CREATE_PATTERN, + validate: PostFleetSetupRequestSchema, + options: { tags: [`access:${PLUGIN_ID}-all`] }, + }, + createFleetSetupHandler + ); +}; - // Get Fleet setup +export const registerGetFleetStatusRoute = (router: IRouter) => { router.get( { path: FLEET_SETUP_API_ROUTES.INFO_PATTERN, @@ -40,14 +47,19 @@ export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) }, getFleetStatusHandler ); +}; + +export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) => { + // Ingest manager setup + registerIngestManagerSetupRoute(router); + + if (!config.fleet.enabled) { + return; + } + + // Get Fleet setup + registerGetFleetStatusRoute(router); // Create Fleet setup - router.post( - { - path: FLEET_SETUP_API_ROUTES.CREATE_PATTERN, - validate: PostFleetSetupRequestSchema, - options: { tags: [`access:${PLUGIN_ID}-all`] }, - }, - createFleetSetupHandler - ); + registerCreateFleetSetupRoute(router); }; diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts index 97d6f7b40a5886..3801303cf726f9 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts @@ -11,7 +11,8 @@ export function bufferToStream(buffer: Buffer): PassThrough { return stream; } -export function streamToString(stream: NodeJS.ReadableStream): Promise { +export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise { + if (stream instanceof Buffer) return Promise.resolve(stream.toString()); return new Promise((resolve, reject) => { const body: string[] = []; stream.on('data', (chunk: string) => body.push(chunk)); diff --git a/x-pack/plugins/ingest_manager/server/services/setup.test.ts b/x-pack/plugins/ingest_manager/server/services/setup.test.ts index 474b2fde23c81d..bb01862aaf3174 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.test.ts @@ -4,41 +4,59 @@ * you may not use this file except in compliance with the Elastic License. */ +import { xpackMocks } from '../../../../../x-pack/mocks'; +import { createAppContextStartContractMock } from '../mocks'; +import { appContextService } from './app_context'; import { setupIngestManager } from './setup'; -import { savedObjectsClientMock } from 'src/core/server/mocks'; -describe('setupIngestManager', () => { - it('returned promise should reject if errors thrown', async () => { - const { savedObjectsClient, callClusterMock } = makeErrorMocks(); - const setupPromise = setupIngestManager(savedObjectsClient, callClusterMock); - await expect(setupPromise).rejects.toThrow('mocked'); +const mockedMethodThrowsError = () => + jest.fn().mockImplementation(() => { + throw new Error('SO method mocked to throw'); }); -}); -function makeErrorMocks() { - jest.mock('./app_context'); // else fails w/"Logger not set." - jest.mock('./epm/registry/registry_url', () => { - return { - fetchUrl: () => { - throw new Error('mocked registry#fetchUrl'); - }, - }; +class CustomTestError extends Error {} +const mockedMethodThrowsCustom = () => + jest.fn().mockImplementation(() => { + throw new CustomTestError('method mocked to throw'); }); - const callClusterMock = jest.fn(); - const savedObjectsClient = savedObjectsClientMock.create(); - savedObjectsClient.find = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#find'); - }); - savedObjectsClient.get = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#get'); +describe('setupIngestManager', () => { + let context: ReturnType; + + beforeEach(async () => { + context = xpackMocks.createRequestHandlerContext(); + // prevents `Logger not set.` and other appContext errors + appContextService.start(createAppContextStartContractMock()); }); - savedObjectsClient.update = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#update'); + + afterEach(async () => { + jest.clearAllMocks(); + appContextService.stop(); }); - return { - savedObjectsClient, - callClusterMock, - }; -} + describe('should reject with any error thrown underneath', () => { + it('SO client throws plain Error', async () => { + const soClient = context.core.savedObjects.client; + soClient.create = mockedMethodThrowsError(); + soClient.find = mockedMethodThrowsError(); + soClient.get = mockedMethodThrowsError(); + soClient.update = mockedMethodThrowsError(); + + const setupPromise = setupIngestManager(soClient, jest.fn()); + await expect(setupPromise).rejects.toThrow('SO method mocked to throw'); + await expect(setupPromise).rejects.toThrow(Error); + }); + + it('SO client throws other error', async () => { + const soClient = context.core.savedObjects.client; + soClient.create = mockedMethodThrowsCustom(); + soClient.find = mockedMethodThrowsCustom(); + soClient.get = mockedMethodThrowsCustom(); + soClient.update = mockedMethodThrowsCustom(); + + const setupPromise = setupIngestManager(soClient, jest.fn()); + await expect(setupPromise).rejects.toThrow('method mocked to throw'); + await expect(setupPromise).rejects.toThrow(CustomTestError); + }); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index 727b49cebc6080..fb4430f8cf727d 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -26,116 +26,101 @@ import { packagePolicyService } from './package_policy'; import { generateEnrollmentAPIKey } from './api_keys'; import { settingsService } from '.'; import { appContextService } from './app_context'; +import { awaitIfPending } from './setup_utils'; const FLEET_ENROLL_USERNAME = 'fleet_enroll'; const FLEET_ENROLL_ROLE = 'fleet_enroll'; -// the promise which tracks the setup -let setupIngestStatus: Promise | undefined; -// default resolve & reject to guard against "undefined is not a function" errors -let onSetupResolve = () => {}; -let onSetupReject = (error: Error) => {}; +export interface SetupStatus { + isIntialized: true | undefined; +} export async function setupIngestManager( soClient: SavedObjectsClientContract, callCluster: CallESAsCurrentUser -) { - // installation in progress - if (setupIngestStatus) { - await setupIngestStatus; - } else { - // create the initial promise - setupIngestStatus = new Promise((res, rej) => { - onSetupResolve = res; - onSetupReject = rej; - }); +): Promise { + return awaitIfPending(async () => createSetupSideEffects(soClient, callCluster)); +} + +async function createSetupSideEffects( + soClient: SavedObjectsClientContract, + callCluster: CallESAsCurrentUser +): Promise { + const [installedPackages, defaultOutput, defaultAgentPolicy] = await Promise.all([ + // packages installed by default + ensureInstalledDefaultPackages(soClient, callCluster), + outputService.ensureDefaultOutput(soClient), + agentPolicyService.ensureDefaultAgentPolicy(soClient), + ensureDefaultIndices(callCluster), + settingsService.getSettings(soClient).catch((e: any) => { + if (e.isBoom && e.output.statusCode === 404) { + const http = appContextService.getHttpSetup(); + const serverInfo = http.getServerInfo(); + const basePath = http.basePath; + + const cloud = appContextService.getCloud(); + const cloudId = cloud?.isCloudEnabled && cloud.cloudId; + const cloudUrl = cloudId && decodeCloudId(cloudId)?.kibanaUrl; + const flagsUrl = appContextService.getConfig()?.fleet?.kibana?.host; + const defaultUrl = url.format({ + protocol: serverInfo.protocol, + hostname: serverInfo.hostname, + port: serverInfo.port, + pathname: basePath.serverBasePath, + }); + + return settingsService.saveSettings(soClient, { + agent_auto_upgrade: true, + package_auto_upgrade: true, + kibana_url: cloudUrl || flagsUrl || defaultUrl, + }); + } + + return Promise.reject(e); + }), + ]); + + // ensure default packages are added to the default conifg + const agentPolicyWithPackagePolicies = await agentPolicyService.get( + soClient, + defaultAgentPolicy.id, + true + ); + if (!agentPolicyWithPackagePolicies) { + throw new Error('Policy not found'); + } + if ( + agentPolicyWithPackagePolicies.package_policies.length && + typeof agentPolicyWithPackagePolicies.package_policies[0] === 'string' + ) { + throw new Error('Policy not found'); } - try { - const [installedPackages, defaultOutput, defaultAgentPolicy] = await Promise.all([ - // packages installed by default - ensureInstalledDefaultPackages(soClient, callCluster), - outputService.ensureDefaultOutput(soClient), - agentPolicyService.ensureDefaultAgentPolicy(soClient), - ensureDefaultIndices(callCluster), - settingsService.getSettings(soClient).catch((e: any) => { - if (e.isBoom && e.output.statusCode === 404) { - const http = appContextService.getHttpSetup(); - const serverInfo = http.getServerInfo(); - const basePath = http.basePath; - - const cloud = appContextService.getCloud(); - const cloudId = cloud?.isCloudEnabled && cloud.cloudId; - const cloudUrl = cloudId && decodeCloudId(cloudId)?.kibanaUrl; - const flagsUrl = appContextService.getConfig()?.fleet?.kibana?.host; - const defaultUrl = url.format({ - protocol: serverInfo.protocol, - hostname: serverInfo.hostname, - port: serverInfo.port, - pathname: basePath.serverBasePath, - }); - - return settingsService.saveSettings(soClient, { - agent_auto_upgrade: true, - package_auto_upgrade: true, - kibana_url: cloudUrl || flagsUrl || defaultUrl, - }); - } - - return Promise.reject(e); - }), - ]); - - // ensure default packages are added to the default conifg - const agentPolicyWithPackagePolicies = await agentPolicyService.get( - soClient, - defaultAgentPolicy.id, - true + for (const installedPackage of installedPackages) { + const packageShouldBeInstalled = DEFAULT_AGENT_POLICIES_PACKAGES.some( + (packageName) => installedPackage.name === packageName ); - if (!agentPolicyWithPackagePolicies) { - throw new Error('Policy not found'); - } - if ( - agentPolicyWithPackagePolicies.package_policies.length && - typeof agentPolicyWithPackagePolicies.package_policies[0] === 'string' - ) { - throw new Error('Policy not found'); + if (!packageShouldBeInstalled) { + continue; } - for (const installedPackage of installedPackages) { - const packageShouldBeInstalled = DEFAULT_AGENT_POLICIES_PACKAGES.some( - (packageName) => installedPackage.name === packageName - ); - if (!packageShouldBeInstalled) { - continue; + + const isInstalled = agentPolicyWithPackagePolicies.package_policies.some( + (d: PackagePolicy | string) => { + return typeof d !== 'string' && d.package?.name === installedPackage.name; } + ); - const isInstalled = agentPolicyWithPackagePolicies.package_policies.some( - (d: PackagePolicy | string) => { - return typeof d !== 'string' && d.package?.name === installedPackage.name; - } + if (!isInstalled) { + await addPackageToAgentPolicy( + soClient, + callCluster, + installedPackage, + agentPolicyWithPackagePolicies, + defaultOutput ); - - if (!isInstalled) { - await addPackageToAgentPolicy( - soClient, - callCluster, - installedPackage, - agentPolicyWithPackagePolicies, - defaultOutput - ); - } } - - // if everything works, resolve/succeed - onSetupResolve(); - } catch (error) { - // if anything errors, reject/fail - onSetupReject(error); } - // be sure to return the promise because it has the resolved/rejected status attached to it - // otherwise, we effectively return success every time even if there are errors - // because `return undefined` -> `Promise.resolve(undefined)` in an `async` function - return setupIngestStatus; + return { isIntialized: true }; } export async function setupFleet( diff --git a/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts new file mode 100644 index 00000000000000..8d71fc48a21291 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { awaitIfPending } from './setup_utils'; + +async function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe('awaitIfPending', () => { + it('first promise called blocks others', async () => { + const fnA = jest.fn().mockImplementation(async () => {}); + const fnB = jest.fn().mockImplementation(async () => {}); + const fnC = jest.fn().mockImplementation(async () => {}); + const fnD = jest.fn().mockImplementation(async () => {}); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + await Promise.all(promises); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + }); + + describe('first promise created, not necessarily first fulfilled, sets value for all in queue', () => { + it('succeeds', async () => { + const fnA = jest.fn().mockImplementation(async () => { + await sleep(1000); + return 'called first'; + }); + const fnB = jest.fn().mockImplementation(async () => 'called second'); + const fnC = jest.fn().mockImplementation(async () => 'called third'); + const fnD = jest.fn().mockImplementation(async () => 'called fourth'); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + await expect(Promise.all(promises)).resolves.toEqual([ + 'called first', + 'called first', + 'called first', + 'called first', + ]); + }); + + it('throws', async () => { + const expectedError = new Error('error is called first'); + const fnA = jest.fn().mockImplementation(async () => { + await sleep(1000); + throw expectedError; + }); + const fnB = jest.fn().mockImplementation(async () => 'called second'); + const fnC = jest.fn().mockImplementation(async () => 'called third'); + const fnD = jest.fn().mockImplementation(async () => 'called fourth'); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + + await expect(Promise.all(promises)).rejects.toThrow(expectedError); + await expect(Promise.allSettled(promises)).resolves.toEqual([ + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + ]); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + }); + }); + + it('does not block other calls after batch is fulfilled. can call again for a new result', async () => { + const fnA = jest + .fn() + .mockImplementationOnce(async () => 'fnA first') + .mockImplementationOnce(async () => 'fnA second') + .mockImplementation(async () => 'fnA default/2+'); + const fnB = jest.fn().mockImplementation(async () => {}); + const fnC = jest.fn().mockImplementation(async () => {}); + const fnD = jest.fn().mockImplementation(async () => {}); + let promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + let results = await Promise.all(promises); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual(['fnA first', 'fnA first', 'fnA first', 'fnA first']); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(2); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual(['fnA second', 'fnA second', 'fnA second', 'fnA second']); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(3); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual([ + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + ]); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(4); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual([ + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + ]); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/setup_utils.ts b/x-pack/plugins/ingest_manager/server/services/setup_utils.ts new file mode 100644 index 00000000000000..3c752bd410c5ae --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/setup_utils.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// the promise which tracks the setup +let status: Promise | undefined; +let isPending = false; +// default resolve to guard against "undefined is not a function" errors +let onResolve = (value?: unknown) => {}; +let onReject = (reason: any) => {}; + +export async function awaitIfPending(asyncFunction: Function): Promise { + // pending successful or failed attempt + if (isPending) { + // don't run concurrent installs + // return a promise which will eventually resolve/reject + return status; + } else { + // create the initial promise + status = new Promise((res, rej) => { + isPending = true; + onResolve = res; + onReject = rej; + }); + } + try { + const result = await asyncFunction().catch(onReject); + onResolve(result); + } catch (error) { + // if something fails + onReject(error); + } + isPending = false; + return status; +}