Enable acked indexing #17038

Merged
merged 40 commits into from
Apr 6, 2016
Changes from 17 commits
Commits
74194b8
enable testAckedIndexing
bleskes Feb 12, 2016
57501ce
protect against transport shutdowns
bleskes Feb 12, 2016
63ada98
make index counter checking use assertBusy
bleskes Feb 13, 2016
25fae03
better open reference reporting
bleskes Feb 13, 2016
55465dd
fixed closing shard reference on shard already closed
bleskes Feb 13, 2016
51f2c3c
failure message with shard routing + protect against already closed
bleskes Feb 13, 2016
746ca07
better cluster stability check
bleskes Feb 13, 2016
4e39359
close timeoutlisteners with no timeout...
bleskes Feb 14, 2016
673a73d
increased timeout
bleskes Feb 14, 2016
285c3bf
disabled in flight ops check
bleskes Feb 14, 2016
7bb85e4
better protection against throwables with no message in assertions
bleskes Feb 14, 2016
3353790
better assertion
bleskes Feb 14, 2016
76465ec
don't throw fail to send exception on the generic thread - it may…
bleskes Feb 14, 2016
4a7524f
more shutdown exceptions
bleskes Feb 14, 2016
2bd09c4
Merge branch 'master' into enable_acked
jasontedor Mar 8, 2016
563304d
No old states polluting pending states queue
jasontedor Mar 9, 2016
bd9e908
Waiting for primary terms
jasontedor Mar 9, 2016
4793630
Merge branch 'master' into enable_acked
jasontedor Mar 28, 2016
14ba0c3
Use longer timeout on nightly tests, but rarely
jasontedor Mar 28, 2016
37d739a
Simplify doc creation check in acked indexing test
jasontedor Mar 28, 2016
97be383
Fix formatting in DWSDIT#TCJDOPI
jasontedor Mar 28, 2016
4e1f62e
Clarify exceptions when failing to fail a replica
jasontedor Mar 28, 2016
2a93889
Add clarifying comment on disrupted in-flight ops
jasontedor Mar 28, 2016
5576526
For now do not guard against already failed engine
jasontedor Mar 28, 2016
4de57fc
Simplify test out of order commit messages
jasontedor Mar 28, 2016
85d3d51
Clarify message on out-of-order state publish
jasontedor Mar 28, 2016
0e5b22a
Remove pending locks assertions from TRA
jasontedor Mar 29, 2016
c4324f9
Merge branch 'master' into enable_acked
jasontedor Mar 29, 2016
649bcdc
Enable acked indexing test
jasontedor Mar 29, 2016
27448dc
Adjust for long random timeout in acked indexing
jasontedor Mar 29, 2016
1f12bee
Reword received superseded cluster state message
jasontedor Apr 2, 2016
8b970d9
Clarify closing of timeout listeners
jasontedor Apr 2, 2016
c7c8b1d
Merge branch 'master' into enable_acked
jasontedor Apr 2, 2016
e201f5c
Fix missing parenthesis in commit timeout message
jasontedor Apr 2, 2016
cffc315
Reject old cluster states and keep the queue clean
jasontedor Apr 5, 2016
7cdd647
Clarify cluster state local node validation
jasontedor Apr 5, 2016
3abf817
Add comment about older cluster states
jasontedor Apr 5, 2016
c2ed5a1
Reorder pending queue clean checks
jasontedor Apr 5, 2016
66cc202
Refactor old state version check to ZenDiscovery
jasontedor Apr 5, 2016
95feb40
Remove superfluous validation of incoming states
jasontedor Apr 6, 2016
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -403,7 +404,7 @@ protected void responseWithFailure(Throwable t) {
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
- try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
+ try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request)) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@@ -706,7 +707,7 @@ protected void doRun() throws Exception {
return;
}
// closed in finishAsFailed(e) in the case of error
- indexShardReference = getIndexShardReferenceOnPrimary(shardId);
+ indexShardReference = getIndexShardReferenceOnPrimary(shardId, request);
if (indexShardReference.isRelocated() == false) {
executeLocally();

@@ -820,24 +821,64 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
}
}


+ static ConcurrentMap<IndexShardReference, String> openShardReferences;
Contributor

This feels way more natural on the IndexShard level - I felt that way when I made it but couldn't find a way (at the time) to push it to IndexShard while getting the debugging information to be used as the string value here. We can pass it to IndexShard.acquirePrimaryOperationLock and acquireReplicaOperationLock but that will force a toString on the request object. How about making the reason an Object type and only call toString on it in IndexShard?

Contributor

or even better a reason supplier called on demand?
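
The "reason supplier called on demand" idea can be sketched as follows. This is only an illustration under stated assumptions: `OperationLockSketch`, `acquire`, `release`, and `describeOpen` are hypothetical names, not the actual `IndexShard` API. It shows that the description string is built only when a leak report is requested, so acquiring a lock never forces a `toString` on the request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for an operation lock holder; not the real IndexShard API.
final class OperationLockSketch {
    // Each open token maps to a supplier, so no description is built at acquire time.
    private final Map<Object, Supplier<String>> open = new ConcurrentHashMap<>();

    Object acquire(Supplier<String> reason) {
        Object token = new Object();
        open.put(token, reason);
        return token;
    }

    void release(Object token) {
        open.remove(token);
    }

    // Only here, when reporting what is still open, are the suppliers invoked.
    String describeOpen() {
        StringBuilder sb = new StringBuilder();
        for (Supplier<String> reason : open.values()) {
            sb.append(reason.get()).append('\n');
        }
        return sb.toString();
    }
}
```

A caller would pass something like `() -> "primary: " + request`, which costs nothing unless `describeOpen()` is actually called.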

Member Author

I agree that it's more natural there, but I prefer to keep it where it is until there's a clear need for it at the lower level.

Contributor

Guys, we can't have static maps on classes like this. If we want some kind of static assertions, we should inject some mock services, just as we do for SearchContext in MockSearchService. I am a bit confused about why we have a static factory method on literally the only impl of an interface in IndexShardReferenceImpl; this smells like a design flaw to me. I think that together with this change we should rather expose a pluggable service, or move the creation into IndexService and let folks plug a factory into IndexModule in order to implement this assertion. Static per-JVM concurrent maps seem like playing Russian roulette.
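
A minimal sketch of the injected alternative suggested here, assuming a hypothetical `ShardReferenceTracker` interface (none of these names exist in Elasticsearch). Production code would receive the no-op instance, while a test would plug in a recording instance per node, so no state is shared across the JVM.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker interface; production code would be handed NOOP.
interface ShardReferenceTracker {
    void opened(Object ref, String description);

    void closed(Object ref);

    ShardReferenceTracker NOOP = new ShardReferenceTracker() {
        @Override
        public void opened(Object ref, String description) {}

        @Override
        public void closed(Object ref) {}
    };
}

// Test-only implementation: one instance per test node instead of a static map.
final class RecordingShardReferenceTracker implements ShardReferenceTracker {
    private final Map<Object, String> open = new ConcurrentHashMap<>();

    @Override
    public void opened(Object ref, String description) {
        String prev = open.put(ref, description);
        if (prev != null) {
            throw new AssertionError("ref added twice. current [" + description + "] prev [" + prev + "]");
        }
    }

    @Override
    public void closed(Object ref) {
        if (open.remove(ref) == null) {
            throw new AssertionError("failed to find ref [" + ref + "]");
        }
    }

    // Fails with the descriptions of every reference that was opened but never closed.
    void assertAllClosed() {
        if (!open.isEmpty()) {
            throw new AssertionError("Found unclosed shard references:\n" + String.join("\n", open.values()));
        }
    }
}
```

The checks mirror the ones in the static helpers of this diff; the only difference in the sketch is that the map lives in an object whose lifetime the test controls.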

Member Author

Removed in 0e5b22a.


+ static boolean setupShardReferenceAssertions() {
+ openShardReferences = new ConcurrentHashMap<>();
+ return true;
+ }
+
+ static boolean addShardReference(IndexShardReference ref, String desc) {
+ String prev = openShardReferences.put(ref, desc);
+ if (prev != null) {
+ throw new AssertionError("shard ref " + ref + " is added twice. current [" + desc + "] prev [" + prev + "]");
+ }
+ return true;
+ }
+
+ static boolean removeShardReference(IndexShardReference ref) {
+ assert openShardReferences.remove(ref) != null : "failed to find ref [" + ref + "]";
+ return true;
+ }
+
+ static {
+ assert setupShardReferenceAssertions();
+ }
+
+ static public void assertAllShardReferencesAreCleaned() {
+ if (openShardReferences == null || openShardReferences.isEmpty()) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String desc : openShardReferences.values()) {
+ sb.append(desc).append("\n");
+ }
+ assert sb.length() == 0 : "Found unclosed shard references:\n" + sb;
+ }

/**
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
*/
- protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, true);
+ IndexShardReference ref = new IndexShardReferenceImpl(indexShard, true);
+ assert addShardReference(ref, "primary: " + request.toString() + " " + ref.routingEntry());
+ return ref;
}

/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
- protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, false);
+ IndexShardReference ref = new IndexShardReferenceImpl(indexShard, false);
+ assert addShardReference(ref, "replica: " + request.toString() + " " + ref.routingEntry());
+ return ref;
}

/**
@@ -1018,30 +1059,36 @@ public void handleException(TransportException exp) {
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed(
- shard,
- indexShardReference.routingEntry(),
- message,
- exp,
- new ShardStateAction.Listener() {
- @Override
- public void onSuccess() {
- onReplicaFailure(nodeId, exp);
- }
-
- @Override
- public void onFailure(Throwable shardFailedError) {
- if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
- ShardRouting primaryShard = indexShardReference.routingEntry();
- String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
- // we are no longer the primary, fail ourselves and start over
- indexShardReference.failShard(message, shardFailedError);
- forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
- } else {
- assert false : shardFailedError;
+ shard,
+ indexShardReference.routingEntry(),
+ message,
+ exp,
+ new ShardStateAction.Listener() {
+ @Override
+ public void onSuccess() {
+ onReplicaFailure(nodeId, exp);
+ }
+
+ @Override
+ public void onFailure(Throwable shardFailedError) {
+ if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
+ String message = "unknown";
+ try {
+ ShardRouting primaryShard = indexShardReference.routingEntry();
+ message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
+ // we are no longer the primary, fail ourselves and start over
+ indexShardReference.failShard(message, shardFailedError);
Contributor

can we open a follow up issue that failShard should throw an already closed exception? (this is really what the code protects against). To be clear - I think the code can stay.

Member Author

I opened #17366.
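
The protection discussed in this thread is the usual "attach the secondary failure as suppressed" pattern. Below is a minimal sketch of it in isolation; `SuppressedFailureSketch` and `failQuietly` are hypothetical names used only for illustration, and the `Runnable` stands in for the `failShard` call.

```java
final class SuppressedFailureSketch {
    // Hypothetical helper: runs a cleanup step and, if that step itself throws,
    // records the secondary failure on the original error instead of losing either one.
    static Throwable failQuietly(Throwable original, Runnable failShard) {
        try {
            failShard.run();
        } catch (Throwable t) {
            original.addSuppressed(t);
        }
        return original;
    }
}
```

The original error stays the one that is propagated, and the failure from the cleanup step remains visible through `getSuppressed()`.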

+ } catch (Throwable t) {
+ shardFailedError.addSuppressed(t);
+ }
+ forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
+ } else {
+ assert shardFailedError instanceof TransportException ||
Contributor

add a comment about where these exceptions can come from?

Member Author

Pushed 4e1f62e.

+ shardFailedError instanceof NodeClosedException : shardFailedError;
onReplicaFailure(nodeId, exp);
}
}
}
}
);
}
}
@@ -1125,7 +1172,9 @@ protected boolean shouldExecuteReplication(Settings settings) {

interface IndexShardReference extends Releasable {
boolean isRelocated();

void failShard(String reason, @Nullable Throwable e);

ShardRouting routingEntry();
}

@@ -1146,6 +1195,7 @@ static final class IndexShardReferenceImpl implements IndexShardReference {
@Override
public void close() {
operationLock.close();
+ assert removeShardReference(this);
}

@Override
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
- import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
@@ -54,6 +53,7 @@
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
+ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@@ -112,7 +112,7 @@ public void handleException(TransportException exp) {
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
- listener.onFailure(exp.getCause());
+ listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
}
}
});
Original file line number Diff line number Diff line change
@@ -223,9 +223,10 @@ protected void doStop() {
FutureUtils.cancel(this.reconnectToNodes);
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.cancel();
- onGoingTimeout.listener.onClose();
}
ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS);
+ postAppliedListeners.stream().filter(listener -> listener instanceof TimeoutClusterStateListener)
Contributor

add a comment why we do it out of loop? (i.e. loop again)

Contributor

nit: this was lost, but I think is a good thing to have.

Member Author

I pushed 8b970d9.

+ .forEach(listener -> ((TimeoutClusterStateListener) listener).onClose());
remove(localNodeMasterListeners);
}

Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -402,6 +403,18 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
}

ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
+ if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
+ final String message = String.format(
Contributor

This is very close to the new check. I'm not sure we need it.

Member Author

This check rejects incoming states from the same master that are out of order. I do not think that the new check covers this case. I think it's needed?

Contributor

With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in. If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happens:

  1. A commit message for this specific CS comes in.
  2. A commit message for a higher CS comes in (including the lastSeenClusterState) and that CS is processed.

Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above?
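
The argument above can be modeled with a small sketch. It is a simplification under stated assumptions: a cluster state is reduced to its version number, and `PendingStatesSketch`, `offer`, and `commitAndProcess` are hypothetical names rather than the actual pending cluster states queue in Elasticsearch. It shows an out-of-order state that is still newer than the applied state being admitted, then cleaned once a higher state is committed and processed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a pending cluster state is represented only by its version.
final class PendingStatesSketch {
    private long appliedVersion; // version of the currently applied cluster state
    private final List<Long> pending = new ArrayList<>();

    PendingStatesSketch(long appliedVersion) {
        this.appliedVersion = appliedVersion;
    }

    // Reject anything with a lower/equal version than the applied state; queue the rest.
    boolean offer(long incomingVersion) {
        if (incomingVersion <= appliedVersion) {
            return false;
        }
        pending.add(incomingVersion);
        return true;
    }

    // Committing and processing a state cleans every pending state at or below its version.
    void commitAndProcess(long version) {
        appliedVersion = Math.max(appliedVersion, version);
        pending.removeIf(v -> v <= version);
    }

    List<Long> pending() {
        return new ArrayList<>(pending);
    }
}
```

In this model, a state with version 7 that arrives after version 8 is queued, and it disappears as soon as version 8 is committed and processed, which is the second case in the list above.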

Member Author

> With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in.

Just to be clear, this is only if we are following a master. The line of code in question will reject older states from the same master that are out of order.

> If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happens:
>
>   1. A commit message for this specific CS comes in.
>   2. A commit message for a higher CS comes in (including the lastSeenClusterState) and that CS is processed.
>
> Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above?

There is not.

Member Author

jasontedor Apr 6, 2016

@bleskes I pushed commit 95feb40.

+ Locale.ROOT,
+ "received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]",
Contributor

can we add that it came from the current master? Also - thinking about this more - this can only happen in our testing right? In practice we only have 1 channel (if someone doesn't mess with some settings). Maybe add a comment about it if so...

Member Author

Pushed 85d3d51.

+ incomingState.version(),
+ incomingState.stateUUID(),
+ lastSeenClusterState.version(),
+ lastSeenClusterState.stateUUID()
+ );
+ logger.warn(message);
+ throw new IllegalStateException(message);
+ }
}

protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
Original file line number Diff line number Diff line change
@@ -759,7 +759,13 @@ public void releaseSnapshot(IndexCommit snapshot) throws IOException {
*/
public void failShard(String reason, @Nullable Throwable e) {
// fail the engine. This will cause this shard to also be removed from the node's index service.
- getEngine().failEngine(reason, e);
+ final Engine engine = getEngineOrNull();
Contributor

haha. I fixed this inline - forgot :) I think this should be a separate PR - it's not really needed for this one (we protect for it).

Member Author

Reverted in 5576526.

+ if (engine == null) {
+ logger.trace("ignoring request to fail the shard, we're already closed. (reason: [{}])", e, reason);
+
+ } else {
+ engine.failEngine(reason, e);
+ }
}

public Engine.Searcher acquireSearcher(String source) {
Original file line number Diff line number Diff line change
@@ -992,6 +992,9 @@ public boolean isRelocated() {
@Override
public void failShard(String reason, @Nullable Throwable e) {
isShardFailed.set(true);
+ if (randomBoolean()) {
+ throw new ElasticsearchException("simulated");
+ }
}

@Override
@@ -1097,11 +1100,11 @@ protected boolean resolveIndex() {
}

@Override
- protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
return getOrCreateIndexShardOperationsCounter();
}

- protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, Request request) {
return getOrCreateIndexShardOperationsCounter();
}
}