Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable acked indexing #17038

Merged
merged 40 commits into from
Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
74194b8
enable testAckedIndexing
bleskes Feb 12, 2016
57501ce
protect against transport shutdowns
bleskes Feb 12, 2016
63ada98
make index counter checking use assertBusy
bleskes Feb 13, 2016
25fae03
better open reference reporting
bleskes Feb 13, 2016
55465dd
fixed closing shard reference on shard already closed
bleskes Feb 13, 2016
51f2c3c
failure message with shard routing + protect against already closed
bleskes Feb 13, 2016
746ca07
better cluster stability check
bleskes Feb 13, 2016
4e39359
close timeoutlisteners with no timeout...
bleskes Feb 14, 2016
673a73d
increased timeout
bleskes Feb 14, 2016
285c3bf
disabled in flight ops check
bleskes Feb 14, 2016
7bb85e4
better proteciton against throwables with no message in assertions
bleskes Feb 14, 2016
3353790
better assertion
bleskes Feb 14, 2016
76465ec
don't through fail to send exception on a the generic thread - it may…
bleskes Feb 14, 2016
4a7524f
more shutdown exceptions
bleskes Feb 14, 2016
2bd09c4
Merge branch 'master' into enable_acked
jasontedor Mar 8, 2016
563304d
No old states polluting pending states queue
jasontedor Mar 9, 2016
bd9e908
Waiting for primary terms
jasontedor Mar 9, 2016
4793630
Merge branch 'master' into enable_acked
jasontedor Mar 28, 2016
14ba0c3
Use longer timeout on nightly tests, but rarely
jasontedor Mar 28, 2016
37d739a
Simplify doc creation check in acked indexing test
jasontedor Mar 28, 2016
97be383
Fix formatting in DWSDIT#TCJDOPI
jasontedor Mar 28, 2016
4e1f62e
Clarify exceptions when failing to fail a replica
jasontedor Mar 28, 2016
2a93889
Add clarifying comment on disrupted in-flight ops
jasontedor Mar 28, 2016
5576526
For now do not guard against already failed engine
jasontedor Mar 28, 2016
4de57fc
Simplify test out of order commit messages
jasontedor Mar 28, 2016
85d3d51
Clarify message on out-of-order state publish
jasontedor Mar 28, 2016
0e5b22a
Remove pending locks assertions from TRA
jasontedor Mar 29, 2016
c4324f9
Merge branch 'master' into enable_acked
jasontedor Mar 29, 2016
649bcdc
Enable acked indexing test
jasontedor Mar 29, 2016
27448dc
Adjust for long random timeout in acked indexing
jasontedor Mar 29, 2016
1f12bee
Reword received superseded cluster state message
jasontedor Apr 2, 2016
8b970d9
Clarify closing of timeout listeners
jasontedor Apr 2, 2016
c7c8b1d
Merge branch 'master' into enable_acked
jasontedor Apr 2, 2016
e201f5c
Fix missing parenthesis in commit timeout message
jasontedor Apr 2, 2016
cffc315
Reject old cluster states and keep the queue clean
jasontedor Apr 5, 2016
7cdd647
Clarify cluster state local node validation
jasontedor Apr 5, 2016
3abf817
Add comment about older cluster states
jasontedor Apr 5, 2016
c2ed5a1
Reorder pending queue clean checks
jasontedor Apr 5, 2016
66cc202
Refactor old state version check to ZenDiscovery
jasontedor Apr 5, 2016
95feb40
Remove superfluous validation of incoming states
jasontedor Apr 6, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
this.publishClusterState =
new PublishClusterStateAction(
settings,
transportService,
clusterService::state,
new NewPendingClusterStateListener(),
discoverySettings,
clusterName);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,17 @@ public synchronized void markAsProcessed(ClusterState state) {
currentMaster
);
}
} else if (state.supersedes(pendingState) && pendingContext.committed()) {
} else if (state.version() >= pendingState.version()) {
assert state.supersedes(pendingState) || (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is we're here, then the masters are equal. In this case supersedes == false, means version equality (because it's can't be lower. Also state.nodes().getMasterNodeId() is never null (and assigned to currentMaster.getId())) . I'm not sure what we're asserting than. Also note that it disables the next if clause, checking for uuid equality, so maybe we want to do the next if clause first?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed assertion, reordered checks in c2ed5a1.

state.nodes().getMasterNodeId() != null &&
state.nodes().getMasterNodeId().equals(pendingState.nodes().getMasterNodeId()));
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
);
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
if (pendingContext.committed()) {
pendingContext.listener.onNewClusterStateProcessed();
}
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
assert pendingContext.committed() : "processed cluster state is not committed " + state;
contextsToRemove.add(pendingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
Expand All @@ -64,6 +63,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
*
Expand All @@ -82,17 +82,22 @@ public interface NewPendingClusterStateListener {
}

private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final Supplier<ClusterState> clusterStateSupplier;
private final NewPendingClusterStateListener newPendingClusterStatelistener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
private final PendingClusterStatesQueue pendingStatesQueue;

public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
public PublishClusterStateAction(
Settings settings,
TransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
NewPendingClusterStateListener listener,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that node provider interface can go away (now that's were on java8 - function refs FTW)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same but can we keep it separate from this pull request?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for SURE!

DiscoverySettings discoverySettings,
ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.clusterStateSupplier = clusterStateSupplier;
this.newPendingClusterStatelistener = listener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
Expand Down Expand Up @@ -364,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
Expand Down Expand Up @@ -395,19 +400,19 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes();

if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
throw new IllegalStateException("received state from local node that does not match the current local node");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want it to read "received state with a local node that doesn't match the current one"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 7cdd647.

}

ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
final String message = String.format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very close to the new check. I'm not sure we need it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check rejects incoming states from the same master that are out of order. I do not think that the new check covers this case. I think it's needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in. If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happen:

  1. A commit message for this specific CS comes in.
  2. A commit message for a higher CS comes in (including the lastSeenClusterState) and that CS is processed.

Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in.

Just to be clear, this is only if we are following a master. The line of code in question will reject older states from the same master that are out of order.

If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happen:

  1. A commit message for this specific CS comes in.
  2. A commit message for a higher CS comes in (including the lastSeenClusterState) and that CS is processed.
    Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above?

There is not.

Copy link
Member Author

@jasontedor jasontedor Apr 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I pushed commit 95feb40.

Locale.ROOT,
"received cluster state from current master superseded by last seen cluster state; " +
"received version [%s] with uuid [%s], last seen version [%s] with uuid [%s]",
"received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
lastSeenClusterState.version(),
Expand All @@ -416,6 +421,21 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
logger.warn(message);
throw new IllegalStateException(message);
}

final ClusterState state = clusterStateSupplier.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should strengthen ZenDiscovery.shouldIgnoreOrRejectNewClusterState to check for version equality (and assert uuid) and use it in line 410 then all of this can be change to just throwing an exception if it doesn't like it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I added commit 66cc202.

if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) {
assert !incomingState.stateUUID().equals(state.stateUUID());
final String message = String.format(
Locale.ROOT,
"received cluster state older than current cluster state; " +
"received version [%d] with uuid [%s], current version [%d]",
incomingState.version(),
incomingState.stateUUID(),
state.version()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}

protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ public void testQueueStats() {
highestCommitted = context.state;
}
}
assert highestCommitted != null;

queue.markAsProcessed(highestCommitted);
assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size()));
assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes Because uncommitted states with a lower version get cleaned now but they didn't before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doh. right.

assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version())));
assertThat(queue.stats().getCommitted(), equalTo(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -161,7 +163,7 @@ public MockNode createMockNode(String name, Settings settings, Version version,
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress());
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, node, node);
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
Expand Down Expand Up @@ -233,10 +235,21 @@ protected MockTransportService buildTransportService(Settings settings, Version
return transportService;
}

protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider,
PublishClusterStateAction.NewPendingClusterStateListener listener) {
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
protected MockPublishAction buildPublishClusterStateAction(
Settings settings,
MockTransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
PublishClusterStateAction.NewPendingClusterStateListener listener
) {
DiscoverySettings discoverySettings =
new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
return new MockPublishAction(
settings,
transportService,
clusterStateSupplier,
listener,
discoverySettings,
ClusterName.DEFAULT);
}

public void testSimpleClusterStatePublishing() throws Exception {
Expand Down Expand Up @@ -598,18 +611,20 @@ public void testIncomingClusterStateValidation() throws Exception {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting"));
}

logger.info("--> test state from the current master is accepted");
node.action.validateIncomingState(ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState);
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState);


logger.info("--> testing rejection of another cluster name");
try {
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState);
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
}

logger.info("--> testing rejection of a cluster state with wrong local node");
Expand All @@ -620,6 +635,7 @@ public void testIncomingClusterStateValidation() throws Exception {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node"));
}

try {
Expand All @@ -630,6 +646,7 @@ public void testIncomingClusterStateValidation() throws Exception {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node"));
}

logger.info("--> testing acceptance of an old cluster state");
Expand All @@ -639,7 +656,7 @@ public void testIncomingClusterStateValidation() throws Exception {
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
final String message = String.format(
Locale.ROOT,
"received older cluster state version [%s] from current master with uuid [%s] than last seen cluster state [%s] from current master with uuid [%s]",
"received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
node.clusterState.version(),
Expand Down Expand Up @@ -678,19 +695,27 @@ public void testOutOfOrderCommitMessages() throws Throwable {
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();

}

logger.info("--> committing states");

long largestVersionSeen = Long.MIN_VALUE;
Randomness.shuffle(states);
for (ClusterState state : states) {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {
throw channel.error.get();
if (largestVersionSeen < state.getVersion()) {
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {
throw channel.error.get();
}
largestVersionSeen = state.getVersion();
} else {
assertNotNull(channel.error.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment as to why we expect an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 3abf817.

assertThat(channel.error.get(), instanceOf(IllegalStateException.class));
}
channel.clear();
}
channel.clear();

//now check the last state held
assertSameState(node.clusterState, finalState);
Expand Down Expand Up @@ -828,8 +853,8 @@ static class MockPublishAction extends PublishClusterStateAction {
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
AtomicBoolean errorOnCommit = new AtomicBoolean();

public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName);
public MockPublishAction(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName);
}

@Override
Expand Down