diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b6f85acacbf6f..8383872cdaea3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -79,6 +79,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -679,7 +680,7 @@ protected void doRun() throws Exception { return; } // closed in finishAsFailed(e) in the case of error - indexShardReference = getIndexShardReferenceOnPrimary(shardId); + indexShardReference = getIndexShardReferenceOnPrimary(shardId, request); if (indexShardReference.isRelocated() == false) { executeLocally(); } else { @@ -797,7 +798,7 @@ void finishBecauseUnavailable(ShardId shardId, String message) { * returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}). */ - protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -816,7 +817,8 @@ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); + IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); + return ref; } /** @@ -997,30 +999,38 @@ public void handleException(TransportException exp) { String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); logger.warn("[{}] {}", exp, shardId, message); shardStateAction.shardFailed( - shard, - indexShardReference.routingEntry(), - message, - exp, - new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onReplicaFailure(nodeId, exp); - } - - @Override - public void onFailure(Throwable shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - ShardRouting primaryShard = indexShardReference.routingEntry(); - String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); - // we are no longer the primary, fail ourselves and start over - indexShardReference.failShard(message, shardFailedError); - forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); - } else { - assert false : shardFailedError; + shard, + indexShardReference.routingEntry(), + message, + exp, + new ShardStateAction.Listener() { + @Override + public void onSuccess() { onReplicaFailure(nodeId, exp); } + + @Override + public void onFailure(Throwable shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + String message = "unknown"; + try { + ShardRouting primaryShard = indexShardReference.routingEntry(); + message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); + // we are no longer the primary, fail ourselves and start over + indexShardReference.failShard(message, shardFailedError); + } catch (Throwable t) { + shardFailedError.addSuppressed(t); + } + forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onReplicaFailure(nodeId, exp); + } + } } - } ); } } @@ -1108,7 +1118,9 @@ protected boolean shouldExecuteReplication(Settings settings) { interface IndexShardReference extends Releasable { boolean isRelocated(); + void failShard(String reason, @Nullable Throwable e); + ShardRouting routingEntry(); /** returns the primary term of the current operation */ diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 6b9079cb36381..5a44a6454674d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -111,7 +112,7 @@ public void handleException(TransportException exp) { waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener); } else { logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry); - listener.onFailure(exp.getCause()); + listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp); } } }); diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index f74e3b6f85f61..15c2b5c393908 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -210,6 +210,7 @@ synchronized protected void doStart() { @Override synchronized protected void doStop() { for (NotifyTimeout onGoingTimeout : onGoingTimeouts) { + onGoingTimeout.cancel(); try { onGoingTimeout.cancel(); onGoingTimeout.listener.onClose(); @@ -218,6 +219,12 @@ synchronized protected void doStop() { } } ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS); + // close timeout listeners that did not have an ongoing timeout + postAppliedListeners + .stream() + .filter(listener -> listener instanceof TimeoutClusterStateListener) + .map(listener -> (TimeoutClusterStateListener)listener) + .forEach(TimeoutClusterStateListener::onClose); remove(localNodeMasterListeners); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index ac595e640bc04..388e92c876413 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -188,7 +189,14 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName); + this.publishClusterState = + new PublishClusterStateAction( + settings, + transportService, + clusterService::state, + new NewPendingClusterStateListener(), + discoverySettings, + clusterName); this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); @@ -766,15 +774,24 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * If the first condition fails we reject the cluster state and throw an error. * If the second condition fails we ignore the cluster state. */ - static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { + @SuppressForbidden(reason = "debug") + public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState); - if (currentState.supersedes(newClusterState)) { + + // reject cluster states that are not new from the same master + if (currentState.supersedes(newClusterState) || + (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) { // if the new state has a smaller version, and it has the same master node, then no need to process it + logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); + return true; + } + + // reject older cluster states if we are following a master + if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) { logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); return true; - } else { - return false; } + return false; } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java index b3ad7329d995e..e591f44d30992 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java @@ -164,16 +164,18 @@ public synchronized void markAsProcessed(ClusterState state) { currentMaster ); } - } else if (state.supersedes(pendingState) && pendingContext.committed()) { - logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]", - pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version() - ); - contextsToRemove.add(pendingContext); - pendingContext.listener.onNewClusterStateProcessed(); } else if (pendingState.stateUUID().equals(state.stateUUID())) { assert pendingContext.committed() : "processed cluster state is not committed " + state; contextsToRemove.add(pendingContext); pendingContext.listener.onNewClusterStateProcessed(); + } else if (state.version() >= pendingState.version()) { + logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]", + pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version() + ); + contextsToRemove.add(pendingContext); + if (pendingContext.committed()) { + pendingContext.listener.onNewClusterStateProcessed(); + } } } // now ack the processed state diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index d7c4ac1568dfb..c2333ff177d37 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -41,7 +41,6 @@ import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -58,11 +57,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * @@ -81,17 +82,22 @@ public interface NewPendingClusterStateListener { } private final TransportService transportService; - private final DiscoveryNodesProvider nodesProvider; + private final Supplier clusterStateSupplier; private final NewPendingClusterStateListener newPendingClusterStatelistener; private final DiscoverySettings discoverySettings; private final ClusterName clusterName; private final PendingClusterStatesQueue pendingStatesQueue; - public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + public PublishClusterStateAction( + Settings settings, + TransportService transportService, + Supplier clusterStateSupplier, + NewPendingClusterStateListener listener, + DiscoverySettings discoverySettings, + ClusterName clusterName) { super(settings); this.transportService = transportService; - this.nodesProvider = nodesProvider; + this.clusterStateSupplier = clusterStateSupplier; this.newPendingClusterStatelistener = listener; this.discoverySettings = discoverySettings; this.clusterName = clusterName; @@ -363,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request, final ClusterState incomingState; // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode()); + incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { Diff diff = lastSeenClusterState.readDiffFrom(in); @@ -394,14 +400,25 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); throw new IllegalStateException("received state from a node that is not part of the cluster"); } - final DiscoveryNodes currentNodes = nodesProvider.nodes(); + final ClusterState clusterState = clusterStateSupplier.get(); - if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { + if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); - throw new IllegalStateException("received state from a node that is not part of the cluster"); + throw new IllegalStateException("received state with a local node that does not match the current local node"); + } + + if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) { + String message = String.format( + Locale.ROOT, + "rejecting cluster state version [%d] uuid [%s] received from [%s]", + incomingState.version(), + incomingState.stateUUID(), + incomingState.nodes().getMasterNodeId() + ); + logger.warn(message); + throw new IllegalStateException(message); } - ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { @@ -518,7 +535,7 @@ public void waitForCommit(TimeValue commitTimeout) { } if (timedout) { - markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]"); + markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])"); } if (isCommitted() == false) { throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index b928e08d86072..2efc51f0d8333 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1062,6 +1062,9 @@ public boolean isRelocated() { @Override public void failShard(String reason, @Nullable Throwable e) { isShardFailed.set(true); + if (randomBoolean()) { + throw new ElasticsearchException("simulated"); + } } @Override @@ -1173,7 +1176,7 @@ protected boolean resolveIndex() { @Override - protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id())); } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index ba24c05088145..109c59d845a6e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.discovery; import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -138,6 +137,13 @@ protected int numberOfReplicas() { return 1; } + @Override + protected void beforeIndexDeletion() { + // some test may leave operations in flight + // this is because the disruption schemes swallow requests by design + // as such, these operations will never be marked as finished + } + private List startCluster(int numberOfNodes) throws ExecutionException, InterruptedException { return startCluster(numberOfNodes, -1); } @@ -146,7 +152,8 @@ private List startCluster(int numberOfNodes, int minimumMasterNode) thro return startCluster(numberOfNodes, minimumMasterNode, null); } - private List startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws ExecutionException, InterruptedException { + private List startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws + ExecutionException, InterruptedException { configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); ensureStableCluster(numberOfNodes); @@ -175,11 +182,20 @@ protected Collection> nodePlugins() { return pluginList(MockTransportService.TestPlugin.class); } - private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureUnicastCluster( + int numberOfNodes, + @Nullable int[] unicastHostsOrdinals, + int minimumMasterNode + ) throws ExecutionException, InterruptedException { configureUnicastCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode); } - private void configureUnicastCluster(Settings settings, int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureUnicastCluster( + Settings settings, + int numberOfNodes, + @Nullable int[] unicastHostsOrdinals, + int minimumMasterNode + ) throws ExecutionException, InterruptedException { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } @@ -253,7 +269,8 @@ public void testNodesFDAfterMasterReelection() throws Exception { logger.info("--> reducing min master nodes to 2"); assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)).get()); + .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)) + .get()); String master = internalCluster().getMasterName(); String nonMaster = null; @@ -278,8 +295,8 @@ public void testVerifyApiBlocksDuringPartition() throws Exception { // Makes sure that the get request can be executed on each node locally: assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) )); // Everything is stable now, it is now time to simulate evil... @@ -359,8 +376,8 @@ public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { assertAcked(prepareCreate("test") .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) )); ensureGreen(); @@ -380,7 +397,8 @@ public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { networkPartition.stopDisrupting(); for (String node : nodes) { - ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()), true, node); + ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()), + true, node); } logger.info("issue a reroute"); @@ -421,17 +439,20 @@ public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { *

* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates */ - // NOTE: if you remove the awaitFix, make sure to port the test to the 1.x branch - @LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize") - @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") + @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE," + + "indices.recovery:TRACE,indices.cluster:TRACE") public void testAckedIndexing() throws Exception { + + final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5; + final String timeout = seconds + "s"; + // TODO: add node count randomizaion final List nodes = startCluster(3); assertAcked(prepareCreate("test") .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) )); ensureGreen(); @@ -455,36 +476,34 @@ public void testAckedIndexing() throws Exception { final Client client = client(node); final String name = "indexer_" + indexers.size(); final int numPrimaries = getNumShards("test").numPrimaries; - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - while (!stop.get()) { - String id = null; + Thread thread = new Thread(() -> { + while (!stop.get()) { + String id = null; + try { + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { + continue; + } + logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits()); try { - if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { - continue; - } - logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits()); - try { - id = Integer.toString(idGenerator.incrementAndGet()); - int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries); - logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard); - IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get(); - assertThat(response.getVersion(), equalTo(1L)); - ackedDocs.put(id, node); - logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node); - } catch (ElasticsearchException e) { - exceptedExceptions.add(e); - logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node); - } finally { - countDownLatchRef.get().countDown(); - logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount()); - } - } catch (InterruptedException e) { - // fine - semaphore interrupt - } catch (Throwable t) { - logger.info("unexpected exception in background thread of [{}]", t, node); + id = Integer.toString(idGenerator.incrementAndGet()); + int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries); + logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard); + IndexResponse response = + client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout); + assertTrue("doc [" + id + "] should have been created", response.isCreated()); + ackedDocs.put(id, node); + logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node); + } catch (ElasticsearchException e) { + exceptedExceptions.add(e); + logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node); + } finally { + countDownLatchRef.get().countDown(); + logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount()); } + } catch (InterruptedException e) { + // fine - semaphore interrupt + } catch (Throwable t) { + logger.info("unexpected exception in background thread of [{}]", t, node); } } }); @@ -514,11 +533,15 @@ public void run() { assertThat(semaphore.availablePermits(), equalTo(0)); semaphore.release(docsPerIndexer); } - assertTrue(countDownLatchRef.get().await(60000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS)); + logger.info("waiting for indexing requests to complete"); + assertTrue(countDownLatchRef.get().await(docsPerIndexer * seconds * 1000 + 2000, TimeUnit.MILLISECONDS)); logger.info("stopping disruption"); disruptionScheme.stopDisrupting(); - ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + DISRUPTION_HEALING_OVERHEAD.millis())); + for (String node : internalCluster().getNodeNames()) { + ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + + DISRUPTION_HEALING_OVERHEAD.millis()), true, node); + } ensureGreen("test"); logger.info("validating successful docs"); @@ -615,7 +638,8 @@ public void testStaleMasterNotHijackingMajority() throws Exception { majoritySide.remove(oldMasterNode); // Keeps track of the previous and current master when a master node transition took place on each node on the majority side: - final Map>> masters = Collections.synchronizedMap(new HashMap>>()); + final Map>> masters = Collections.synchronizedMap(new HashMap>>()); for (final String node : majoritySide) { masters.put(node, new ArrayList>()); internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() { @@ -624,7 +648,8 @@ public void clusterChanged(ClusterChangedEvent event) { DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode(); DiscoveryNode currentMaster = event.state().nodes().getMasterNode(); if (!Objects.equals(previousMaster, currentMaster)) { - logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(), event.previousState()); + logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(), + event.previousState()); String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null; String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null; masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName)); @@ -656,7 +681,8 @@ public void clusterChanged(ClusterChangedEvent event) { // but will be queued and once the old master node un-freezes it gets executed. // The old master node will send this update + the cluster state where he is flagged as master to the other // nodes that follow the new master. These nodes should ignore this update. - internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new + ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).build(); @@ -693,11 +719,16 @@ public void run() { for (Map.Entry>> entry : masters.entrySet()) { String nodeName = entry.getKey(); List> recordedMasterTransition = entry.getValue(); - assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), equalTo(2)); - assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), equalTo(oldMasterNode)); - assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(0).v2(), nullValue()); - assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), nullValue()); - assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(1).v2(), equalTo(newMasterNode)); + assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), + equalTo(2)); + assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), + equalTo(oldMasterNode)); + assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition + .get(0).v2(), nullValue()); + assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), + nullValue()); + assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]", + recordedMasterTransition.get(1).v2(), equalTo(newMasterNode)); } } @@ -710,8 +741,8 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception { assertAcked(prepareCreate("test") .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) ) .get()); ensureGreen("test"); @@ -727,7 +758,8 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception { assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); - IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get(); + IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value") + .get(); assertThat(indexResponse.getVersion(), equalTo(1L)); logger.info("Verifying if document exists via node[{}]", notIsolatedNode); @@ -845,17 +877,21 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception { DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes(); - TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName()); + TransportService masterTranspotService = + internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName()); logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode); - MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode); + MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, + nonMasterNode); nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService); assertNoMaster(nonMasterNode); logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); - MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); - TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName()); + MockTransportService masterTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + TransportService localTransportService = + internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName()); if (randomBoolean()) { masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME); } else { @@ -864,9 +900,11 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception { logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); final CountDownLatch countDownLatch = new CountDownLatch(2); - nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) { + nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService + .original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions + options) throws IOException, TransportException { if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) { countDownLatch.countDown(); } @@ -894,16 +932,16 @@ public void testSendingShardFailure() throws Exception { List nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList()); String nonMasterNode = randomFrom(nonMasterNodes); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) - )); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + )); ensureGreen(); String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId(); // fail a random shard ShardRouting failedShard = - randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); + randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode); CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(); @@ -912,7 +950,8 @@ public void testSendingShardFailure() throws Exception { NetworkPartition networkPartition = addRandomIsolation(isolatedNode); networkPartition.startDisrupting(); - service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { + service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new + ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -989,7 +1028,8 @@ public void testNodeNotReachableFromMaster() throws Exception { } logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); - MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, + masterNode); if (randomBoolean()) { masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode)); } else { @@ -1021,9 +1061,9 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep final String masterNode = masterNodeFuture.get(); logger.info("--> creating index [test] with one shard and on replica"); assertAcked(prepareCreate("test").setSettings( - Settings.builder().put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + Settings.builder().put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) ); ensureGreen("test"); @@ -1040,7 +1080,8 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2); CountDownLatch beginRelocationLatch = new CountDownLatch(1); CountDownLatch endRelocationLatch = new CountDownLatch(1); - transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch)); + transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, + endRelocationLatch)); internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get(); // wait for relocation to start beginRelocationLatch.await(); @@ -1177,7 +1218,8 @@ public void run() { assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode()); if (expectedBlocks != null) { for (ClusterBlockLevel level : expectedBlocks.levels()) { - assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock(level)); + assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock + (level)); } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index dec3a2ae3edcd..a6638eb19cf02 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.test.ESTestCase; @@ -64,7 +65,7 @@ public void testShouldIgnoreNewClusterState() { assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(1); - assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(2); assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java index 3d4c464f128a8..9c90e8de90074 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java @@ -195,10 +195,11 @@ public void testQueueStats() { highestCommitted = context.state; } } + assert highestCommitted != null; queue.markAsProcessed(highestCommitted); - assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size())); - assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size())); + assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version()))); + assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version()))); assertThat(queue.stats().getCommitted(), equalTo(0)); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index b32581e0e2b6b..ceb125deda28a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -63,16 +63,20 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -159,7 +163,7 @@ public MockNode createMockNode(String name, Settings settings, Version version, DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version); DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress()); MockNode node = new MockNode(discoveryNode, service, listener, logger); - node.action = buildPublishClusterStateAction(settings, service, node, node); + node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node); final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1); TransportConnectionListener waitForConnection = new TransportConnectionListener() { @Override @@ -231,10 +235,21 @@ protected MockTransportService buildTransportService(Settings settings, Version return transportService; } - protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider, - PublishClusterStateAction.NewPendingClusterStateListener listener) { - DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); + protected MockPublishAction buildPublishClusterStateAction( + Settings settings, + MockTransportService transportService, + Supplier clusterStateSupplier, + PublishClusterStateAction.NewPendingClusterStateListener listener + ) { + DiscoverySettings discoverySettings = + new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + return new MockPublishAction( + settings, + transportService, + clusterStateSupplier, + listener, + discoverySettings, + ClusterName.DEFAULT); } public void testSimpleClusterStatePublishing() throws Exception { @@ -596,11 +611,12 @@ public void testIncomingClusterStateValidation() throws Exception { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state from another master"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting")); } logger.info("--> test state from the current master is accepted"); node.action.validateIncomingState(ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState); + .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState); logger.info("--> testing rejection of another cluster name"); @@ -608,6 +624,7 @@ public void testIncomingClusterStateValidation() throws Exception { node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState); fail("node accepted state with another cluster name"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); } logger.info("--> testing rejection of a cluster state with wrong local node"); @@ -618,6 +635,7 @@ public void testIncomingClusterStateValidation() throws Exception { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state with non-existence local node"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); } try { @@ -628,12 +646,22 @@ public void testIncomingClusterStateValidation() throws Exception { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state with existent but wrong local node"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); } logger.info("--> testing acceptance of an old cluster state"); - state = node.clusterState; + final ClusterState incomingState = node.clusterState; node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build(); - node.action.validateIncomingState(state, node.clusterState); + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); + final String message = String.format( + Locale.ROOT, + "rejecting cluster state version [%d] uuid [%s] received from [%s]", + incomingState.version(), + incomingState.stateUUID(), + incomingState.nodes().getMasterNodeId() + ); + assertThat(e, hasToString("java.lang.IllegalStateException: " + message)); // an older version from a *new* master is also OK! ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build(); @@ -646,18 +674,17 @@ public void testIncomingClusterStateValidation() throws Exception { node.action.validateIncomingState(state, previousState); } - public void testInterleavedPublishCommit() throws Throwable { + public void testOutOfOrderCommitMessages() throws Throwable { MockNode node = createMockNode("node").setAsMaster(); final CapturingTransportChannel channel = new CapturingTransportChannel(); List states = new ArrayList<>(); - final int numOfStates = scaledRandomIntBetween(3, 10); + final int numOfStates = scaledRandomIntBetween(3, 25); for (int i = 1; i <= numOfStates; i++) { states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build()); } final ClusterState finalState = states.get(numOfStates - 1); - Collections.shuffle(states, random()); logger.info("--> publishing states"); for (ClusterState state : states) { @@ -667,19 +694,28 @@ public void testInterleavedPublishCommit() throws Throwable { assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); assertThat(channel.error.get(), nullValue()); channel.clear(); + } logger.info("--> committing states"); + long largestVersionSeen = Long.MIN_VALUE; Randomness.shuffle(states); for (ClusterState state : states) { node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); - assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); - if (channel.error.get() != null) { - throw channel.error.get(); + if (largestVersionSeen < state.getVersion()) { + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + if (channel.error.get() != null) { + throw channel.error.get(); + } + largestVersionSeen = state.getVersion(); + } else { + // older cluster states will be rejected + assertNotNull(channel.error.get()); + assertThat(channel.error.get(), instanceOf(IllegalStateException.class)); } + channel.clear(); } - channel.clear(); //now check the last state held assertSameState(node.clusterState, finalState); @@ -817,8 +853,8 @@ static class MockPublishAction extends PublishClusterStateAction { AtomicBoolean timeoutOnCommit = new AtomicBoolean(); AtomicBoolean errorOnCommit = new AtomicBoolean(); - public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { - super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName); + public MockPublishAction(Settings settings, TransportService transportService, Supplier clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0db133ba0cf80..cdd5befe6b2a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -309,8 +309,8 @@ public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10)); } else if (random.nextInt(100) <= 90) { - builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); - builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); } // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); @@ -544,7 +544,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException { logger.info("changing cluster size from {} to {}, {} data nodes", size(), n + numShareCoordOnlyNodes, n); Set nodesToRemove = new HashSet<>(); int numNodesAndClients = 0; - while (values.hasNext() && numNodesAndClients++ < size-n) { + while (values.hasNext() && numNodesAndClients++ < size - n) { NodeAndClient next = values.next(); nodesToRemove.add(next); removeDisruptionSchemeFromNode(next);