diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
index 34ce4307316b7..8d15dce6b26f0 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
@@ -160,7 +160,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
                 ShardFollowTask::new),
 
             // Task statuses
-            new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME,
+            new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME,
                 ShardFollowNodeTask.Status::new)
         );
     }
@@ -172,9 +172,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
                 ShardFollowTask::fromXContent),
 
             // Task statuses
-            new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME),
-                ShardFollowNodeTask.Status::fromXContent)
-        );
+            new NamedXContentRegistry.Entry(
+                ShardFollowNodeTask.Status.class,
+                new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME),
+                ShardFollowNodeTask.Status::fromXContent));
     }
 
     /**
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index 1b6934ade0a8f..a20d55d93bc77 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -153,6 +153,17 @@ public boolean equals(final Object o) {
         public int hashCode() {
             return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
         }
+
+        @Override
+        public String toString() {
+            return "Request{" +
+                    "fromSeqNo=" + fromSeqNo +
+                    ", maxOperationCount=" + maxOperationCount +
+                    ", shardId=" + shardId +
+                    ", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
+                    '}';
+        }
+
     }
 
     public static final class Response extends ActionResponse {
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
index cfc8e0fc4e766..f2b5b7b3772d2 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
@@ -30,20 +30,25 @@
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * The node task that fetch the write operations from a leader shard and
@@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     private long numberOfFailedBulkOperations = 0;
     private long numberOfOperationsIndexed = 0;
     private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
+    private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
 
     ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
                         ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
@@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         this.relativeTimeProvider = relativeTimeProvider;
         this.retryTimeout = params.getRetryTimeout();
         this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
+        /*
+         * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
+         * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
+         * when the fetch task associated with that from sequence number succeeds.
+         */
+        this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
+            @Override
+            protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
+                return size() > params.getMaxConcurrentReadBatches();
+            }
+        };
     }
 
     void start(
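The bounded size of the fetchExceptions map introduced above comes from overriding LinkedHashMap#removeEldestEntry: once the map grows past the number of concurrent read batches, the oldest tracked failure is evicted, and a later successful fetch for the same from sequence number removes its entry outright. A minimal, self-contained sketch of that pattern (hypothetical names, a plain String payload in place of ElasticsearchException, and a fixed bound standing in for params.getMaxConcurrentReadBatches()):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BoundedRecentFailures {

        public static void main(String[] args) {
            final int maxTrackedFailures = 2; // stands in for params.getMaxConcurrentReadBatches()
            // insertion-ordered map that evicts its eldest entry once the bound is exceeded
            final Map<Long, String> fetchExceptions = new LinkedHashMap<Long, String>() {
                @Override
                protected boolean removeEldestEntry(final Map.Entry<Long, String> eldest) {
                    return size() > maxTrackedFailures;
                }
            };
            fetchExceptions.put(0L, "fetch from 0 failed");
            fetchExceptions.put(64L, "fetch from 64 failed");
            fetchExceptions.put(128L, "fetch from 128 failed"); // evicts the entry for seq no 0
            fetchExceptions.remove(64L);                        // a successful fetch clears its entry
            System.out.println(fetchExceptions.keySet());       // prints [128]
        }
    }
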
@@ -224,6 +241,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
                 synchronized (ShardFollowNodeTask.this) {
                     totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
                     numberOfSuccessfulFetches++;
+                    fetchExceptions.remove(from);
                     operationsReceived += response.getOperations().length;
                     totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
                 }
@@ -233,6 +251,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
                 synchronized (ShardFollowNodeTask.this) {
                     totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
                     numberOfFailedFetches++;
+                    fetchExceptions.put(from, new ElasticsearchException(e));
                 }
                 handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
             });
@@ -412,12 +431,13 @@ public synchronized Status getStatus() {
             totalIndexTimeMillis,
             numberOfSuccessfulBulkOperations,
             numberOfFailedBulkOperations,
-            numberOfOperationsIndexed);
+            numberOfOperationsIndexed,
+            new TreeMap<>(fetchExceptions));
     }
 
     public static class Status implements Task.Status {
 
-        public static final String NAME = "shard-follow-node-task-status";
+        public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
 
         static final ParseField SHARD_ID = new ParseField("shard_id");
         static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
@@ -438,8 +458,10 @@ public static class Status implements Task.Status {
         static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations");
         static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations");
         static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
+        static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");
 
-        static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME,
+        @SuppressWarnings("unchecked")
+        static final ConstructingObjectParser<Status, Void> STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME,
                 args -> new Status(
                         (int) args[0],
                         (long) args[1],
@@ -459,28 +481,51 @@ public static class Status implements Task.Status {
                         (long) args[15],
                         (long) args[16],
                         (long) args[17],
-                        (long) args[18]));
+                        (long) args[18],
+                        new TreeMap<>(
+                                ((List<Map.Entry<Long, ElasticsearchException>>) args[19])
+                                        .stream()
+                                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))));
+
+        public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
+
+        static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
+                new ConstructingObjectParser<>(
+                        FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
+                        args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
+
+        static {
+            STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
+            STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
+            STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
+            STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
+            STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
+            STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
+        }
+
+        static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
+        static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");
 
         static {
-            PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
-            PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
-            PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
-            PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
-            PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
+            FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
+            FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
+                    ConstructingObjectParser.constructorArg(),
+                    (p, c) -> ElasticsearchException.fromXContent(p),
+                    FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
         }
 
         private final int shardId;
@@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() {
             return numberOfOperationsIndexed;
         }
 
+        private final NavigableMap<Long, ElasticsearchException> fetchExceptions;
+
+        public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
+            return fetchExceptions;
+        }
+
         Status(
                 final int shardId,
                 final long leaderGlobalCheckpoint,
@@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() {
                 final long totalIndexTimeMillis,
                 final long numberOfSuccessfulBulkOperations,
                 final long numberOfFailedBulkOperations,
-                final long numberOfOperationsIndexed) {
+                final long numberOfOperationsIndexed,
+                final NavigableMap<Long, ElasticsearchException> fetchExceptions) {
             this.shardId = shardId;
             this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
             this.leaderMaxSeqNo = leaderMaxSeqNo;
@@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() {
             this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations;
             this.numberOfFailedBulkOperations = numberOfFailedBulkOperations;
             this.numberOfOperationsIndexed = numberOfOperationsIndexed;
+            this.fetchExceptions = fetchExceptions;
         }
 
         public Status(final StreamInput in) throws IOException {
@@ -658,11 +711,12 @@ public Status(final StreamInput in) throws IOException {
             this.numberOfSuccessfulBulkOperations = in.readVLong();
             this.numberOfFailedBulkOperations = in.readVLong();
             this.numberOfOperationsIndexed = in.readVLong();
+            this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
         }
 
         @Override
         public String getWriteableName() {
-            return NAME;
+            return STATUS_PARSER_NAME;
         }
 
         @Override
@@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException {
             out.writeVLong(numberOfSuccessfulBulkOperations);
             out.writeVLong(numberOfFailedBulkOperations);
             out.writeVLong(numberOfOperationsIndexed);
+            out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
         }
 
         @Override
@@ -720,13 +775,30 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
                 builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
                 builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
                 builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
+                builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
+                {
+                    for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
+                        builder.startObject();
+                        {
+                            builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
+                            builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
+                            builder.startObject();
+                            {
+                                ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
+                            }
+                            builder.endObject();
+                        }
+                        builder.endObject();
+                    }
+                }
+                builder.endArray();
             }
             builder.endObject();
             return builder;
         }
 
         public static Status fromXContent(final XContentParser parser) {
-            return PARSER.apply(parser, null);
+            return STATUS_PARSER.apply(parser, null);
         }
 
         @Override
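For reference, toXContent above renders fetch_exceptions as an array of objects, each carrying a from_seq_no field and a nested exception object, and STATUS_PARSER rebuilds a sorted map from the parsed entry list. A JDK-only sketch of that collection step (hypothetical names, plain Exception standing in for ElasticsearchException):

    import java.util.AbstractMap;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class FetchExceptionEntries {

        public static void main(String[] args) {
            // parsed entries, analogous to what FETCH_EXCEPTIONS_ENTRY_PARSER yields per array element
            final List<Map.Entry<Long, Exception>> entries = Arrays.asList(
                    new AbstractMap.SimpleEntry<>(128L, new Exception("fetch from 128 failed")),
                    new AbstractMap.SimpleEntry<>(0L, new Exception("fetch from 0 failed")));

            // same shape as the constructor argument built from args[19]: a map sorted by from_seq_no
            final NavigableMap<Long, Exception> fetchExceptions = new TreeMap<>(
                    entries.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

            System.out.println(fetchExceptions.firstKey()); // prints 0
        }
    }
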
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java
index f4cd7a680f4e9..b96d5b47ec263 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java
@@ -32,6 +32,7 @@
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 
 public class ShardFollowNodeTaskRandomTests extends ESTestCase {
 
@@ -54,6 +55,11 @@ private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun
             ShardFollowNodeTask.Status status = task.getStatus();
             assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
             assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
+            final long numberOfFailedFetches =
+                    testRun.responses.values().stream().flatMap(List::stream).filter(f -> f.exception != null).count();
+            assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches));
+            // the failures were able to be retried so fetch failures should have cleared
+            assertThat(status.fetchExceptions().entrySet(), hasSize(0));
             assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
         });
 
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
index 6138ba96d5439..4eb4283091959 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
@@ -6,11 +6,20 @@
 package org.elasticsearch.xpack.ccr.action;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> {
 
@@ -21,6 +30,7 @@ protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) thro
 
     @Override
     protected ShardFollowNodeTask.Status createTestInstance() {
+        // if you change this constructor, reflect the changes in the hand-written assertions below
         return new ShardFollowNodeTask.Status(
                 randomInt(),
                 randomNonNegativeLong(),
@@ -40,7 +50,57 @@ protected ShardFollowNodeTask.Status createTestInstance() {
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
-                randomNonNegativeLong());
+                randomNonNegativeLong(),
+                randomReadExceptions());
+    }
+
+    @Override
+    protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedInstance, final ShardFollowNodeTask.Status newInstance) {
+        assertNotSame(expectedInstance, newInstance);
+        assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
+        assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
+        assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));
+        assertThat(newInstance.followerGlobalCheckpoint(), equalTo(expectedInstance.followerGlobalCheckpoint()));
+        assertThat(newInstance.lastRequestedSeqNo(), equalTo(expectedInstance.lastRequestedSeqNo()));
+        assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads()));
+        assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites()));
+        assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites()));
+        assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion()));
+        assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis()));
+        assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches()));
+        assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches()));
+        assertThat(newInstance.operationsReceived(), equalTo(expectedInstance.operationsReceived()));
+        assertThat(newInstance.totalTransferredBytes(), equalTo(expectedInstance.totalTransferredBytes()));
+        assertThat(newInstance.totalIndexTimeMillis(), equalTo(expectedInstance.totalIndexTimeMillis()));
+        assertThat(newInstance.numberOfSuccessfulBulkOperations(), equalTo(expectedInstance.numberOfSuccessfulBulkOperations()));
+        assertThat(newInstance.numberOfFailedBulkOperations(), equalTo(expectedInstance.numberOfFailedBulkOperations()));
+        assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed()));
+        assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size()));
+        assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet()));
+        for (final Map.Entry<Long, ElasticsearchException> entry : newInstance.fetchExceptions().entrySet()) {
+            // x-content loses the exception
+            final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey());
+            assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
+            assertNotNull(entry.getValue().getCause());
+            assertThat(
+                    entry.getValue().getCause(),
+                    anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
+            assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
+        }
+    }
+
+    @Override
+    protected boolean assertToXContentEquivalence() {
+        return false;
+    }
+
+    private NavigableMap<Long, ElasticsearchException> randomReadExceptions() {
+        final int count = randomIntBetween(0, 16);
+        final NavigableMap<Long, ElasticsearchException> readExceptions = new TreeMap<>();
+        for (int i = 0; i < count; i++) {
+            readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
+        }
+        return readExceptions;
     }
 
     @Override
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
index 9eda637dc9df2..54aef6bd3d116 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ccr.action;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.shard.ShardId;
@@ -20,8 +21,10 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
@@ -29,6 +32,8 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -39,11 +44,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
     private List<long[]> shardChangesRequests;
     private List<List<Translog.Operation>> bulkShardOperationRequests;
     private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
+    private Consumer<ShardFollowNodeTask.Status> beforeSendShardChangesRequest = status -> {};
+
+    private AtomicBoolean simulateResponse = new AtomicBoolean();
+
     private Queue<Exception> readFailures;
     private Queue<Exception> writeFailures;
     private Queue<Exception> mappingUpdateFailures;
     private Queue<Long> imdVersions;
+    private Queue<Long> leaderGlobalCheckpoints;
     private Queue<Long> followerGlobalCheckpoints;
+    private Queue<Long> maxSeqNos;
 
     public void testCoordinateReads() {
         ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
@@ -169,6 +180,27 @@ public void testReceiveRetryableError() {
         for (int i = 0; i < max; i++) {
             readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
         }
+        imdVersions.add(1L);
+        leaderGlobalCheckpoints.add(63L);
+        maxSeqNos.add(63L);
+        simulateResponse.set(true);
+        final AtomicLong retryCounter = new AtomicLong();
+        // before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear
+        beforeSendShardChangesRequest = status -> {
+            assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
+            if (retryCounter.get() > 0) {
+                assertThat(status.fetchExceptions().entrySet(), hasSize(1));
+                final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
+                assertThat(entry.getKey(), equalTo(0L));
+                assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
+                assertNotNull(entry.getValue().getCause());
+                assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
+                final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
+                assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
+                assertThat(cause.getShardId().getId(), equalTo(0));
+            }
+            retryCounter.incrementAndGet();
+        };
         task.coordinateReads();
 
         // NUmber of requests is equal to initial request + retried attempts
@@ -178,10 +210,14 @@ public void testReceiveRetryableError() {
             assertThat(shardChangesRequest[1], equalTo(64L));
         }
 
-        assertThat(task.isStopped(), equalTo(false));
+        assertFalse("task is not stopped", task.isStopped());
         ShardFollowNodeTask.Status status = task.getStatus();
         assertThat(status.numberOfConcurrentReads(), equalTo(1));
         assertThat(status.numberOfConcurrentWrites(), equalTo(0));
+        assertThat(status.numberOfFailedFetches(), equalTo((long)max));
+        assertThat(status.numberOfSuccessfulFetches(), equalTo(1L));
+        // the fetch failure has cleared
+        assertThat(status.fetchExceptions().entrySet(), hasSize(0));
         assertThat(status.lastRequestedSeqNo(), equalTo(63L));
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
@@ -194,6 +230,23 @@ public void testReceiveRetryableErrorRetriedTooManyTimes() {
         for (int i = 0; i < max; i++) {
             readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
         }
+        final AtomicLong retryCounter = new AtomicLong();
+        // before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist
+        beforeSendShardChangesRequest = status -> {
+            assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
+            if (retryCounter.get() > 0) {
+                assertThat(status.fetchExceptions().entrySet(), hasSize(1));
+                final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
+                assertThat(entry.getKey(), equalTo(0L));
+                assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
+                assertNotNull(entry.getValue().getCause());
+                assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
+                final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
+                assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
+                assertThat(cause.getShardId().getId(), equalTo(0));
+            }
+            retryCounter.incrementAndGet();
+        };
         task.coordinateReads();
 
         assertThat(shardChangesRequests.size(), equalTo(11));
@@ -202,12 +255,22 @@ public void testReceiveRetryableErrorRetriedTooManyTimes() {
             assertThat(shardChangesRequest[1], equalTo(64L));
         }
 
-        assertThat(task.isStopped(), equalTo(true));
+        assertTrue("task is stopped", task.isStopped());
         assertThat(fatalError, notNullValue());
        assertThat(fatalError.getMessage(), containsString("retrying failed ["));
         ShardFollowNodeTask.Status status = task.getStatus();
         assertThat(status.numberOfConcurrentReads(), equalTo(1));
         assertThat(status.numberOfConcurrentWrites(), equalTo(0));
+        assertThat(status.numberOfFailedFetches(), equalTo(11L));
+        assertThat(status.fetchExceptions().entrySet(), hasSize(1));
+        final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
+        assertThat(entry.getKey(), equalTo(0L));
+        assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
+        assertNotNull(entry.getValue().getCause());
+        assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
+        final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
+        assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
+        assertThat(cause.getShardId().getId(), equalTo(0));
         assertThat(status.lastRequestedSeqNo(), equalTo(63L));
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
@@ -216,19 +279,38 @@ public void testReceiveNonRetryableError() {
         ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
         startTask(task, 63, -1);
 
-        Exception failure = new RuntimeException();
+        Exception failure = new RuntimeException("replication failed");
         readFailures.add(failure);
+        final AtomicBoolean invoked = new AtomicBoolean();
+        // since there will be only one failure, this should only be invoked once and there should not be a fetch failure
+        beforeSendShardChangesRequest = status -> {
+            if (invoked.compareAndSet(false, true)) {
+                assertThat(status.numberOfFailedFetches(), equalTo(0L));
+                assertThat(status.fetchExceptions().entrySet(), hasSize(0));
+            } else {
+                fail("invoked twice");
+            }
+        };
         task.coordinateReads();
 
         assertThat(shardChangesRequests.size(), equalTo(1));
         assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
-        assertThat(task.isStopped(), equalTo(true));
+        assertTrue("task is stopped", task.isStopped());
         assertThat(fatalError, sameInstance(failure));
         ShardFollowNodeTask.Status status = task.getStatus();
         assertThat(status.numberOfConcurrentReads(), equalTo(1));
         assertThat(status.numberOfConcurrentWrites(), equalTo(0));
+        assertThat(status.numberOfFailedFetches(), equalTo(1L));
+        assertThat(status.fetchExceptions().entrySet(), hasSize(1));
+        final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
+        assertThat(entry.getKey(), equalTo(0L));
+        assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
+        assertNotNull(entry.getValue().getCause());
+        assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class));
+        final RuntimeException cause = (RuntimeException) entry.getValue().getCause();
+        assertThat(cause.getMessage(), equalTo("replication failed"));
         assertThat(status.lastRequestedSeqNo(), equalTo(63L));
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
@@ -642,7 +724,9 @@ ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxCon
         writeFailures = new LinkedList<>();
         mappingUpdateFailures = new LinkedList<>();
         imdVersions = new LinkedList<>();
+        leaderGlobalCheckpoints = new LinkedList<>();
         followerGlobalCheckpoints = new LinkedList<>();
+        maxSeqNos = new LinkedList<>();
         return new ShardFollowNodeTask(
                 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
 
@@ -683,10 +767,23 @@ protected void innerSendBulkShardOperationsRequest(
             @Override
             protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler,
                                                         Consumer<Exception> errorHandler) {
+                beforeSendShardChangesRequest.accept(getStatus());
                 shardChangesRequests.add(new long[]{from, requestBatchSize});
                 Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
                 if (readFailure != null) {
                     errorHandler.accept(readFailure);
+                } else if (simulateResponse.get()) {
+                    final Translog.Operation[] operations = new Translog.Operation[requestBatchSize];
+                    for (int i = 0; i < requestBatchSize; i++) {
+                        operations[i] = new Translog.NoOp(from + i, 0, "test");
+                    }
+                    final ShardChangesAction.Response response =
+                            new ShardChangesAction.Response(
+                                    imdVersions.poll(),
+                                    leaderGlobalCheckpoints.poll(),
+                                    maxSeqNos.poll(),
+                                    operations);
+                    handler.accept(response);
                 }
             }
 
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml
index 70bf7d484a7ee..34019455b8005 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml
@@ -44,6 +44,7 @@
   - match: { bar.0.number_of_successful_bulk_operations: 0 }
   - match: { bar.0.number_of_failed_bulk_operations: 0 }
   - match: { bar.0.number_of_operations_indexed: 0 }
+  - length: { bar.0.fetch_exceptions: 0 }
 
   - do:
       ccr.unfollow_index: