Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Search: correct shards counting #55758

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna

@Override
public void onResponse(SearchResponse response) {
searchResponse.get().updateFinalResponse(response.getSuccessfulShards(), response.getInternalResponse());
searchResponse.get().updateFinalResponse(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, #55683 has the same change

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was working on mine I was wondering if I should assert that the final response looks "right" based on our updates. I never did that, but maybe it is good?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean by "right" here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure! That was part of why I didn't do it....

I was thinking that it might be useful to know if the response that we got here "agrees" with the results we got from the listener. Maybe that is just checking counts or something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've already added the assertions below. Ignore me!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, yea that is the reason why I added the whole response as argument ;)

executeCompletionListeners();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ synchronized void updatePartialResponse(int successfulShards, SearchResponseSect
throw new IllegalStateException("received partial response out of order: "
+ newSections.getNumReducePhases() + " < " + sections.getNumReducePhases());
}
this.successfulShards = successfulShards;
//when we get partial results skipped shards are not included in the provided number of successful shards
this.successfulShards = successfulShards + skippedShards;
this.sections = newSections;
this.isPartial = true;
this.isFinalReduce = isFinalReduce;
Expand All @@ -101,12 +102,20 @@ synchronized void updatePartialResponse(int successfulShards, SearchResponseSect
* Updates the response with the final {@link SearchResponseSections} merged from #<code>successfulShards</code>
* shards.
*/
synchronized void updateFinalResponse(int successfulShards, SearchResponseSections newSections) {
synchronized void updateFinalResponse(SearchResponse searchResponse) {
failIfFrozen();
assert searchResponse.getTotalShards() == totalShards : "received number of total shards differs from the one " +
"notified through onListShards";
assert searchResponse.getSkippedShards() == skippedShards : "received number of skipped shards differs from the one " +
"notified through onListShards";
assert searchResponse.getFailedShards() == buildShardFailures().length : "number of tracked failures differs from failed shards";
// copy the response headers from the current context
this.responseHeaders = threadContext.getResponseHeaders();
this.successfulShards = successfulShards;
this.sections = newSections;
//we take successful from the final response, which overrides whatever value we set when we received the last partial results.
//This is important for cases where e.g. aggs work fine and then fetch fails on some of the shards but not all.
//The shards where fetch has failed should not counted as successful.
this.successfulShards = searchResponse.getSuccessfulShards();
this.sections = searchResponse.getInternalResponse();
this.isPartial = false;
this.isFinalReduce = true;
this.frozen = true;
Expand All @@ -120,6 +129,10 @@ synchronized void updateWithFailure(Exception exc) {
failIfFrozen();
// copy the response headers from the current context
this.responseHeaders = threadContext.getResponseHeaders();
//We may have already received some partial results, in which case the number of successful shards reflects that despite the search
//has failed entirely at a later stage. We should consider all shards as failed given that none of them was able to e.g. fetch
//skipped shards are considered successful though
this.successfulShards = this.skippedShards;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this makes what gets returned more consistent with what _search does, though we lose information on how many shards the partial results come from. We possibly need to expand the info that we return if we want to better represent this scenario where there are partial results yet the whole search has failed hence stopped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to update these informations ? They are not used anymore when the response is final. I think #55683 has the more straightforward approach of keeping the final response as is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is the reason why you can end up with situations like successful: 3 failed: 3 total: 3 .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather want successful: 0 failed: 3 total:3 ( I left skipped out for simplicity)

I have yet to look at the linked PR, will do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I am now up-to-date, I agree with you on updateFinalResponse, no need to touch successful shards anymore, but you commented on updateWithFailure which still has the problem of returning a shards header that search does not return? Hence shall we signal how many shards have returned partial results although all of them later failed, or shall we just zero the successful shards that makes things more consistent with search? Otherwise I think we would need to add more details on which phase has failed to the response to be more accurate... I will update and merge conflicts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll solve this discrepancy when wee add partial top hits in the response. In the meantime I don't think we should reset the successful shards, it would be weird to have successful: 0 (assuming no shards were skipped) but some partial results in the response ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's weird either way. I cant make up my mind on which way is less weird :) I will revert this bit then and maybe add a comment that explains what happens today.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe one thing we could rather do in this case is to reset shard failures, as they have caused a fatal failure and don't need to be returned as part of the search response too, they are already in the outer error section. That way we would have failed: 0 in the inner search response, which makes more sense as it's a snapshot of the results before the failure happened. Not too sure though if this may end up causing problems in other scenarios. Doing nothing is also fine with me, as long as we are aware that the shards section can be weird at times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about ignoring fetch shard failures when building partial responses ?
We never return partial top hits if the response is partial so the successful and failure counts should only reflect the query phase ? This will change when we add the support for partial top hits but that's the least confusing solution I can think of.

this.isPartial = true;
this.failure = ElasticsearchException.guessRootCauses(exc)[0];
this.frozen = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static SearchResponse randomSearchResponse() {
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
int skippedShards = randomIntBetween(0, successfulShards);
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
return new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,26 +134,24 @@ public void testWaitForCompletion() throws InterruptedException {
for (int i = 0; i < numSkippedShards; i++) {
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}

int numShardFailures = 0;
int totalShards = numShards + numSkippedShards;
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
for (int i = 0; i < numShards; i++) {
task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1),
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, numShardFailures, true);
assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true);
}
task.getSearchProgressActionListener().onFinalReduce(shards,
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, numShardFailures, true);
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true);
((AsyncSearchTask.Listener)task.getProgressListener()).onResponse(
newSearchResponse(numShards+numSkippedShards, numShards, numSkippedShards));
assertCompletionListeners(task, numShards+numSkippedShards,
numSkippedShards, numShardFailures, false);
newSearchResponse(totalShards, totalShards, numSkippedShards));
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, false);
}

public void testWithFetchFailures() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask();
int numShards = randomIntBetween(0, 10);
int numShards = randomIntBetween(2, 10);
List<SearchShard> shards = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
shards.add(new SearchShard(null, new ShardId("0", "0", 1)));
Expand All @@ -163,38 +161,72 @@ public void testWithFetchFailures() throws InterruptedException {
for (int i = 0; i < numSkippedShards; i++) {
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}

int totalShards = numShards + numSkippedShards;
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
for (int i = 0; i < numShards; i++) {
task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1),
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, 0, true);
assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true);
}
task.getSearchProgressActionListener().onFinalReduce(shards,
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
int numFetchFailures = randomIntBetween(0, numShards);
int numFetchFailures = randomIntBetween(1, numShards - 1);
ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFetchFailures];
for (int i = 0; i < numFetchFailures; i++) {
IOException failure = new IOException("boum");
task.getSearchProgressActionListener().onFetchFailure(i,
new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE),
new IOException("boum"));

failure);
shardSearchFailures[i] = new ShardSearchFailure(failure);
}
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, numFetchFailures, true);
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numFetchFailures, true);
((AsyncSearchTask.Listener)task.getProgressListener()).onResponse(
newSearchResponse(numShards+numSkippedShards, numShards, numSkippedShards));
assertCompletionListeners(task, numShards+numSkippedShards,
numSkippedShards, numFetchFailures, false);
newSearchResponse(totalShards, totalShards - numFetchFailures, numSkippedShards, shardSearchFailures));
assertCompletionListeners(task, totalShards, totalShards - numFetchFailures, numSkippedShards, numFetchFailures, false);
}

public void testFatalFailureDuringFetch() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask();
int numShards = randomIntBetween(0, 10);
List<SearchShard> shards = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
shards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}
List<SearchShard> skippedShards = new ArrayList<>();
int numSkippedShards = randomIntBetween(0, 10);
for (int i = 0; i < numSkippedShards; i++) {
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}
int totalShards = numShards + numSkippedShards;
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
for (int i = 0; i < numShards; i++) {
task.getSearchProgressActionListener().onPartialReduce(shards.subList(0, i+1),
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, totalShards, i + 1 + numSkippedShards, numSkippedShards, 0, true);
}
task.getSearchProgressActionListener().onFinalReduce(shards,
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
for (int i = 0; i < numShards; i++) {
task.getSearchProgressActionListener().onFetchFailure(i,
new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE),
new IOException("boum"));
}
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true);
((AsyncSearchTask.Listener)task.getProgressListener()).onFailure(new IOException("boum"));
assertCompletionListeners(task, totalShards, numSkippedShards, numSkippedShards, numShards, true);
}

private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards) {
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards,
ShardSearchFailure... shardFailures) {
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),
InternalAggregations.EMPTY, null, null, false, null, 1);
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards,
100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
100, shardFailures, SearchResponse.Clusters.EMPTY);
}

private void assertCompletionListeners(AsyncSearchTask task,
int expectedTotalShards,
int expectedSuccessfulShards,
int expectedSkippedShards,
int expectedShardFailures,
boolean isPartial) throws InterruptedException {
Expand All @@ -205,6 +237,7 @@ private void assertCompletionListeners(AsyncSearchTask task,
@Override
public void onResponse(AsyncSearchResponse resp) {
assertThat(resp.getSearchResponse().getTotalShards(), equalTo(expectedTotalShards));
assertThat(resp.getSearchResponse().getSuccessfulShards(), equalTo(expectedSuccessfulShards));
assertThat(resp.getSearchResponse().getSkippedShards(), equalTo(expectedSkippedShards));
assertThat(resp.getSearchResponse().getFailedShards(), equalTo(expectedShardFailures));
assertThat(resp.isPartial(), equalTo(isPartial));
Expand Down