Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate peer recovery from translog to retention lease #49448

Merged
merged 21 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Allow ops-based recovery without existing retention lease
  • Loading branch information
dnhatn committed Nov 21, 2019
commit b4410f7ed7f6c53a49be2a9fa8d2c02cea0f3dfa
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -1168,6 +1169,12 @@ private void indexRandomDocuments(
}
}

/**
 * Indexes a single trivial document {@code {"f": "v"}} under the given id and asserts a 2xx response.
 *
 * @param id the document id to index under
 */
private void indexDocument(String id) throws IOException {
    final XContentBuilder source = JsonXContent.contentBuilder().startObject().field("f", "v").endObject();
    final Request request = new Request("POST", "/" + index + "/" + "_doc/" + id);
    request.setJsonEntity(Strings.toString(source));
    assertOK(client().performRequest(request));
}

// Reads back the number of documents recorded in the "<index>_count" info document
// (presumably written by indexRandomDocuments — confirm against that method's body),
// so the post-upgrade cluster can verify that no documents were lost.
private int countOfIndexedRandomDocuments() throws IOException {
    return Integer.parseInt(loadInfoDocument(index + "_count"));
}
Expand Down Expand Up @@ -1248,4 +1255,37 @@ public void testPeerRecoveryRetentionLeases() throws IOException {
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
}
}

/**
 * Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some but not too many
 * uncommitted documents before we restart the cluster. This is important when we move from translog-based to
 * retention-lease-based peer recoveries.
 */
public void testOperationBasedRecovery() throws Exception {
    if (isRunningAgainstOldCluster() == false) {
        // After the full-cluster restart the replica must recover from the primary without copying segment files.
        ensureGreen(index);
        assertNoFileBasedRecovery(index, node -> true);
        return;
    }
    createIndex(index, Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
        .build());
    ensureGreen(index);
    // Build up a committed baseline, flushing occasionally along the way.
    final int committedDocs = randomIntBetween(50, 100);
    for (int i = 0; i < committedDocs; i++) {
        indexDocument(Integer.toString(randomIntBetween(1, 100)));
        if (rarely()) {
            flush(index, randomBoolean());
        }
    }
    flush(index, randomBoolean());
    // Leave a small uncommitted tail so the recovery after restart has operations to replay.
    final int uncommittedDocs = randomIntBetween(0, 50);
    for (int i = 0; i < uncommittedDocs; i++) {
        indexDocument(Integer.toString(randomIntBetween(1, 100)));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,10 @@ public void testClosedIndexNoopRecovery() throws Exception {
switch (CLUSTER_TYPE) {
case OLD: break;
case MIXED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
break;
case UPGRADED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
break;
}
}
Expand Down Expand Up @@ -647,7 +647,7 @@ public void testUpdateDoc() throws Exception {
}
}

private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
Map<String, Object> recoveries = entityAsMap(client()
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));

Expand Down Expand Up @@ -678,4 +678,30 @@ private void assertNoFileBasedRecovery(String indexName, Predicate<String> targe

assertTrue("must find replica", foundReplica);
}

/**
 * Tests that with or without soft-deletes, we should perform an operation-based recovery if there are some but not too many
 * uncommitted documents during rolling upgrade. This is important when we move from translog-based to
 * retention-lease-based peer recoveries.
 */
public void testOperationBasedRecovery() throws Exception {
    final String index = "test_operation_based_recovery";
    if (CLUSTER_TYPE == ClusterType.OLD) {
        // Seed the index on the old cluster: a committed batch plus a small uncommitted tail.
        createIndex(index, Settings.builder()
            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
        ensureGreen(index);
        indexDocs(index, 0, randomIntBetween(50, 100));
        flush(index, randomBoolean());
        indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 20));
    } else {
        ensureGreen(index);
        // Only nodes already upgraded in this round are expected to recover without copying files.
        assertNoFileBasedRecovery(index, nodeName -> {
            if (CLUSTER_TYPE == ClusterType.UPGRADED) {
                return true;
            }
            if (nodeName.startsWith(CLUSTER_NAME + "-0")) {
                return true;
            }
            // In the second mixed round, node "-1" has been upgraded as well.
            return Booleans.parseBoolean(System.getProperty("tests.first_round")) == false
                && nodeName.startsWith(CLUSTER_NAME + "-1");
        });
        indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 20));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,15 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
&& isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
&& (useRetentionLeases == false
|| (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
|| retentionLeaseRef.get() == null
|| retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo());
// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
// possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
// without having a complete history.

if (isSequenceNumberBasedRecovery && useRetentionLeases) {
if (isSequenceNumberBasedRecovery && useRetentionLeases && retentionLeaseRef.get() != null) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock.close();
logger.trace("history is retained by {}", retentionLeaseRef.get());
Expand All @@ -205,7 +206,12 @@ && isTargetSameHistory()
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
sendFileStep.onResponse(SendFileResult.EMPTY);
// We can get here via a full cluster restart from a pre-7.4 version where we did not have PPRLs for existing replicas.
if (useRetentionLeases && retentionLeaseRef.get() == null) {
createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, lease -> SendFileResult.EMPTY));
} else {
sendFileStep.onResponse(SendFileResult.EMPTY);
}
} else {
final Engine.IndexCommitRef safeCommitRef;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,17 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
});
newPrimary.flush(new FlushRequest().force(true));
if (replica.indexSettings().isSoftDeleteEnabled()) {
// We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
// The min_retained_seqno only advances when a merge asks for the retention query.
newPrimary.flush(new FlushRequest().force(true));

// We also need to make sure that there is no retention lease holding on to any history. The lease for the old primary
// We need to make sure that there is no retention lease holding on to any history. The lease for the old primary
// expires since there are no unassigned shards in this replication group).
assertBusy(() -> {
newPrimary.syncRetentionLeases();
//noinspection OptionalGetWithoutIsPresent since there must be at least one lease
assertThat(newPrimary.getRetentionLeases().leases().stream().mapToLong(RetentionLease::retainingSequenceNumber)
.min().getAsLong(), greaterThan(newPrimary.seqNoStats().getMaxSeqNo()));
});
// We also need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
// The min_retained_seqno only advances when a merge asks for the retention query.
newPrimary.flush(new FlushRequest().force(true));
}
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -1203,52 +1202,6 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
}
}

public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);

String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h")
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(0, 100))
.mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(toList()));
ensureGreen(indexName);

final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
final DiscoveryNodes discoveryNodes = clusterService().state().nodes();
final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId);

final IndexShard primary = internalCluster().getInstance(IndicesService.class,
discoveryNodes.get(indexShardRoutingTable.primaryShard().currentNodeId()).getName()).getShardOrNull(shardId);

final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0);
internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(),
new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertFalse(client().admin().cluster().prepareHealth()
.setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1))
.setWaitForEvents(Priority.LANGUID).get().isTimedOut());

final PlainActionFuture<ReplicationResponse> future = new PlainActionFuture<>();
primary.removeRetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting), future);
future.get();

return super.onNodeStopped(nodeName);
}
});

ensureGreen(indexName);

//noinspection OptionalGetWithoutIsPresent because it fails the test if absent
final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get()
.shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get();
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
}

public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);

Expand Down Expand Up @@ -1390,7 +1343,9 @@ public Settings onNodeStopped(String nodeName) throws Exception {
assertBusy(() -> assertFalse("should no longer have lease for " + replicaShardRouting,
client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats()
.retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting))));

// Flush to advance SoftDeletesPolicy#minRetainedSeqNo so primary does not have a complete history
// for replicas without peer recovery retention leases.
client().admin().indices().prepareFlush(indexName).setForce(true).get();
return super.onNodeStopped(nodeName);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -1062,4 +1063,38 @@ private static boolean isXPackTemplate(String name) {
return false;
}
}

/**
 * Flushes {@code index} via the REST {@code _flush} API, waiting for any in-flight flush to complete first.
 *
 * @param index the index to flush
 * @param force whether to force the flush even if no changes need committing
 */
public void flush(String index, boolean force) throws IOException {
    logger.info("flushing index {} force={}", index, force);
    final Request request = new Request("POST", "/" + index + "/_flush");
    request.addParameter("wait_if_ongoing", "true");
    request.addParameter("force", Boolean.toString(force));
    assertOK(client().performRequest(request));
}

/**
 * Asserts, from the detailed {@code _recovery} API output, that every replica shard of {@code indexName} whose
 * recovery target node matches {@code targetNode} was recovered without copying any segment files — i.e. an
 * operations-based (or no-op) recovery rather than a file-based one.
 *
 * @param indexName  the index whose recovery stats are inspected
 * @param targetNode predicate selecting, by recovery-target node name, which replicas to check
 * @throws IOException if the recovery stats request fails
 */
public void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
    Map<String, Object> recoveries = entityAsMap(client().performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
    @SuppressWarnings("unchecked")
    List<Map<String, ?>> shards = (List<Map<String, ?>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
    assertNotNull(shards);
    boolean foundReplica = false;
    logger.info("index {} recovery stats {}", indexName, shards);
    for (Map<String, ?> shard : shards) {
        // Compare with equals rather than ==: relying on reference identity of boxed Booleans is fragile.
        if (Boolean.FALSE.equals(shard.get("primary")) && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
            List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
            // once detailed recoveries work, remove this if.
            if (details == null) {
                // No per-file details available: fall back to comparing total vs reused file counts.
                long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
                long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
                logger.info("total [{}] reused [{}]", totalFiles, reusedFiles);
                assertThat("must reuse all files, recoveries [" + recoveries + "]", totalFiles, equalTo(reusedFiles));
            } else {
                // Detailed file list present: an ops-based recovery must have copied no files at all.
                assertThat(details, Matchers.empty());
            }
            foundReplica = true;
        }
    }
    assertTrue("must find replica", foundReplica);
}
}