Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable acked indexing #17038

Merged
merged 40 commits into from
Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
74194b8
enable testAckedIndexing
bleskes Feb 12, 2016
57501ce
protect against transport shutdowns
bleskes Feb 12, 2016
63ada98
make index counter checking use assertBusy
bleskes Feb 13, 2016
25fae03
better open reference reporting
bleskes Feb 13, 2016
55465dd
fixed closing shard reference on shard already closed
bleskes Feb 13, 2016
51f2c3c
failure message with shard routing + protect against already closed
bleskes Feb 13, 2016
746ca07
better cluster stability check
bleskes Feb 13, 2016
4e39359
close timeoutlisteners with no timeout...
bleskes Feb 14, 2016
673a73d
increased timeout
bleskes Feb 14, 2016
285c3bf
disabled in flight ops check
bleskes Feb 14, 2016
7bb85e4
better proteciton against throwables with no message in assertions
bleskes Feb 14, 2016
3353790
better assertion
bleskes Feb 14, 2016
76465ec
don't through fail to send exception on a the generic thread - it may…
bleskes Feb 14, 2016
4a7524f
more shutdown exceptions
bleskes Feb 14, 2016
2bd09c4
Merge branch 'master' into enable_acked
jasontedor Mar 8, 2016
563304d
No old states polluting pending states queue
jasontedor Mar 9, 2016
bd9e908
Waiting for primary terms
jasontedor Mar 9, 2016
4793630
Merge branch 'master' into enable_acked
jasontedor Mar 28, 2016
14ba0c3
Use longer timeout on nightly tests, but rarely
jasontedor Mar 28, 2016
37d739a
Simplify doc creation check in acked indexing test
jasontedor Mar 28, 2016
97be383
Fix formatting in DWSDIT#TCJDOPI
jasontedor Mar 28, 2016
4e1f62e
Clarify exceptions when failing to fail a replica
jasontedor Mar 28, 2016
2a93889
Add clarifying comment on disrupted in-flight ops
jasontedor Mar 28, 2016
5576526
For now do not guard against already failed engine
jasontedor Mar 28, 2016
4de57fc
Simplify test out of order commit messages
jasontedor Mar 28, 2016
85d3d51
Clarify message on out-of-order state publish
jasontedor Mar 28, 2016
0e5b22a
Remove pending locks assertions from TRA
jasontedor Mar 29, 2016
c4324f9
Merge branch 'master' into enable_acked
jasontedor Mar 29, 2016
649bcdc
Enable acked indexing test
jasontedor Mar 29, 2016
27448dc
Adjust for long random timeout in acked indexing
jasontedor Mar 29, 2016
1f12bee
Reword received superseded cluster state message
jasontedor Apr 2, 2016
8b970d9
Clarify closing of timeout listeners
jasontedor Apr 2, 2016
c7c8b1d
Merge branch 'master' into enable_acked
jasontedor Apr 2, 2016
e201f5c
Fix missing parenthesis in commit timeout message
jasontedor Apr 2, 2016
cffc315
Reject old cluster states and keep the queue clean
jasontedor Apr 5, 2016
7cdd647
Clarify cluster state local node validation
jasontedor Apr 5, 2016
3abf817
Add comment about older cluster states
jasontedor Apr 5, 2016
c2ed5a1
Reorder pending queue clean checks
jasontedor Apr 5, 2016
66cc202
Refactor old state version check to ZenDiscovery
jasontedor Apr 5, 2016
95feb40
Remove superfluous validation of incoming states
jasontedor Apr 6, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -679,7 +680,7 @@ protected void doRun() throws Exception {
return;
}
// closed in finishAsFailed(e) in the case of error
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
indexShardReference = getIndexShardReferenceOnPrimary(shardId, request);
if (indexShardReference.isRelocated() == false) {
executeLocally();
} else {
Expand Down Expand Up @@ -797,7 +798,7 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
*/
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
// we may end up here if the cluster state used to route the primary is so stale that the underlying
Expand All @@ -816,7 +817,8 @@ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
return ref;
}

/**
Expand Down Expand Up @@ -997,30 +999,38 @@ public void handleException(TransportException exp) {
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed(
shard,
indexShardReference.routingEntry(),
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}

@Override
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
ShardRouting primaryShard = indexShardReference.routingEntry();
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert false : shardFailedError;
shard,
indexShardReference.routingEntry(),
message,
exp,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}

@Override
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
String message = "unknown";
try {
ShardRouting primaryShard = indexShardReference.routingEntry();
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we open a follow up issue that failShard should throw an already closed exception? (this is really what the code protects against). To be clear - I think the code can stay.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #17366.

} catch (Throwable t) {
shardFailedError.addSuppressed(t);
}
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment about where these exceptions can come from?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed 4e1f62e.

shardFailedError instanceof NodeClosedException : shardFailedError;
onReplicaFailure(nodeId, exp);
}
}
}
}
);
}
}
Expand Down Expand Up @@ -1108,7 +1118,9 @@ protected boolean shouldExecuteReplication(Settings settings) {

interface IndexShardReference extends Releasable {
boolean isRelocated();

void failShard(String reason, @Nullable Throwable e);

ShardRouting routingEntry();

/** returns the primary term of the current operation */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void handleException(TransportException exp) {
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
listener.onFailure(exp.getCause());
listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ synchronized protected void doStart() {
@Override
synchronized protected void doStop() {
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.cancel();
try {
onGoingTimeout.cancel();
onGoingTimeout.listener.onClose();
Expand All @@ -218,6 +219,12 @@ synchronized protected void doStop() {
}
}
ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS);
// close timeout listeners that did not have an ongoing timeout
postAppliedListeners
.stream()
.filter(listener -> listener instanceof TimeoutClusterStateListener)
.map(listener -> (TimeoutClusterStateListener)listener)
.forEach(TimeoutClusterStateListener::onClose);
remove(localNodeMasterListeners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -188,7 +189,14 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
this.publishClusterState =
new PublishClusterStateAction(
settings,
transportService,
clusterService::state,
new NewPendingClusterStateListener(),
discoverySettings,
clusterName);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());

Expand Down Expand Up @@ -766,15 +774,24 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
* If the first condition fails we reject the cluster state and throw an error.
* If the second condition fails we ignore the cluster state.
*/
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
@SuppressForbidden(reason = "debug")
public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
if (currentState.supersedes(newClusterState)) {

// reject cluster states that are not new from the same master
if (currentState.supersedes(newClusterState) ||
(newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
}

// reject older cluster states if we are following a master
if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
} else {
return false;
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,18 @@ public synchronized void markAsProcessed(ClusterState state) {
currentMaster
);
}
} else if (state.supersedes(pendingState) && pendingContext.committed()) {
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
);
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
assert pendingContext.committed() : "processed cluster state is not committed " + state;
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
} else if (state.version() >= pendingState.version()) {
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
);
contextsToRemove.add(pendingContext);
if (pendingContext.committed()) {
pendingContext.listener.onNewClusterStateProcessed();
}
}
}
// now ack the processed state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
Expand All @@ -58,11 +57,13 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
*
Expand All @@ -81,17 +82,22 @@ public interface NewPendingClusterStateListener {
}

private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final Supplier<ClusterState> clusterStateSupplier;
private final NewPendingClusterStateListener newPendingClusterStatelistener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
private final PendingClusterStatesQueue pendingStatesQueue;

public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
public PublishClusterStateAction(
Settings settings,
TransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that node provider interface can go away (now that's were on java8 - function refs FTW)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same but can we keep it separate from this pull request?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for SURE!

NewPendingClusterStateListener listener,
DiscoverySettings discoverySettings,
ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.clusterStateSupplier = clusterStateSupplier;
this.newPendingClusterStatelistener = listener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
Expand Down Expand Up @@ -363,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
Expand Down Expand Up @@ -394,14 +400,25 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
final ClusterState clusterState = clusterStateSupplier.get();

if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
throw new IllegalStateException("received state with a local node that does not match the current local node");
}

if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
logger.warn(message);
throw new IllegalStateException(message);
}

ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
}

protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
Expand Down Expand Up @@ -518,7 +535,7 @@ public void waitForCommit(TimeValue commitTimeout) {
}

if (timedout) {
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])");
}
if (isCommitted() == false) {
throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,9 @@ public boolean isRelocated() {
@Override
public void failShard(String reason, @Nullable Throwable e) {
isShardFailed.set(true);
if (randomBoolean()) {
throw new ElasticsearchException("simulated");
}
}

@Override
Expand Down Expand Up @@ -1173,7 +1176,7 @@ protected boolean resolveIndex() {


@Override
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id()));
}
Expand Down
Loading