diff --git a/src/es_archiver/lib/indices/delete_index_stream.js b/src/es_archiver/lib/indices/delete_index_stream.js index 3f49562fd0b4983..65a0da24cb55a3c 100644 --- a/src/es_archiver/lib/indices/delete_index_stream.js +++ b/src/es_archiver/lib/indices/delete_index_stream.js @@ -1,27 +1,40 @@ import { Transform } from 'stream'; import { get } from 'lodash'; +import { isDeleteWhileSnapshotInProgressError, waitForSnapshotCompletion } from './in_progress_snapshot'; -export function createDeleteIndexStream(client, stats) { +async function deleteIndex(client, stats, index) { + let retryIfSnapshotInProgress = true; - async function deleteIndex(index) { + async function attempt() { try { await client.indices.delete({ index }); stats.deletedIndex(index); - } catch (err) { - if (get(err, 'body.error.type') !== 'index_not_found_exception') { - throw err; + } catch (error) { + if (retryIfSnapshotInProgress && isDeleteWhileSnapshotInProgressError(error)) { + retryIfSnapshotInProgress = false; + await waitForSnapshotCompletion(client, stats, index); + await attempt(); + return; + } + + if (get(error, 'body.error.type') !== 'index_not_found_exception') { + throw error; } } } + await attempt({ waitIfSnapshotInProgress: true }); +} + +export function createDeleteIndexStream(client, stats) { return new Transform({ readableObjectMode: true, writableObjectMode: true, async transform(record, enc, callback) { try { if (!record || record.type === 'index') { - await deleteIndex(record.value.index); + await deleteIndex(client, stats, record.value.index); } else { this.push(record); } diff --git a/src/es_archiver/lib/indices/in_progress_snapshot.js b/src/es_archiver/lib/indices/in_progress_snapshot.js new file mode 100644 index 000000000000000..46c9ea68d4eef3c --- /dev/null +++ b/src/es_archiver/lib/indices/in_progress_snapshot.js @@ -0,0 +1,64 @@ +import { get } from 'lodash'; + +// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319 +const TERMINAL_SNAPSHOT_STATUSES = [ + 'SUCCESS', + 'FAILED', + 'ABORTED' +]; + +/** + * Determine if an error is complaining about a delete while + * a snapshot is in progress + * @param {Error} error + * @return {Boolean} + */ +export function isDeleteWhileSnapshotInProgressError(error) { + return get(error, 'body.error.reason', '') + .startsWith('Cannot delete indices that are being snapshotted'); +} + +/** + * Wait for the any snapshot in any respository that is + * snapshotting this index to complete. + * + * @param {EsClient} client + * @param {EsArchiverStats} stats + * @param {string} index the name of the index to look for + * @return {Promise} + */ +export async function waitForSnapshotCompletion(client, stats, index) { + const isSnapshotInProgress = async (repository, snapshot) => { + const { snapshots: [status] } = await client.snapshot.status({ + repository, + snapshot, + }); + + return !TERMINAL_SNAPSHOT_STATUSES.includes(status.state); + }; + + const getInProgressSnapshots = async (repository) => { + const { snapshots: inProgressSnapshots } = await client.snapshot.get({ + repository, + snapshot: '_current' + }); + return inProgressSnapshots; + }; + + for (const repository of Object.keys(await client.snapshot.getRepository())) { + const allInProgress = await getInProgressSnapshots(repository); + const found = allInProgress.find(s => s.indices.includes(index)); + + if (!found) { + continue; + } + + stats.waitingForInProgressSnapshot(index); + while (await isSnapshotInProgress(repository, found.snapshot)) { + // wait a bit before getting status again + await new Promise(resolve => setTimeout(resolve, 500)); + } + + return; + } +} diff --git a/src/es_archiver/lib/stats.js b/src/es_archiver/lib/stats.js index efe85833ac95bc2..b4c3d918b8d64a0 100644 --- a/src/es_archiver/lib/stats.js +++ b/src/es_archiver/lib/stats.js @@ -12,6 +12,7 @@ export function createStats(name, log) { deleted: false, created: false, archived: false, + waitForSnapshot: 0, configDocs: { upgraded: 0, tagged: 0, @@ -32,6 +33,11 @@ export function createStats(name, log) { info('Skipped restore for existing index %j', index); } + waitingForInProgressSnapshot(index) { + getOrCreate(index).waitForSnapshot += 1; + info('Waiting for snapshot of %j to complete'); + } + deletedIndex(index) { getOrCreate(index).deleted = true; info('Deleted existing index %j', index);