From e4a381a5ba9427c30b0e6e398a909a610ac1c90a Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Wed, 1 May 2024 19:55:55 +0200 Subject: [PATCH] [data.search] Fix unhandled promise rejections (#181785) ## Summary Resolves https://github.com/elastic/kibana/issues/168957. It turns out the underlying issue was resolved in https://github.com/elastic/kibana/pull/170041 (unhandled errors when deleting not being handled). However this still left it up to consumers of `pollSearch` to be 100% sure they weren't leaking unhandled promise rejections. This adds code directly to `pollSearch` that will handle rejections if they aren't handled in the calling code. It also adds tests to consumers of `pollSearch` to make sure they don't barf in the case that the `cancel` function errors. ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios --- .../data/common/search/poll_search.test.ts | 19 ++++++++ src/plugins/data/common/search/poll_search.ts | 9 +++- .../search_interceptor.test.ts | 44 +++++++++++++++++++ .../search_interceptor/search_interceptor.ts | 16 +++++-- .../ese_search/ese_search_strategy.test.ts | 44 ++++++++++++++++++- 5 files changed, 125 insertions(+), 7 deletions(-) diff --git a/src/plugins/data/common/search/poll_search.test.ts b/src/plugins/data/common/search/poll_search.test.ts index a884f1dad15580..6cc8fa34a2bb31 100644 --- a/src/plugins/data/common/search/poll_search.test.ts +++ b/src/plugins/data/common/search/poll_search.test.ts @@ -85,6 +85,25 @@ describe('pollSearch', () => { expect(cancelFn).toBeCalledTimes(1); }); + test('Does not leak unresolved promises on cancel', async () => { + const searchFn = getMockedSearch$(20); + const cancelFn = jest.fn().mockRejectedValueOnce({ error: 'Oh no!' }); + const abortController = new AbortController(); + const poll = pollSearch(searchFn, cancelFn, { + abortSignal: abortController.signal, + }).toPromise(); + + await new Promise((resolve) => setTimeout(resolve, 300)); + abortController.abort(); + + await expect(poll).rejects.toThrow(AbortError); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(1); + }); + test("Stops, but doesn't cancel on unsubscribe", async () => { const searchFn = getMockedSearch$(20); const cancelFn = jest.fn(); diff --git a/src/plugins/data/common/search/poll_search.ts b/src/plugins/data/common/search/poll_search.ts index 54985366b91274..3965ee96bf4174 100644 --- a/src/plugins/data/common/search/poll_search.ts +++ b/src/plugins/data/common/search/poll_search.ts @@ -14,7 +14,7 @@ import { isAbortResponse, isRunningResponse } from '..'; export const pollSearch = ( search: () => Promise, - cancel?: () => void, + cancel?: () => Promise, { pollInterval, abortSignal }: IAsyncSearchOptions = {} ): Observable => { const getPollInterval = (elapsedTime: number): number => { @@ -41,8 +41,13 @@ export const pollSearch = ( throw new AbortError(); } + const safeCancel = () => + cancel?.().catch((e) => { + console.error(e); // eslint-disable-line no-console + }); + if (cancel) { - abortSignal?.addEventListener('abort', cancel, { once: true }); + abortSignal?.addEventListener('abort', safeCancel, { once: true }); } const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe( diff --git a/src/plugins/data/public/search/search_interceptor/search_interceptor.test.ts b/src/plugins/data/public/search/search_interceptor/search_interceptor.test.ts index 8c01ea615fd37f..c6d07bc9e98c2e 100644 --- a/src/plugins/data/public/search/search_interceptor/search_interceptor.test.ts +++ b/src/plugins/data/public/search/search_interceptor/search_interceptor.test.ts @@ -500,6 +500,50 @@ describe('SearchInterceptor', () => { expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1); }); + test('should not leak unresolved promises if DELETE fails', async () => { + mockCoreSetup.http.delete.mockRejectedValueOnce({ status: 404, statusText: 'Not Found' }); + const responses = [ + { + time: 10, + value: { + isPartial: true, + isRunning: true, + rawResponse: {}, + id: 1, + }, + }, + { + time: 10, + value: { + statusCode: 500, + message: 'oh no', + id: 1, + }, + isError: true, + }, + ]; + mockFetchImplementation(responses); + + const response = searchInterceptor.search({}, { pollInterval: 0 }); + response.subscribe({ next, error }); + + await timeTravel(10); + + expect(next).toHaveBeenCalled(); + expect(error).not.toHaveBeenCalled(); + expect(fetchMock).toHaveBeenCalled(); + expect(mockCoreSetup.http.delete).not.toHaveBeenCalled(); + + // Long enough to reach the timeout but not long enough to reach the next response + await timeTravel(10); + + expect(error).toHaveBeenCalled(); + expect(error.mock.calls[0][0]).toBeInstanceOf(Error); + expect((error.mock.calls[0][0] as Error).message).toBe('oh no'); + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1); + }); + test('should NOT DELETE a running SAVED async search on abort', async () => { const sessionId = 'sessionId'; sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId); diff --git a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts index 5da9d4c2f71200..4b73f2ac9336bd 100644 --- a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts @@ -339,11 +339,19 @@ export class SearchInterceptor { isSavedToBackground = true; }); - const sendCancelRequest = once(() => { - this.deps.http.delete(`/internal/search/${strategy}/${id}`, { version: '1' }); - }); + const sendCancelRequest = once(() => + this.deps.http.delete(`/internal/search/${strategy}/${id}`, { version: '1' }) + ); - const cancel = () => id && !isSavedToBackground && sendCancelRequest(); + const cancel = async () => { + if (!id || isSavedToBackground) return; + try { + await sendCancelRequest(); + } catch (e) { + // eslint-disable-next-line no-console + console.error(e); + } + }; // Async search requires a series of requests // 1) POST //_async_search/ diff --git a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts index b50ccc508dbfdf..f5e655612bc9c5 100644 --- a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts @@ -66,7 +66,8 @@ describe('ES search strategy', () => { const mockSubmitCaller = jest.fn(); const mockDeleteCaller = jest.fn(); const mockLogger: any = { - debug: () => {}, + debug: jest.fn(), + error: jest.fn(), }; const mockDeps = { uiSettingsClient: { @@ -357,6 +358,47 @@ describe('ES search strategy', () => { expect(err).not.toBeUndefined(); expect(mockDeleteCaller).toBeCalled(); }); + + it('should not throw when encountering an error deleting', async () => { + mockSubmitCaller.mockResolvedValueOnce({ + ...mockAsyncResponse, + body: { + ...mockAsyncResponse.body, + is_running: true, + }, + }); + + const errResponse = new errors.ResponseError({ + body: xContentParseException, + statusCode: 400, + headers: {}, + warnings: [], + meta: {} as any, + }); + mockDeleteCaller.mockRejectedValueOnce(errResponse); + + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockLegacyConfig$, + mockSearchConfig, + mockLogger + ); + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + // Abort after an incomplete first response is returned + setTimeout(() => abortController.abort(), 100); + + let err: KbnServerError | undefined; + try { + await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise(); + } catch (e) { + err = e; + } + expect(mockSubmitCaller).toBeCalled(); + expect(err).not.toBeUndefined(); + expect(mockDeleteCaller).toBeCalled(); + }); }); describe('with sessionId', () => {