Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track fetch exceptions for shard follow tasks #33047

Merged
merged 7 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
ShardFollowTask::new),

// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME,
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME,
ShardFollowNodeTask.Status::new)
);
}
Expand All @@ -166,9 +166,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
ShardFollowTask::fromXContent),

// Task statuses
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME),
ShardFollowNodeTask.Status::fromXContent)
);
new NamedXContentRegistry.Entry(
ShardFollowNodeTask.Status.class,
new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME),
ShardFollowNodeTask.Status::fromXContent));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
}

@Override
public String toString() {
return "Request{" +
"fromSeqNo=" + fromSeqNo +
", maxOperationCount=" + maxOperationCount +
", shardId=" + shardId +
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
'}';
}

}

public static final class Response extends ActionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
* The node task that fetch the write operations from a leader shard and
Expand Down Expand Up @@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfFailedBulkOperations = 0;
private long numberOfOperationsIndexed = 0;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
Expand All @@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.relativeTimeProvider = relativeTimeProvider;
this.retryTimeout = params.getRetryTimeout();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
* when the fetch task associated with that from sequence number succeeds.
*/
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
return size() > params.getMaxConcurrentReadBatches();
}
};
}

void start(
Expand Down Expand Up @@ -224,6 +241,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 to the approach to keep track of exceptions by from seqno in a fixed size linked hashmap.

operationsReceived += response.getOperations().length;
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
Expand All @@ -233,6 +251,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedFetches++;
fetchExceptions.put(from, new ElasticsearchException(e));
}
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
});
Expand Down Expand Up @@ -412,12 +431,13 @@ public synchronized Status getStatus() {
totalIndexTimeMillis,
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations,
numberOfOperationsIndexed);
numberOfOperationsIndexed,
new TreeMap<>(fetchExceptions));
}

public static class Status implements Task.Status {

public static final String NAME = "shard-follow-node-task-status";
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";

static final ParseField SHARD_ID = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
Expand All @@ -438,8 +458,10 @@ public static class Status implements Task.Status {
static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations");
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations");
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");

static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME,
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<Status, Void> STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME,
args -> new Status(
(int) args[0],
(long) args[1],
Expand All @@ -459,28 +481,51 @@ public static class Status implements Task.Status {
(long) args[15],
(long) args[16],
(long) args[17],
(long) args[18]));
(long) args[18],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[19])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))));

public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";

static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
new ConstructingObjectParser<>(
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));

static {
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
}

static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");

static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
}

private final int shardId;
Expand Down Expand Up @@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() {
return numberOfOperationsIndexed;
}

private final NavigableMap<Long, ElasticsearchException> fetchExceptions;

public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
return fetchExceptions;
}

Status(
final int shardId,
final long leaderGlobalCheckpoint,
Expand All @@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() {
final long totalIndexTimeMillis,
final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed) {
final long numberOfOperationsIndexed,
final NavigableMap<Long, ElasticsearchException> fetchExceptions) {
this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
Expand All @@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() {
this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations;
this.numberOfFailedBulkOperations = numberOfFailedBulkOperations;
this.numberOfOperationsIndexed = numberOfOperationsIndexed;
this.fetchExceptions = fetchExceptions;
}

public Status(final StreamInput in) throws IOException {
Expand All @@ -658,11 +711,12 @@ public Status(final StreamInput in) throws IOException {
this.numberOfSuccessfulBulkOperations = in.readVLong();
this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong();
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
}

@Override
public String getWriteableName() {
return NAME;
return STATUS_PARSER_NAME;
}

@Override
Expand All @@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(numberOfSuccessfulBulkOperations);
out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed);
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
}

@Override
Expand Down Expand Up @@ -720,13 +775,30 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
{
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
builder.startObject();
{
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
}
builder.endObject();
}
builder.endObject();
}
}
builder.endArray();
}
builder.endObject();
return builder;
}

public static Status fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
return STATUS_PARSER.apply(parser, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class ShardFollowNodeTaskRandomTests extends ESTestCase {

Expand All @@ -54,6 +55,11 @@ private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun
ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
assertThat(status.followerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
final long numberOfFailedFetches =
testRun.responses.values().stream().flatMap(List::stream).filter(f -> f.exception != null).count();
assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches));
// the failures were able to be retried so fetch failures should have cleared
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
});

Expand Down
Loading