Skip to content
This repository has been archived by the owner on Mar 31, 2024. It is now read-only.

Commit

Permalink
[esArchiver/deleteIndex] wait and retry if snapshot in progress (elas…
Browse files Browse the repository at this point in the history
…tic#18624)

* [esArchiver/deleteIndex] wait and retry if snapshot in progress

* [esArchiver/deleteIndex] use recursion for retry

* [esArchiver/waitForSnapshot] invert status check

* [esArchiver] share delete-with-retry with create stream

* [esArchiver/stats] include index name in message

* [esArchiver/indexDelete] wait for snapshot completion up to three times

* [esArchiver] log status of snapshot during checks
  • Loading branch information
spalger committed May 8, 2018
1 parent e5d08a3 commit 2c01b9d
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/es_archiver/actions/load.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export async function loadAction({ name, skipExisting, client, dataDir, log }) {

await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting }),
createCreateIndexStream({ client, stats, skipExisting, log }),
createIndexDocRecordsStream(client, stats),
]);

Expand Down
2 changes: 1 addition & 1 deletion src/es_archiver/actions/unload.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export async function unloadAction({ name, client, dataDir, log }) {
createReadStream(resolve(inputDir, filename)),
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createDeleteIndexStream(client, stats)
createDeleteIndexStream(client, stats, log)
]);
}

Expand Down
7 changes: 4 additions & 3 deletions src/es_archiver/lib/indices/create_index_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { Transform } from 'stream';

import { get } from 'lodash';

export function createCreateIndexStream({ client, stats, skipExisting }) {
import { deleteIndex } from './delete_index';

export function createCreateIndexStream({ client, stats, skipExisting, log }) {
const skipDocsFromIndices = new Set();

async function handleDoc(stream, record) {
Expand Down Expand Up @@ -35,8 +37,7 @@ export function createCreateIndexStream({ client, stats, skipExisting }) {
return;
}

await client.indices.delete({ index });
stats.deletedIndex(index);
await deleteIndex({ client, stats, index, log });
await attemptToCreate(attemptNumber + 1);
return;
}
Expand Down
92 changes: 92 additions & 0 deletions src/es_archiver/lib/indices/delete_index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { get } from 'lodash';

// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
const PENDING_SNAPSHOT_STATUSES = [
'INIT',
'STARTED',
'WAITING',
];

export async function deleteIndex(options) {
const {
client,
stats,
index,
log,
retryIfSnapshottingCount = 3
} = options;

try {
await client.indices.delete({ index });
stats.deletedIndex(index);
} catch (error) {

if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
stats.waitingForInProgressSnapshot(index);
await waitForSnapshotCompletion(client, index, log);
return await deleteIndex({
...options,
retryIfSnapshottingCount: retryIfSnapshottingCount - 1
});
}

if (get(error, 'body.error.type') !== 'index_not_found_exception') {
throw error;
}
}
}

/**
* Determine if an error is complaining about a delete while
* a snapshot is in progress
* @param {Error} error
* @return {Boolean}
*/
export function isDeleteWhileSnapshotInProgressError(error) {
return get(error, 'body.error.reason', '')
.startsWith('Cannot delete indices that are being snapshotted');
}

/**
* Wait for the any snapshot in any respository that is
* snapshotting this index to complete.
*
* @param {EsClient} client
* @param {string} index the name of the index to look for
* @return {Promise<undefined>}
*/
export async function waitForSnapshotCompletion(client, index, log) {
const isSnapshotPending = async (repository, snapshot) => {
const { snapshots: [status] } = await client.snapshot.status({
repository,
snapshot,
});

log.debug(`Snapshot ${repository}/${snapshot} is ${status.state}`);
return PENDING_SNAPSHOT_STATUSES.includes(status.state);
};

const getInProgressSnapshots = async (repository) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
repository,
snapshot: '_current'
});
return inProgressSnapshots;
};

for (const repository of Object.keys(await client.snapshot.getRepository())) {
const allInProgress = await getInProgressSnapshots(repository);
const found = allInProgress.find(s => s.indices.includes(index));

if (!found) {
continue;
}

while (await isSnapshotPending(repository, found.snapshot)) {
// wait a bit before getting status again
await new Promise(resolve => setTimeout(resolve, 500));
}

return;
}
}
18 changes: 3 additions & 15 deletions src/es_archiver/lib/indices/delete_index_stream.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
import { Transform } from 'stream';

import { get } from 'lodash';

export function createDeleteIndexStream(client, stats) {

async function deleteIndex(index) {
try {
await client.indices.delete({ index });
stats.deletedIndex(index);
} catch (err) {
if (get(err, 'body.error.type') !== 'index_not_found_exception') {
throw err;
}
}
}
import { deleteIndex } from './delete_index';

export function createDeleteIndexStream(client, stats, log) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(record, enc, callback) {
try {
if (!record || record.type === 'index') {
await deleteIndex(record.value.index);
await deleteIndex({ client, stats, log, index: record.value.index });
} else {
this.push(record);
}
Expand Down
6 changes: 6 additions & 0 deletions src/es_archiver/lib/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export function createStats(name, log) {
deleted: false,
created: false,
archived: false,
waitForSnapshot: 0,
configDocs: {
upgraded: 0,
tagged: 0,
Expand All @@ -32,6 +33,11 @@ export function createStats(name, log) {
info('Skipped restore for existing index %j', index);
}

waitingForInProgressSnapshot(index) {
getOrCreate(index).waitForSnapshot += 1;
info('Waiting for snapshot of %j to complete', index);
}

deletedIndex(index) {
getOrCreate(index).deleted = true;
info('Deleted existing index %j', index);
Expand Down

0 comments on commit 2c01b9d

Please sign in to comment.