Skip to content

Commit

Permalink
[data.search] Fix unhandled promise rejections (#181785)
Browse files Browse the repository at this point in the history
## Summary

Resolves #168957.

It turns out the underlying issue was resolved in
#170041 (unhandled errors when
deleting not being handled). However this still left it up to consumers
of `pollSearch` to be 100% sure they weren't leaking unhandled promise
rejections. This adds code directly to `pollSearch` that will handle
rejections if they aren't handled in the calling code. It also adds
tests to consumers of `pollSearch` to make sure they don't barf in the
case that the `cancel` function errors.

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
  • Loading branch information
lukasolson authored May 1, 2024
1 parent 63bc11a commit e4a381a
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 7 deletions.
19 changes: 19 additions & 0 deletions src/plugins/data/common/search/poll_search.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ describe('pollSearch', () => {
expect(cancelFn).toBeCalledTimes(1);
});

test('Does not leak unresolved promises on cancel', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn().mockRejectedValueOnce({ error: 'Oh no!' });
const abortController = new AbortController();
const poll = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).toPromise();

await new Promise((resolve) => setTimeout(resolve, 300));
abortController.abort();

await expect(poll).rejects.toThrow(AbortError);

await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});

test("Stops, but doesn't cancel on unsubscribe", async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
Expand Down
9 changes: 7 additions & 2 deletions src/plugins/data/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { isAbortResponse, isRunningResponse } from '..';

export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
cancel?: () => void,
cancel?: () => Promise<void>,
{ pollInterval, abortSignal }: IAsyncSearchOptions = {}
): Observable<Response> => {
const getPollInterval = (elapsedTime: number): number => {
Expand All @@ -41,8 +41,13 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
throw new AbortError();
}

const safeCancel = () =>
cancel?.().catch((e) => {
console.error(e); // eslint-disable-line no-console
});

if (cancel) {
abortSignal?.addEventListener('abort', cancel, { once: true });
abortSignal?.addEventListener('abort', safeCancel, { once: true });
}

const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,50 @@ describe('SearchInterceptor', () => {
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should not leak unresolved promises if DELETE fails', async () => {
mockCoreSetup.http.delete.mockRejectedValueOnce({ status: 404, statusText: 'Not Found' });
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
rawResponse: {},
id: 1,
},
},
{
time: 10,
value: {
statusCode: 500,
message: 'oh no',
id: 1,
},
isError: true,
},
];
mockFetchImplementation(responses);

const response = searchInterceptor.search({}, { pollInterval: 0 });
response.subscribe({ next, error });

await timeTravel(10);

expect(next).toHaveBeenCalled();
expect(error).not.toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();

// Long enough to reach the timeout but not long enough to reach the next response
await timeTravel(10);

expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
expect((error.mock.calls[0][0] as Error).message).toBe('oh no');
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should NOT DELETE a running SAVED async search on abort', async () => {
const sessionId = 'sessionId';
sessionService.isCurrentSession.mockImplementation((_sessionId) => _sessionId === sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,19 @@ export class SearchInterceptor {
isSavedToBackground = true;
});

const sendCancelRequest = once(() => {
this.deps.http.delete(`/internal/search/${strategy}/${id}`, { version: '1' });
});
const sendCancelRequest = once(() =>
this.deps.http.delete(`/internal/search/${strategy}/${id}`, { version: '1' })
);

const cancel = () => id && !isSavedToBackground && sendCancelRequest();
const cancel = async () => {
if (!id || isSavedToBackground) return;
try {
await sendCancelRequest();
} catch (e) {
// eslint-disable-next-line no-console
console.error(e);
}
};

// Async search requires a series of requests
// 1) POST /<index pattern>/_async_search/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ describe('ES search strategy', () => {
const mockSubmitCaller = jest.fn();
const mockDeleteCaller = jest.fn();
const mockLogger: any = {
debug: () => {},
debug: jest.fn(),
error: jest.fn(),
};
const mockDeps = {
uiSettingsClient: {
Expand Down Expand Up @@ -357,6 +358,47 @@ describe('ES search strategy', () => {
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).toBeCalled();
});

it('should not throw when encountering an error deleting', async () => {
mockSubmitCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});

const errResponse = new errors.ResponseError({
body: xContentParseException,
statusCode: 400,
headers: {},
warnings: [],
meta: {} as any,
});
mockDeleteCaller.mockRejectedValueOnce(errResponse);

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: KbnServerError | undefined;
try {
await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockSubmitCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).toBeCalled();
});
});

describe('with sessionId', () => {
Expand Down

0 comments on commit e4a381a

Please sign in to comment.