Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[esArchiver/deleteIndex] wait and retry if snapshot in progress #18624

Merged
2 changes: 1 addition & 1 deletion src/es_archiver/actions/load.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export async function loadAction({ name, skipExisting, client, dataDir, log }) {

await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting }),
createCreateIndexStream({ client, stats, skipExisting, log }),
createIndexDocRecordsStream(client, stats),
]);

Expand Down
2 changes: 1 addition & 1 deletion src/es_archiver/actions/unload.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export async function unloadAction({ name, client, dataDir, log }) {
createReadStream(resolve(inputDir, filename)),
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createDeleteIndexStream(client, stats)
createDeleteIndexStream(client, stats, log)
]);
}

Expand Down
7 changes: 4 additions & 3 deletions src/es_archiver/lib/indices/create_index_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { Transform } from 'stream';

import { get } from 'lodash';

export function createCreateIndexStream({ client, stats, skipExisting }) {
import { deleteIndex } from './delete_index';

export function createCreateIndexStream({ client, stats, skipExisting, log }) {
const skipDocsFromIndices = new Set();

async function handleDoc(stream, record) {
Expand Down Expand Up @@ -35,8 +37,7 @@ export function createCreateIndexStream({ client, stats, skipExisting }) {
return;
}

await client.indices.delete({ index });
stats.deletedIndex(index);
await deleteIndex({ client, stats, index, log });
await attemptToCreate(attemptNumber + 1);
return;
}
Expand Down
92 changes: 92 additions & 0 deletions src/es_archiver/lib/indices/delete_index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { get } from 'lodash';

// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
const PENDING_SNAPSHOT_STATUSES = [
'INIT',
'STARTED',
'WAITING',
];

export async function deleteIndex(options) {
const {
client,
stats,
index,
log,
retryIfSnapshottingCount = 3
} = options;

try {
await client.indices.delete({ index });
stats.deletedIndex(index);
} catch (error) {

if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
stats.waitingForInProgressSnapshot(index);
await waitForSnapshotCompletion(client, index, log);
return await deleteIndex({
...options,
retryIfSnapshottingCount: retryIfSnapshottingCount - 1
});
}

if (get(error, 'body.error.type') !== 'index_not_found_exception') {
throw error;
}
}
}

/**
* Determine if an error is complaining about a delete while
* a snapshot is in progress
* @param {Error} error
* @return {Boolean}
*/
export function isDeleteWhileSnapshotInProgressError(error) {
return get(error, 'body.error.reason', '')
.startsWith('Cannot delete indices that are being snapshotted');
}

/**
* Wait for the any snapshot in any respository that is
* snapshotting this index to complete.
*
* @param {EsClient} client
* @param {string} index the name of the index to look for
* @return {Promise<undefined>}
*/
export async function waitForSnapshotCompletion(client, index, log) {
const isSnapshotPending = async (repository, snapshot) => {
const { snapshots: [status] } = await client.snapshot.status({
repository,
snapshot,
});

log.debug(`Snapshot ${repository}/${snapshot} is ${status.state}`);
return PENDING_SNAPSHOT_STATUSES.includes(status.state);
};

const getInProgressSnapshots = async (repository) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
repository,
snapshot: '_current'
});
return inProgressSnapshots;
};

for (const repository of Object.keys(await client.snapshot.getRepository())) {
const allInProgress = await getInProgressSnapshots(repository);
const found = allInProgress.find(s => s.indices.includes(index));

if (!found) {
continue;
}

while (await isSnapshotPending(repository, found.snapshot)) {
// wait a bit before getting status again
await new Promise(resolve => setTimeout(resolve, 500));
}

return;
}
}
18 changes: 3 additions & 15 deletions src/es_archiver/lib/indices/delete_index_stream.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
import { Transform } from 'stream';

import { get } from 'lodash';

export function createDeleteIndexStream(client, stats) {

async function deleteIndex(index) {
try {
await client.indices.delete({ index });
stats.deletedIndex(index);
} catch (err) {
if (get(err, 'body.error.type') !== 'index_not_found_exception') {
throw err;
}
}
}
import { deleteIndex } from './delete_index';

export function createDeleteIndexStream(client, stats, log) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(record, enc, callback) {
try {
if (!record || record.type === 'index') {
await deleteIndex(record.value.index);
await deleteIndex({ client, stats, log, index: record.value.index });
} else {
this.push(record);
}
Expand Down
6 changes: 6 additions & 0 deletions src/es_archiver/lib/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export function createStats(name, log) {
deleted: false,
created: false,
archived: false,
waitForSnapshot: 0,
configDocs: {
upgraded: 0,
tagged: 0,
Expand All @@ -32,6 +33,11 @@ export function createStats(name, log) {
info('Skipped restore for existing index %j', index);
}

waitingForInProgressSnapshot(index) {
getOrCreate(index).waitForSnapshot += 1;
info('Waiting for snapshot of %j to complete', index);
}

deletedIndex(index) {
getOrCreate(index).deleted = true;
info('Deleted existing index %j', index);
Expand Down