Skip to content

Commit

Permalink
Remove Snapshot INIT Step (#55918) (#59374)
Browse files Browse the repository at this point in the history
With #55773 the snapshot INIT state step has become obsolete. We can set up the snapshot directly in one single step to simplify the state machine.

This is a big help for building concurrent snapshots because it allows us to establish a deterministic order of operations between snapshot create and delete operations since all of their entries now contain a repository generation. With this change simple queuing up of snapshot operations can and will be added in a follow-up.
  • Loading branch information
original-brownbear committed Jul 13, 2020
1 parent 8ab0c1b commit 08b54fe
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -46,10 +45,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -77,84 +72,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testDisruptionOnSnapshotInitialization() throws Exception {
final String idxName = "test";
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);

createRandomIndex(idxName);

logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

// Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization
// by the current master and the former master. It is not causing any issues in real life scenario, but
// might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures.
logger.info("--> initializing the repository");
assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state());

final String masterNode1 = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>();
otherNodes.addAll(allMasterEligibleNodes);
otherNodes.remove(masterNode1);
otherNodes.add(dataNode);

NetworkDisruption networkDisruption =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
NetworkDisruption.UNRESPONSIVE);
internalCluster().setDisruptionScheme(networkDisruption);

ClusterService clusterService = internalCluster().clusterService(masterNode1);
CountDownLatch disruptionStarted = new CountDownLatch(1);
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().size() > 0) {
if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) {
// The snapshot started, we can start disruption so the INIT state will arrive to another master node
logger.info("--> starting disruption");
networkDisruption.startDisrupting();
clusterService.removeListener(this);
disruptionStarted.countDown();
}
}
}
});

logger.info("--> starting snapshot");
ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
.prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute();

logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));

awaitNoMoreRunningOperations(dataNode);

logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
try {
assertSnapshotExists("test-repo", "test-snap-2");
} catch (SnapshotMissingException exception) {
logger.info("--> done verifying, snapshot doesn't exist");
}
}, 1, TimeUnit.MINUTES);

logger.info("--> stopping disrupting");
networkDisruption.stopDisrupting();
ensureStableCluster(4, masterNode1);
logger.info("--> done");

future.get();
awaitNoMoreRunningOperations(masterNode1);
}

public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
internalCluster().startMasterOnlyNodes(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () ->
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repoName), null)))),
snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null)))),
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));

logger.info("--> verify that snapshot with missing root level metadata can be deleted");
Expand All @@ -293,7 +293,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () ->
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repoName), null)))),
snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null)))),
is(Version.CURRENT));
final RepositoryData finalRepositoryData = getRepositoryData(repoName);
for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2246,8 +2246,8 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception {
.put("block_on_data", true));


String dataStream = "test-ds";
DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", org.elasticsearch.common.collect.List.of(dataStream));
String dataStream = "datastream";
DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", Collections.singletonList(dataStream));

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
Expand All @@ -2272,7 +2272,7 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception {
client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{dataStream})).actionGet();
fail("Expected deleting index to fail during snapshot");
} catch (SnapshotInProgressException e) {
assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: [test-ds"));
assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: ["+dataStream));
} finally {
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
l -> blobStoreRepository.cleanup(
repositoryStateId,
snapshotsService.minCompatibleVersion(
newState.nodes().getMinNodeVersion(), repositoryName, repositoryData, null),
newState.nodes().getMinNodeVersion(), repositoryData, null),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,18 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
if (state.nodes().getMinNodeVersion().before(SnapshotsService.NO_REPO_INITIALIZE_VERSION)) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshotLegacy(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
}
Loading

0 comments on commit 08b54fe

Please sign in to comment.