-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Enable acked indexing #17038
Enable acked indexing #17038
Changes from 17 commits
74194b8
57501ce
63ada98
25fae03
55465dd
51f2c3c
746ca07
4e39359
673a73d
285c3bf
7bb85e4
3353790
76465ec
4a7524f
2bd09c4
563304d
bd9e908
4793630
14ba0c3
37d739a
97be383
4e1f62e
2a93889
5576526
4de57fc
85d3d51
0e5b22a
c4324f9
649bcdc
27448dc
1f12bee
8b970d9
c7c8b1d
e201f5c
cffc315
7cdd647
3abf817
c2ed5a1
66cc202
95feb40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,7 @@ | |
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
@@ -403,7 +404,7 @@ protected void responseWithFailure(Throwable t) { | |
protected void doRun() throws Exception { | ||
setPhase(task, "replica"); | ||
assert request.shardId() != null : "request shardId must be set"; | ||
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) { | ||
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request)) { | ||
shardOperationOnReplica(request); | ||
if (logger.isTraceEnabled()) { | ||
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request); | ||
|
@@ -706,7 +707,7 @@ protected void doRun() throws Exception { | |
return; | ||
} | ||
// closed in finishAsFailed(e) in the case of error | ||
indexShardReference = getIndexShardReferenceOnPrimary(shardId); | ||
indexShardReference = getIndexShardReferenceOnPrimary(shardId, request); | ||
if (indexShardReference.isRelocated() == false) { | ||
executeLocally(); | ||
|
||
|
@@ -820,24 +821,64 @@ void finishBecauseUnavailable(ShardId shardId, String message) { | |
} | ||
} | ||
|
||
|
||
static ConcurrentMap<IndexShardReference, String> openShardReferences; | ||
|
||
static boolean setupShardReferenceAssertions() { | ||
openShardReferences = new ConcurrentHashMap<>(); | ||
return true; | ||
} | ||
|
||
static boolean addShardReference(IndexShardReference ref, String desc) { | ||
String prev = openShardReferences.put(ref, desc); | ||
if (prev != null) { | ||
throw new AssertionError("shard ref " + ref + " is added twice. current [" + desc + "] prev [" + prev + "]"); | ||
} | ||
return true; | ||
} | ||
|
||
static boolean removeShardReference(IndexShardReference ref) { | ||
assert openShardReferences.remove(ref) != null : "failed to find ref [" + ref + "]"; | ||
return true; | ||
} | ||
|
||
static { | ||
assert setupShardReferenceAssertions(); | ||
} | ||
|
||
static public void assertAllShardReferencesAreCleaned() { | ||
if (openShardReferences == null || openShardReferences.isEmpty()) { | ||
return; | ||
} | ||
StringBuilder sb = new StringBuilder(); | ||
for (String desc : openShardReferences.values()) { | ||
sb.append(desc).append("\n"); | ||
} | ||
assert sb.length() == 0 : "Found unclosed shard references:\n" + sb; | ||
} | ||
|
||
/** | ||
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally | ||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}). | ||
*/ | ||
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { | ||
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { | ||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); | ||
IndexShard indexShard = indexService.getShard(shardId.id()); | ||
return new IndexShardReferenceImpl(indexShard, true); | ||
IndexShardReference ref = new IndexShardReferenceImpl(indexShard, true); | ||
assert addShardReference(ref, "primary: " + request.toString() + " " + ref.routingEntry()); | ||
return ref; | ||
} | ||
|
||
/** | ||
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as | ||
* replication is completed on the node. | ||
*/ | ||
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { | ||
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, ReplicaRequest request) { | ||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); | ||
IndexShard indexShard = indexService.getShard(shardId.id()); | ||
return new IndexShardReferenceImpl(indexShard, false); | ||
IndexShardReference ref = new IndexShardReferenceImpl(indexShard, false); | ||
assert addShardReference(ref, "replica: " + request.toString() + " " + ref.routingEntry()); | ||
return ref; | ||
} | ||
|
||
/** | ||
|
@@ -1018,30 +1059,36 @@ public void handleException(TransportException exp) { | |
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); | ||
logger.warn("[{}] {}", exp, shardId, message); | ||
shardStateAction.shardFailed( | ||
shard, | ||
indexShardReference.routingEntry(), | ||
message, | ||
exp, | ||
new ShardStateAction.Listener() { | ||
@Override | ||
public void onSuccess() { | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable shardFailedError) { | ||
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { | ||
ShardRouting primaryShard = indexShardReference.routingEntry(); | ||
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); | ||
// we are no longer the primary, fail ourselves and start over | ||
indexShardReference.failShard(message, shardFailedError); | ||
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); | ||
} else { | ||
assert false : shardFailedError; | ||
shard, | ||
indexShardReference.routingEntry(), | ||
message, | ||
exp, | ||
new ShardStateAction.Listener() { | ||
@Override | ||
public void onSuccess() { | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable shardFailedError) { | ||
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { | ||
String message = "unknown"; | ||
try { | ||
ShardRouting primaryShard = indexShardReference.routingEntry(); | ||
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); | ||
// we are no longer the primary, fail ourselves and start over | ||
indexShardReference.failShard(message, shardFailedError); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we open a follow-up issue that failShard should throw an already-closed exception? (This is really what the code protects against.) To be clear - I think the code can stay. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I opened #17366. |
||
} catch (Throwable t) { | ||
shardFailedError.addSuppressed(t); | ||
} | ||
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); | ||
} else { | ||
assert shardFailedError instanceof TransportException || | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a comment about where these exceptions can come from? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Pushed 4e1f62e. |
||
shardFailedError instanceof NodeClosedException : shardFailedError; | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
} | ||
} | ||
} | ||
); | ||
} | ||
} | ||
|
@@ -1125,7 +1172,9 @@ protected boolean shouldExecuteReplication(Settings settings) { | |
|
||
interface IndexShardReference extends Releasable { | ||
boolean isRelocated(); | ||
|
||
void failShard(String reason, @Nullable Throwable e); | ||
|
||
ShardRouting routingEntry(); | ||
} | ||
|
||
|
@@ -1146,6 +1195,7 @@ static final class IndexShardReferenceImpl implements IndexShardReference { | |
@Override | ||
public void close() { | ||
operationLock.close(); | ||
assert removeShardReference(this); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -223,9 +223,10 @@ protected void doStop() { | |
FutureUtils.cancel(this.reconnectToNodes); | ||
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) { | ||
onGoingTimeout.cancel(); | ||
onGoingTimeout.listener.onClose(); | ||
} | ||
ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS); | ||
postAppliedListeners.stream().filter(listener -> listener instanceof TimeoutClusterStateListener) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a comment on why we do it out of the loop? (i.e. loop again) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: this was lost, but I think it is a good thing to have. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed 8b970d9. |
||
.forEach(listener -> ((TimeoutClusterStateListener) listener).onClose()); | ||
remove(localNodeMasterListeners); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ | |
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.CountDownLatch; | ||
|
@@ -402,6 +403,18 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus | |
} | ||
|
||
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); | ||
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) { | ||
final String message = String.format( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very close to the new check. I'm not sure we need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check rejects incoming states from the same master that are out of order. I do not think that the new check covers this case. I think it's needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in. If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happen:
Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Just to be clear, this is only if we are following a master. The line of code in question will reject older states from the same master that are out of order.
There is not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
Locale.ROOT, | ||
"received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add that it came from the current master? Also - thinking about this more - this can only happen in our testing, right? In practice we only have 1 channel (if someone doesn't mess with some settings). Maybe add a comment about it if so... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Pushed 85d3d51. |
||
incomingState.version(), | ||
incomingState.stateUUID(), | ||
lastSeenClusterState.version(), | ||
lastSeenClusterState.stateUUID() | ||
); | ||
logger.warn(message); | ||
throw new IllegalStateException(message); | ||
} | ||
} | ||
|
||
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -759,7 +759,13 @@ public void releaseSnapshot(IndexCommit snapshot) throws IOException { | |
*/ | ||
public void failShard(String reason, @Nullable Throwable e) { | ||
// fail the engine. This will cause this shard to also be removed from the node's index service. | ||
getEngine().failEngine(reason, e); | ||
final Engine engine = getEngineOrNull(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Haha. I fixed this inline - forgot :) I think this should be a separate PR - it's not really needed for this one (we protect against it). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reverted in 5576526. | ||
||
if (engine == null) { | ||
logger.trace("ignoring request to fail the shard, we're already closed. (reason: [{}])", e, reason); | ||
|
||
} else { | ||
engine.failEngine(reason, e); | ||
} | ||
} | ||
|
||
public Engine.Searcher acquireSearcher(String source) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels way more natural on the IndexShard level - I felt that way when I made it but couldn't find a way (at the time) to push it to IndexShard while getting the debugging information to be used as the string value here. We can pass it to IndexShard.acquirePrimaryOperationLock and acquireReplicaOperationLock but that will force a toString on the request object. How about making the reason an Object type and only call toString on it in IndexShard?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or even better a reason supplier called on demand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it's more natural there, but I prefer to keep it where it is until there's a clear need for it at the lower level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guys, we can't have static maps on classes like this. If we want some kind of static assertions we should inject some mock services, just as we do for `SearchContext` in `MockSearchService`. I am a bit confused why we have a static factory method on literally the only impl of an interface in `IndexShardReferenceImpl` — this smells like a design flaw to me. I think together with this change we should rather expose a pluggable service, or move the creation into `IndexService` and let folks plug in a factory via `IndexModule` in order to implement this assertion. Static per-JVM concurrent maps seem like playing Russian roulette.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in 0e5b22a.