From e663d7907f220490a6bc8651b811ee6d554507ff Mon Sep 17 00:00:00 2001 From: John Schulz Date: Sat, 22 Aug 2020 07:49:10 -0400 Subject: [PATCH] [Ingest Manager] Don't retain POST /setup results. fixes #74587 (#75372) (#75587) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add retries for registry requests. works, afaict. no tests. one TS issue. * Fix TS issue. Add link to node-fetch error docs * Restore some accidentally deleted code. * Add more comments. Remove logging. * Add tests for plugin setup service & handlers * Add tests for Registry retry logic * Extract setup retry logic to separate function/file * Add tests for setup retry logic ``` firstSuccessOrTryAgain ✓ reject/throws is called again & its value returned (18ms) ✓ the first success value is cached (2ms) ``` * More straightforward(?) tests for setup caching * Revert cached setup. Still limit 1 call at a time Terrible tests. Committing & pushing to see if it fixes failures like https://github.com/elastic/kibana/pull/74507/checks?check_run_id=980178887 https://kibana-ci.elastic.co/job/elastic+kibana+pipeline-pull-request/67892/execution/node/663/log/ ``` 07:36:56 └-> "before all" hook 07:36:56 └-> should not allow to enroll an agent with a invalid enrollment 07:36:56 └-> "before each" hook: global before each 07:36:56 └-> "before each" hook: beforeSetupWithDockerRegistry 07:36:56 │ proc [kibana] error [11:36:56.369] Error: Internal Server Error 07:36:56 │ proc [kibana] at HapiResponseAdapter.toError (/dev/shm/workspace/parallel/5/kibana/build/kibana-build-xpack/src/core/server/http/router/response_adapter.js:132:19) 07:36:56 │ proc [kibana] at HapiResponseAdapter.toHapiResponse (/dev/shm/workspace/parallel/5/kibana/build/kibana-build-xpack/src/core/server/http/router/response_adapter.js:86:19) 07:36:56 │ proc [kibana] at HapiResponseAdapter.handle (/dev/shm/workspace/parallel/5/kibana/build/kibana-build-xpack/src/core/server/http/router/response_adapter.js:81:17) 07:36:56 │ proc [kibana] at Router.handle (/dev/shm/workspace/parallel/5/kibana/build/kibana-build-xpack/src/core/server/http/router/router.js:164:34) 07:36:56 │ proc [kibana] at process._tickCallback (internal/process/next_tick.js:68:7) 07:36:56 │ proc [kibana] log [11:36:56.581] [info][authentication][plugins][security] Authentication attempt failed: [security_exception] missing authentication credentials for REST request [/_security/_authenticate], with { header={ WWW-Authenticate={ 0="ApiKey" & 1="Basic realm=\"security\" charset=\"UTF-8\"" } } } 07:36:56 └- ✓ pass (60ms) "Ingest Manager Endpoints Fleet Endpoints fleet_agents_enroll should not allow to enroll an agent with a invalid enrollment" 07:36:56 └-> should not allow to enroll an agent with a shared id if it already exists 07:36:56 └-> "before each" hook: global before each 07:36:56 └-> "before each" hook: beforeSetupWithDockerRegistry 07:36:56 └- ✓ pass (111ms) "Ingest Manager Endpoints Fleet Endpoints fleet_agents_enroll should not allow to enroll an agent with a shared id if it already exists " 07:36:56 └-> should not allow to enroll an agent with a version > kibana 07:36:56 └-> "before each" hook: global before each 07:36:56 └-> "before each" hook: beforeSetupWithDockerRegistry 07:36:56 └- ✓ pass (58ms) "Ingest Manager Endpoints Fleet Endpoints fleet_agents_enroll should not allow to enroll an agent with a version > kibana" 07:36:56 └-> should allow to enroll an agent with a valid enrollment token 07:36:56 └-> "before each" hook: global before each 07:36:56 └-> "before each" hook: beforeSetupWithDockerRegistry 07:36:56 └- ✖ fail: Ingest Manager Endpoints Fleet Endpoints fleet_agents_enroll should allow to enroll an agent with a valid enrollment token 07:36:56 │ Error: expected 200 "OK", got 500 "Internal Server Error" 07:36:56 │ at Test._assertStatus (/dev/shm/workspace/kibana/node_modules/supertest/lib/test.js:268:12) 07:36:56 │ at Test._assertFunction (/dev/shm/workspace/kibana/node_modules/supertest/lib/test.js:283:11) 07:36:56 │ at Test.assert (/dev/shm/workspace/kibana/node_modules/supertest/lib/test.js:173:18) 07:36:56 │ at assert (/dev/shm/workspace/kibana/node_modules/supertest/lib/test.js:131:12) 07:36:56 │ at /dev/shm/workspace/kibana/node_modules/supertest/lib/test.js:128:5 07:36:56 │ at Test.Request.callback (/dev/shm/workspace/kibana/node_modules/superagent/lib/node/index.js:718:3) 07:36:56 │ at parser (/dev/shm/workspace/kibana/node_modules/superagent/lib/node/index.js:906:18) 07:36:56 │ at IncomingMessage.res.on (/dev/shm/workspace/kibana/node_modules/superagent/lib/node/parsers/json.js:19:7) 07:36:56 │ at endReadableNT (_stream_readable.js:1145:12) 07:36:56 │ at process._tickCallback (internal/process/next_tick.js:63:19) 07:36:56 │ 07:36:56 │ ``` * New name & tests for one-at-a-time /setup behavior `firstPromiseBlocksAndFufills` for "the first promise created blocks others from being created, then fufills all with that first result" * More (better?) renaming * Fix name in test description * Fix spelling typo. * Remove registry retry code & tests * Use async fn's .catch to avoid unhandled rejection Add explicit `isPending` value instead of overloading role of `status`. Could probably do without it, but it makes the intent more clear. Co-authored-by: Elastic Machine # Conflicts: # x-pack/plugins/ingest_manager/server/services/setup.ts Co-authored-by: Elastic Machine --- .../server/routes/setup/handlers.test.ts | 83 +++++++++ .../server/routes/setup/index.ts | 40 +++-- .../server/services/setup.test.ts | 76 +++++--- .../ingest_manager/server/services/setup.ts | 167 ++++++++---------- .../server/services/setup_utils.test.ts | 149 ++++++++++++++++ .../server/services/setup_utils.ts | 37 ++++ 6 files changed, 418 insertions(+), 134 deletions(-) create mode 100644 x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts create mode 100644 x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts create mode 100644 x-pack/plugins/ingest_manager/server/services/setup_utils.ts diff --git a/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts b/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts new file mode 100644 index 00000000000000..ce826e78c454da --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/routes/setup/handlers.test.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { xpackMocks } from '../../../../../../x-pack/mocks'; +import { httpServerMock } from 'src/core/server/mocks'; +import { PostIngestSetupResponse } from '../../../common'; +import { RegistryError } from '../../errors'; +import { createAppContextStartContractMock } from '../../mocks'; +import { ingestManagerSetupHandler } from './handlers'; +import { appContextService } from '../../services/app_context'; +import { setupIngestManager } from '../../services/setup'; + +jest.mock('../../services/setup', () => { + return { + setupIngestManager: jest.fn(), + }; +}); + +const mockSetupIngestManager = setupIngestManager as jest.MockedFunction; + +describe('ingestManagerSetupHandler', () => { + let context: ReturnType; + let response: ReturnType; + let request: ReturnType; + + beforeEach(async () => { + context = xpackMocks.createRequestHandlerContext(); + response = httpServerMock.createResponseFactory(); + request = httpServerMock.createKibanaRequest({ + method: 'post', + path: '/api/ingest_manager/setup', + }); + // prevents `Logger not set.` and other appContext errors + appContextService.start(createAppContextStartContractMock()); + }); + + afterEach(async () => { + jest.clearAllMocks(); + appContextService.stop(); + }); + + it('POST /setup succeeds w/200 and body of resolved value', async () => { + mockSetupIngestManager.mockImplementation(() => Promise.resolve({ isIntialized: true })); + await ingestManagerSetupHandler(context, request, response); + + const expectedBody: PostIngestSetupResponse = { isInitialized: true }; + expect(response.customError).toHaveBeenCalledTimes(0); + expect(response.ok).toHaveBeenCalledWith({ body: expectedBody }); + }); + + it('POST /setup fails w/500 on custom error', async () => { + mockSetupIngestManager.mockImplementation(() => + Promise.reject(new Error('SO method mocked to throw')) + ); + await ingestManagerSetupHandler(context, request, response); + + expect(response.customError).toHaveBeenCalledTimes(1); + expect(response.customError).toHaveBeenCalledWith({ + statusCode: 500, + body: { + message: 'SO method mocked to throw', + }, + }); + }); + + it('POST /setup fails w/502 on RegistryError', async () => { + mockSetupIngestManager.mockImplementation(() => + Promise.reject(new RegistryError('Registry method mocked to throw')) + ); + + await ingestManagerSetupHandler(context, request, response); + expect(response.customError).toHaveBeenCalledTimes(1); + expect(response.customError).toHaveBeenCalledWith({ + statusCode: 502, + body: { + message: 'Registry method mocked to throw', + }, + }); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/routes/setup/index.ts b/x-pack/plugins/ingest_manager/server/routes/setup/index.ts index 1d1e7a2d721c99..fe51abec45b238 100644 --- a/x-pack/plugins/ingest_manager/server/routes/setup/index.ts +++ b/x-pack/plugins/ingest_manager/server/routes/setup/index.ts @@ -14,8 +14,7 @@ import { } from './handlers'; import { PostFleetSetupRequestSchema } from '../../types'; -export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) => { - // Ingest manager setup +export const registerIngestManagerSetupRoute = (router: IRouter) => { router.post( { path: SETUP_API_ROUTE, @@ -26,12 +25,20 @@ export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) }, ingestManagerSetupHandler ); +}; - if (!config.fleet.enabled) { - return; - } +export const registerCreateFleetSetupRoute = (router: IRouter) => { + router.post( + { + path: FLEET_SETUP_API_ROUTES.CREATE_PATTERN, + validate: PostFleetSetupRequestSchema, + options: { tags: [`access:${PLUGIN_ID}-all`] }, + }, + createFleetSetupHandler + ); +}; - // Get Fleet setup +export const registerGetFleetStatusRoute = (router: IRouter) => { router.get( { path: FLEET_SETUP_API_ROUTES.INFO_PATTERN, @@ -40,14 +47,19 @@ export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) }, getFleetStatusHandler ); +}; + +export const registerRoutes = (router: IRouter, config: IngestManagerConfigType) => { + // Ingest manager setup + registerIngestManagerSetupRoute(router); + + if (!config.fleet.enabled) { + return; + } + + // Get Fleet setup + registerGetFleetStatusRoute(router); // Create Fleet setup - router.post( - { - path: FLEET_SETUP_API_ROUTES.CREATE_PATTERN, - validate: PostFleetSetupRequestSchema, - options: { tags: [`access:${PLUGIN_ID}-all`] }, - }, - createFleetSetupHandler - ); + registerCreateFleetSetupRoute(router); }; diff --git a/x-pack/plugins/ingest_manager/server/services/setup.test.ts b/x-pack/plugins/ingest_manager/server/services/setup.test.ts index 474b2fde23c81d..bb01862aaf3174 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.test.ts @@ -4,41 +4,59 @@ * you may not use this file except in compliance with the Elastic License. */ +import { xpackMocks } from '../../../../../x-pack/mocks'; +import { createAppContextStartContractMock } from '../mocks'; +import { appContextService } from './app_context'; import { setupIngestManager } from './setup'; -import { savedObjectsClientMock } from 'src/core/server/mocks'; -describe('setupIngestManager', () => { - it('returned promise should reject if errors thrown', async () => { - const { savedObjectsClient, callClusterMock } = makeErrorMocks(); - const setupPromise = setupIngestManager(savedObjectsClient, callClusterMock); - await expect(setupPromise).rejects.toThrow('mocked'); +const mockedMethodThrowsError = () => + jest.fn().mockImplementation(() => { + throw new Error('SO method mocked to throw'); }); -}); -function makeErrorMocks() { - jest.mock('./app_context'); // else fails w/"Logger not set." - jest.mock('./epm/registry/registry_url', () => { - return { - fetchUrl: () => { - throw new Error('mocked registry#fetchUrl'); - }, - }; +class CustomTestError extends Error {} +const mockedMethodThrowsCustom = () => + jest.fn().mockImplementation(() => { + throw new CustomTestError('method mocked to throw'); }); - const callClusterMock = jest.fn(); - const savedObjectsClient = savedObjectsClientMock.create(); - savedObjectsClient.find = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#find'); - }); - savedObjectsClient.get = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#get'); +describe('setupIngestManager', () => { + let context: ReturnType; + + beforeEach(async () => { + context = xpackMocks.createRequestHandlerContext(); + // prevents `Logger not set.` and other appContext errors + appContextService.start(createAppContextStartContractMock()); }); - savedObjectsClient.update = jest.fn().mockImplementation(() => { - throw new Error('mocked SO#update'); + + afterEach(async () => { + jest.clearAllMocks(); + appContextService.stop(); }); - return { - savedObjectsClient, - callClusterMock, - }; -} + describe('should reject with any error thrown underneath', () => { + it('SO client throws plain Error', async () => { + const soClient = context.core.savedObjects.client; + soClient.create = mockedMethodThrowsError(); + soClient.find = mockedMethodThrowsError(); + soClient.get = mockedMethodThrowsError(); + soClient.update = mockedMethodThrowsError(); + + const setupPromise = setupIngestManager(soClient, jest.fn()); + await expect(setupPromise).rejects.toThrow('SO method mocked to throw'); + await expect(setupPromise).rejects.toThrow(Error); + }); + + it('SO client throws other error', async () => { + const soClient = context.core.savedObjects.client; + soClient.create = mockedMethodThrowsCustom(); + soClient.find = mockedMethodThrowsCustom(); + soClient.get = mockedMethodThrowsCustom(); + soClient.update = mockedMethodThrowsCustom(); + + const setupPromise = setupIngestManager(soClient, jest.fn()); + await expect(setupPromise).rejects.toThrow('method mocked to throw'); + await expect(setupPromise).rejects.toThrow(CustomTestError); + }); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index 4ef093d38879ab..0ef2def1c8104f 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -26,112 +26,97 @@ import { packageConfigService } from './package_config'; import { generateEnrollmentAPIKey } from './api_keys'; import { settingsService } from '.'; import { appContextService } from './app_context'; +import { awaitIfPending } from './setup_utils'; const FLEET_ENROLL_USERNAME = 'fleet_enroll'; const FLEET_ENROLL_ROLE = 'fleet_enroll'; -// the promise which tracks the setup -let setupIngestStatus: Promise | undefined; -// default resolve & reject to guard against "undefined is not a function" errors -let onSetupResolve = () => {}; -let onSetupReject = (error: Error) => {}; +export interface SetupStatus { + isIntialized: true | undefined; +} export async function setupIngestManager( soClient: SavedObjectsClientContract, callCluster: CallESAsCurrentUser -) { - // installation in progress - if (setupIngestStatus) { - await setupIngestStatus; - } else { - // create the initial promise - setupIngestStatus = new Promise((res, rej) => { - onSetupResolve = res; - onSetupReject = rej; - }); - } - try { - const [installedPackages, defaultOutput, config] = await Promise.all([ - // packages installed by default - ensureInstalledDefaultPackages(soClient, callCluster), - outputService.ensureDefaultOutput(soClient), - agentConfigService.ensureDefaultAgentConfig(soClient), - ensureDefaultIndices(callCluster), - settingsService.getSettings(soClient).catch((e: any) => { - if (e.isBoom && e.output.statusCode === 404) { - const http = appContextService.getHttpSetup(); - const serverInfo = http.getServerInfo(); - const basePath = http.basePath; - - const cloud = appContextService.getCloud(); - const cloudId = cloud?.isCloudEnabled && cloud.cloudId; - const cloudUrl = cloudId && decodeCloudId(cloudId)?.kibanaUrl; - const flagsUrl = appContextService.getConfig()?.fleet?.kibana?.host; - const defaultUrl = url.format({ - protocol: serverInfo.protocol, - hostname: serverInfo.hostname, - port: serverInfo.port, - pathname: basePath.serverBasePath, - }); - - return settingsService.saveSettings(soClient, { - agent_auto_upgrade: true, - package_auto_upgrade: true, - kibana_url: cloudUrl || flagsUrl || defaultUrl, - }); - } - - return Promise.reject(e); - }), - ]); - - // ensure default packages are added to the default conifg - const configWithPackageConfigs = await agentConfigService.get(soClient, config.id, true); - if (!configWithPackageConfigs) { - throw new Error('Config not found'); - } - if ( - configWithPackageConfigs.package_configs.length && - typeof configWithPackageConfigs.package_configs[0] === 'string' - ) { - throw new Error('Config not found'); - } - for (const installedPackage of installedPackages) { - const packageShouldBeInstalled = DEFAULT_AGENT_CONFIGS_PACKAGES.some( - (packageName) => installedPackage.name === packageName - ); - if (!packageShouldBeInstalled) { - continue; +): Promise { + return awaitIfPending(async () => createSetupSideEffects(soClient, callCluster)); +} + +async function createSetupSideEffects( + soClient: SavedObjectsClientContract, + callCluster: CallESAsCurrentUser +): Promise { + const [installedPackages, defaultOutput, config] = await Promise.all([ + // packages installed by default + ensureInstalledDefaultPackages(soClient, callCluster), + outputService.ensureDefaultOutput(soClient), + agentConfigService.ensureDefaultAgentConfig(soClient), + ensureDefaultIndices(callCluster), + settingsService.getSettings(soClient).catch((e: any) => { + if (e.isBoom && e.output.statusCode === 404) { + const http = appContextService.getHttpSetup(); + const serverInfo = http.getServerInfo(); + const basePath = http.basePath; + + const cloud = appContextService.getCloud(); + const cloudId = cloud?.isCloudEnabled && cloud.cloudId; + const cloudUrl = cloudId && decodeCloudId(cloudId)?.kibanaUrl; + const flagsUrl = appContextService.getConfig()?.fleet?.kibana?.host; + const defaultUrl = url.format({ + protocol: serverInfo.protocol, + hostname: serverInfo.hostname, + port: serverInfo.port, + pathname: basePath.serverBasePath, + }); + + return settingsService.saveSettings(soClient, { + agent_auto_upgrade: true, + package_auto_upgrade: true, + kibana_url: cloudUrl || flagsUrl || defaultUrl, + }); } - const isInstalled = configWithPackageConfigs.package_configs.some( - (d: PackageConfig | string) => { - return typeof d !== 'string' && d.package?.name === installedPackage.name; - } - ); + return Promise.reject(e); + }), + ]); - if (!isInstalled) { - await addPackageToConfig( - soClient, - callCluster, - installedPackage, - configWithPackageConfigs, - defaultOutput - ); - } + // ensure default packages are added to the default conifg + const configWithPackageConfigs = await agentConfigService.get(soClient, config.id, true); + if (!configWithPackageConfigs) { + throw new Error('Config not found'); + } + if ( + configWithPackageConfigs.package_configs.length && + typeof configWithPackageConfigs.package_configs[0] === 'string' + ) { + throw new Error('Config not found'); + } + for (const installedPackage of installedPackages) { + const packageShouldBeInstalled = DEFAULT_AGENT_CONFIGS_PACKAGES.some( + (packageName) => installedPackage.name === packageName + ); + if (!packageShouldBeInstalled) { + continue; } - // if everything works, resolve/succeed - onSetupResolve(); - } catch (error) { - // if anything errors, reject/fail - onSetupReject(error); + const isInstalled = configWithPackageConfigs.package_configs.some( + (d: PackageConfig | string) => { + return typeof d !== 'string' && d.package?.name === installedPackage.name; + } + ); + + if (!isInstalled) { + await addPackageToConfig( + soClient, + callCluster, + installedPackage, + configWithPackageConfigs, + defaultOutput + ); + } } - // be sure to return the promise because it has the resolved/rejected status attached to it - // otherwise, we effectively return success every time even if there are errors - // because `return undefined` -> `Promise.resolve(undefined)` in an `async` function - return setupIngestStatus; + return { isIntialized: true }; } export async function setupFleet( diff --git a/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts new file mode 100644 index 00000000000000..8d71fc48a21291 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/setup_utils.test.ts @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { awaitIfPending } from './setup_utils'; + +async function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe('awaitIfPending', () => { + it('first promise called blocks others', async () => { + const fnA = jest.fn().mockImplementation(async () => {}); + const fnB = jest.fn().mockImplementation(async () => {}); + const fnC = jest.fn().mockImplementation(async () => {}); + const fnD = jest.fn().mockImplementation(async () => {}); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + await Promise.all(promises); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + }); + + describe('first promise created, not necessarily first fulfilled, sets value for all in queue', () => { + it('succeeds', async () => { + const fnA = jest.fn().mockImplementation(async () => { + await sleep(1000); + return 'called first'; + }); + const fnB = jest.fn().mockImplementation(async () => 'called second'); + const fnC = jest.fn().mockImplementation(async () => 'called third'); + const fnD = jest.fn().mockImplementation(async () => 'called fourth'); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + await expect(Promise.all(promises)).resolves.toEqual([ + 'called first', + 'called first', + 'called first', + 'called first', + ]); + }); + + it('throws', async () => { + const expectedError = new Error('error is called first'); + const fnA = jest.fn().mockImplementation(async () => { + await sleep(1000); + throw expectedError; + }); + const fnB = jest.fn().mockImplementation(async () => 'called second'); + const fnC = jest.fn().mockImplementation(async () => 'called third'); + const fnD = jest.fn().mockImplementation(async () => 'called fourth'); + const promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + + await expect(Promise.all(promises)).rejects.toThrow(expectedError); + await expect(Promise.allSettled(promises)).resolves.toEqual([ + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + { status: 'rejected', reason: expectedError }, + ]); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + }); + }); + + it('does not block other calls after batch is fulfilled. can call again for a new result', async () => { + const fnA = jest + .fn() + .mockImplementationOnce(async () => 'fnA first') + .mockImplementationOnce(async () => 'fnA second') + .mockImplementation(async () => 'fnA default/2+'); + const fnB = jest.fn().mockImplementation(async () => {}); + const fnC = jest.fn().mockImplementation(async () => {}); + const fnD = jest.fn().mockImplementation(async () => {}); + let promises = [ + awaitIfPending(fnA), + awaitIfPending(fnB), + awaitIfPending(fnC), + awaitIfPending(fnD), + ]; + let results = await Promise.all(promises); + + expect(fnA).toHaveBeenCalledTimes(1); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual(['fnA first', 'fnA first', 'fnA first', 'fnA first']); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(2); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual(['fnA second', 'fnA second', 'fnA second', 'fnA second']); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(3); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual([ + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + ]); + + promises = [awaitIfPending(fnA), awaitIfPending(fnB), awaitIfPending(fnC), awaitIfPending(fnD)]; + results = await Promise.all(promises); + expect(fnA).toHaveBeenCalledTimes(4); + expect(fnB).toHaveBeenCalledTimes(0); + expect(fnC).toHaveBeenCalledTimes(0); + expect(fnD).toHaveBeenCalledTimes(0); + expect(results).toEqual([ + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + 'fnA default/2+', + ]); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/setup_utils.ts b/x-pack/plugins/ingest_manager/server/services/setup_utils.ts new file mode 100644 index 00000000000000..3c752bd410c5ae --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/setup_utils.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// the promise which tracks the setup +let status: Promise | undefined; +let isPending = false; +// default resolve to guard against "undefined is not a function" errors +let onResolve = (value?: unknown) => {}; +let onReject = (reason: any) => {}; + +export async function awaitIfPending(asyncFunction: Function): Promise { + // pending successful or failed attempt + if (isPending) { + // don't run concurrent installs + // return a promise which will eventually resolve/reject + return status; + } else { + // create the initial promise + status = new Promise((res, rej) => { + isPending = true; + onResolve = res; + onReject = rej; + }); + } + try { + const result = await asyncFunction().catch(onReject); + onResolve(result); + } catch (error) { + // if something fails + onReject(error); + } + isPending = false; + return status; +}