Skip to content

Commit

Permalink
[esArchiver/deleteIndex] wait and retry if snapshot in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
spalger committed Apr 27, 2018
1 parent fdc24f5 commit 12554e3
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
25 changes: 19 additions & 6 deletions src/es_archiver/lib/indices/delete_index_stream.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
import { Transform } from 'stream';

import { get } from 'lodash';
import { isDeleteWhileSnapshotInProgressError, waitForSnapshotCompletion } from './in_progress_snapshot';

export function createDeleteIndexStream(client, stats) {
async function deleteIndex(client, stats, index) {
let retryIfSnapshotInProgress = true;

async function deleteIndex(index) {
async function attempt() {
try {
await client.indices.delete({ index });
stats.deletedIndex(index);
} catch (err) {
if (get(err, 'body.error.type') !== 'index_not_found_exception') {
throw err;
} catch (error) {
if (retryIfSnapshotInProgress && isDeleteWhileSnapshotInProgressError(error)) {
retryIfSnapshotInProgress = false;
await waitForSnapshotCompletion(client, stats, index);
await attempt();
return;
}

if (get(error, 'body.error.type') !== 'index_not_found_exception') {
throw error;
}
}
}

await attempt({ waitIfSnapshotInProgress: true });
}

export function createDeleteIndexStream(client, stats) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(record, enc, callback) {
try {
if (!record || record.type === 'index') {
await deleteIndex(record.value.index);
await deleteIndex(client, stats, record.value.index);
} else {
this.push(record);
}
Expand Down
64 changes: 64 additions & 0 deletions src/es_archiver/lib/indices/in_progress_snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { get } from 'lodash';

// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
const TERMINAL_SNAPSHOT_STATUSES = [
'SUCCESS',
'FAILED',
'ABORTED'
];

/**
* Determine if an error is complaining about a delete while
* a snapshot is in progress
* @param {Error} error
* @return {Boolean}
*/
export function isDeleteWhileSnapshotInProgressError(error) {
return get(error, 'body.error.reason', '')
.startsWith('Cannot delete indices that are being snapshotted');
}

/**
* Wait for the any snapshot in any respository that is
* snapshotting this index to complete.
*
* @param {EsClient} client
* @param {EsArchiverStats} stats
* @param {string} index the name of the index to look for
* @return {Promise<undefined>}
*/
export async function waitForSnapshotCompletion(client, stats, index) {
const isSnapshotInProgress = async (repository, snapshot) => {
const { snapshots: [status] } = await client.snapshot.status({
repository,
snapshot,
});

return !TERMINAL_SNAPSHOT_STATUSES.includes(status.state);
};

const getInProgressSnapshots = async (repository) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
repository,
snapshot: '_current'
});
return inProgressSnapshots;
};

for (const repository of Object.keys(await client.snapshot.getRepository())) {
const allInProgress = await getInProgressSnapshots(repository);
const found = allInProgress.find(s => s.indices.includes(index));

if (!found) {
continue;
}

stats.waitingForInProgressSnapshot(index);
while (await isSnapshotInProgress(repository, found.snapshot)) {
// wait a bit before getting status again
await new Promise(resolve => setTimeout(resolve, 500));
}

return;
}
}
6 changes: 6 additions & 0 deletions src/es_archiver/lib/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export function createStats(name, log) {
deleted: false,
created: false,
archived: false,
waitForSnapshot: 0,
configDocs: {
upgraded: 0,
tagged: 0,
Expand All @@ -32,6 +33,11 @@ export function createStats(name, log) {
info('Skipped restore for existing index %j', index);
}

waitingForInProgressSnapshot(index) {
getOrCreate(index).waitForSnapshot += 1;
info('Waiting for snapshot of %j to complete');
}

deletedIndex(index) {
getOrCreate(index).deleted = true;
info('Deleted existing index %j', index);
Expand Down

0 comments on commit 12554e3

Please sign in to comment.