[7.x] Fix wrong error upper bound when performing incremental reductions (#43874) #76475

Merged · 2 commits · Aug 16, 2021
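In short: this 7.x change turns the per-aggregation doc count error from a primitive `long` into a boxed `Long` throughout the terms-aggregation classes, so that an error that has not been computed yet (`null`) can be told apart from a computed error of exactly `0`. Previously both states looked like `0`, so an incremental (partial) reduction that had correctly established a zero error was indistinguishable from a result carrying no error information, and the final reduction derived a fresh, wrongly positive upper bound. A minimal sketch of the two states the boxed type separates (illustrative only, not code from this diff):

```java
// Before: one primitive value for two different situations.
long before = 0;          // "exactly zero error"? or "never computed"? Ambiguous.

// After: a boxed Long keeps the states apart.
Long notComputed = null;  // reduction must still derive an upper bound from the buckets
Long exactlyZero = 0L;    // every shard returned all of its terms; the bound is final
```

Serialization is version-gated in the InternalMappedTerms hunks below: streams between nodes on 7.15.0 or later carry a presence flag plus the value, while streams involving older nodes keep the bare zlong, with `null` degrading to `0`.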
---
@@ -154,7 +154,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
             true,
             0,
             buckets,
-            0
+            null
         );
     }
 
---
@@ -72,7 +72,7 @@ private StringTerms newTerms(boolean withNested) {
             false,
             100000,
             resultBuckets,
-            0
+            null
         );
     }
 
---
@@ -577,7 +577,7 @@ public void testSearchWithParentJoin() throws IOException {
         assertEquals(Float.NaN, searchResponse.getHits().getMaxScore(), 0f);
         assertEquals(1, searchResponse.getAggregations().asList().size());
         Terms terms = searchResponse.getAggregations().get("top-tags");
-        assertEquals(0, terms.getDocCountError());
+        assertEquals(0, terms.getDocCountError().longValue());
         assertEquals(0, terms.getSumOfOtherDocCounts());
         assertEquals(3, terms.getBuckets().size());
         for (Terms.Bucket bucket : terms.getBuckets()) {
@@ -589,7 +589,7 @@ public void testSearchWithParentJoin() throws IOException {
             assertEquals(2, children.getDocCount());
             assertEquals(1, children.getAggregations().asList().size());
             Terms leafTerms = children.getAggregations().get("top-names");
-            assertEquals(0, leafTerms.getDocCountError());
+            assertEquals(0, leafTerms.getDocCountError().longValue());
             assertEquals(0, leafTerms.getSumOfOtherDocCounts());
             assertEquals(2, leafTerms.getBuckets().size());
             assertEquals(2, leafTerms.getBuckets().size());
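Because `getDocCountError()` now returns a boxed `Long`, primitive comparisons at call sites need explicit unboxing, which is why the two assertions above gained `.longValue()`. A hypothetical caller-side helper (names assumed, not part of this diff) that handles the new nullable contract:

```java
// Hypothetical consumer of the now-nullable error bound; Terms is
// org.elasticsearch.search.aggregations.bucket.terms.Terms.
static String describeDocCountError(Terms terms) {
    Long error = terms.getDocCountError();
    if (error == null) {
        return "error bound not computed for this (partially reduced) result";
    }
    return error == 0L ? "term counts are exact" : "doc count error upper bound: " + error;
}
```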
---
@@ -20,6 +20,7 @@
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -89,8 +90,6 @@ public void setupSuiteScopeCluster() throws Exception {
                 .field(DOUBLE_FIELD_NAME, 1.0 * randomInt(numUniqueTerms))
                 .endObject()));
         }
-        assertAcked(prepareCreate("idx_fixed_docs_0").addMapping("type", STRING_FIELD_NAME, "type=keyword")
-            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
         Map<String, Integer> shard0DocsPerTerm = new HashMap<>();
         shard0DocsPerTerm.put("A", 25);
         shard0DocsPerTerm.put("B", 18);
@@ -102,16 +101,8 @@ public void setupSuiteScopeCluster() throws Exception {
         shard0DocsPerTerm.put("H", 2);
         shard0DocsPerTerm.put("I", 1);
         shard0DocsPerTerm.put("J", 1);
-        for (Map.Entry<String, Integer> entry : shard0DocsPerTerm.entrySet()) {
-            for (int i = 0; i < entry.getValue(); i++) {
-                String term = entry.getKey();
-                builders.add(client().prepareIndex("idx_fixed_docs_0", "type", term + "-" + i)
-                    .setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).endObject()));
-            }
-        }
+        buildIndex(shard0DocsPerTerm, "idx_fixed_docs_0", 0, builders);
 
-        assertAcked(prepareCreate("idx_fixed_docs_1").addMapping("type", STRING_FIELD_NAME, "type=keyword")
-            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
         Map<String, Integer> shard1DocsPerTerm = new HashMap<>();
         shard1DocsPerTerm.put("A", 30);
         shard1DocsPerTerm.put("B", 25);
@@ -123,17 +114,8 @@ public void setupSuiteScopeCluster() throws Exception {
         shard1DocsPerTerm.put("Q", 6);
         shard1DocsPerTerm.put("J", 8);
         shard1DocsPerTerm.put("C", 4);
-        for (Map.Entry<String, Integer> entry : shard1DocsPerTerm.entrySet()) {
-            for (int i = 0; i < entry.getValue(); i++) {
-                String term = entry.getKey();
-                builders.add(client().prepareIndex("idx_fixed_docs_1", "type", term + "-" + i)
-                    .setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 1).endObject()));
-            }
-        }
+        buildIndex(shard1DocsPerTerm, "idx_fixed_docs_1", 1, builders);
 
-        assertAcked(prepareCreate("idx_fixed_docs_2")
-            .addMapping("type", STRING_FIELD_NAME, "type=keyword")
-            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
         Map<String, Integer> shard2DocsPerTerm = new HashMap<>();
         shard2DocsPerTerm.put("A", 45);
         shard2DocsPerTerm.put("C", 44);
@@ -143,16 +125,46 @@ public void setupSuiteScopeCluster() throws Exception {
         shard2DocsPerTerm.put("H", 28);
         shard2DocsPerTerm.put("Q", 2);
         shard2DocsPerTerm.put("D", 1);
-        for (Map.Entry<String, Integer> entry : shard2DocsPerTerm.entrySet()) {
+        buildIndex(shard2DocsPerTerm, "idx_fixed_docs_2", 2, builders);
+
+        Map<String, Integer> shard3DocsPerTerm = new HashMap<>();
+        shard3DocsPerTerm.put("A", 1);
+        shard3DocsPerTerm.put("B", 1);
+        shard3DocsPerTerm.put("C", 1);
+        buildIndex(shard3DocsPerTerm, "idx_fixed_docs_3", 3, builders);
+
+        Map<String, Integer> shard4DocsPerTerm = new HashMap<>();
+        shard4DocsPerTerm.put("K", 1);
+        shard4DocsPerTerm.put("L", 1);
+        shard4DocsPerTerm.put("M", 1);
+        buildIndex(shard4DocsPerTerm, "idx_fixed_docs_4", 4, builders);
+
+        Map<String, Integer> shard5DocsPerTerm = new HashMap<>();
+        shard5DocsPerTerm.put("X", 1);
+        shard5DocsPerTerm.put("Y", 1);
+        shard5DocsPerTerm.put("Z", 1);
+        buildIndex(shard5DocsPerTerm, "idx_fixed_docs_5", 5, builders);
+
+        indexRandom(true, builders);
+        ensureSearchable();
+    }
+
+    private void buildIndex(Map<String, Integer> docsPerTerm, String index, int shard, List<IndexRequestBuilder> builders)
+        throws IOException {
+        assertAcked(
+            prepareCreate(index).addMapping("type", STRING_FIELD_NAME, "type=keyword")
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
+        );
+        for (Map.Entry<String, Integer> entry : docsPerTerm.entrySet()) {
             for (int i = 0; i < entry.getValue(); i++) {
                 String term = entry.getKey();
-                builders.add(client().prepareIndex("idx_fixed_docs_2", "type", term + "-" + i)
-                    .setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 2).endObject()));
+                builders.add(
+                    client().prepareIndex(index, "type")
+                        .setId(term + "-" + i)
+                        .setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", shard).endObject())
+                );
             }
         }
-
-        indexRandom(true, builders);
-        ensureSearchable();
     }
 
     private void assertDocCountErrorWithinBounds(int size, SearchResponse accurateResponse, SearchResponse testResponse) {
@@ -1015,4 +1027,21 @@ public void testFixedDocs() throws Exception {
         assertThat(bucket.getDocCountError(), equalTo(29L));
     }
 
+    /**
+     * Tests the upper bounds are correct when performing incremental reductions
+     * See https://github.com/elastic/elasticsearch/issues/40005 for more details
+     */
+    public void testIncrementalReduction() {
+        SearchResponse response = client().prepareSearch("idx_fixed_docs_3", "idx_fixed_docs_4", "idx_fixed_docs_5")
+            .addAggregation(terms("terms")
+                .executionHint(randomExecutionHint())
+                .field(STRING_FIELD_NAME)
+                .showTermDocCountError(true)
+                .size(5).shardSize(5)
+                .collectMode(randomFrom(SubAggCollectionMode.values())))
+            .get();
+        assertSearchResponse(response);
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms.getDocCountError(), equalTo(0L));
+    }
 }
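testIncrementalReduction is the regression test for the issue linked in its javadoc: idx_fixed_docs_3/4/5 each have a single shard holding three unique terms, and the three term sets are disjoint. With `size(5).shardSize(5)` every shard returns its complete term list, so the only correct upper bound is 0. A comment-only sketch of the arithmetic (assuming the standard terms-aggregation error accounting, not text from the PR):

```java
// Each shard returns every term it has (3 unique terms <= shardSize of 5),
// so each shard contributes a per-shard error of 0, and the merged bound
// is 0 + 0 + 0 == 0. Before this fix, a partial reduction's computed error
// of 0 could round-trip as "unset", letting the final reduction re-derive
// a spurious non-zero bound from the smallest returned buckets.
```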
---
@@ -76,7 +76,7 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr
 
         protected abstract long getSumOfOtherDocCounts();
 
-        protected abstract long getDocCountError();
+        protected abstract Long getDocCountError();
 
         protected abstract void setDocCountError(long docCountError);
 
@@ -133,7 +133,7 @@ private long getDocCountError(A terms) {
         if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.getOrder())) {
             return 0;
         } else if (InternalOrder.isCountDesc(terms.getOrder())) {
-            if (terms.getDocCountError() > 0) {
+            if (terms.getDocCountError() != null) {
                 // If there is an existing docCountError for this agg then
                 // use this as the error for this aggregation
                 return terms.getDocCountError();
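This hunk is the heart of the fix: a non-null error, including a legitimate `0`, is now reused as-is, where the old `> 0` check silently discarded a computed zero. The branch that runs when no error has been recorded is collapsed in this view; the sketch below assumes it falls back to the classic bound, the doc count of the smallest bucket a shard returned:

```java
// Standalone sketch with assumed names, not Elasticsearch source.
static long upperBound(Long computedError, long smallestReturnedBucketDocCount) {
    if (computedError != null) {
        return computedError;  // reuse a computed bound, including a legitimate 0
    }
    // No bound recorded yet (e.g. a raw shard-level result): fall back to the
    // worst case, a term that just missed the cut on this shard.
    return smallestReturnedBucketDocCount;
}
```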
@@ -340,7 +340,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Intern
 
     protected static XContentBuilder doXContentCommon(XContentBuilder builder,
                                                       Params params,
-                                                      long docCountError,
+                                                      Long docCountError,
                                                       long otherDocCount,
                                                       List<? extends AbstractTermsBucket> buckets) throws IOException {
         builder.field(DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME.getPreferredName(), docCountError);
---
@@ -33,7 +33,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
 
     protected StringTerms buildEmptyTermsAggregation() {
         return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
+            metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0L);
     }
 
     protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) {
---
@@ -91,7 +91,7 @@ public int hashCode() {
 
     public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
                        Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
-                       List<Bucket> buckets, long docCountError) {
+                       List<Bucket> buckets, Long docCountError) {
         super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
             otherDocCount, buckets, docCountError);
     }
---
@@ -765,7 +765,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
         }
         return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
             bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
-            otherDocCount, Arrays.asList(topBuckets), 0);
+            otherDocCount, Arrays.asList(topBuckets), null);
     }
 
     @Override
---
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -32,11 +33,11 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
     protected final List<B> buckets;
     protected Map<String, B> bucketMap;
 
-    protected long docCountError;
+    protected Long docCountError;
 
     protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
                                   Map<String, Object> metadata, DocValueFormat format, int shardSize,
-                                  boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
+                                  boolean showTermDocCountError, long otherDocCount, List<B> buckets, Long docCountError) {
         super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
         this.format = format;
         this.shardSize = shardSize;
@@ -51,7 +52,15 @@ protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder
      */
     protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
         super(in);
-        docCountError = in.readZLong();
+        if (in.getVersion().onOrAfter(Version.V_7_15_0)) {
+            if (in.readBoolean()) {
+                docCountError = in.readZLong();
+            } else {
+                docCountError = null;
+            }
+        } else {
+            docCountError = in.readZLong();
+        }
         format = in.readNamedWriteable(DocValueFormat.class);
         shardSize = readSize(in);
         showTermDocCountError = in.readBoolean();
@@ -61,7 +70,16 @@ protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) thr
 
     @Override
     protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
-        out.writeZLong(docCountError);
+        if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
+            if (docCountError != null) {
+                out.writeBoolean(true);
+                out.writeZLong(docCountError);
+            } else {
+                out.writeBoolean(false);
+            }
+        } else {
+            out.writeZLong(docCountError == null ? 0 : docCountError);
+        }
         out.writeNamedWriteable(format);
         writeSize(shardSize, out);
         out.writeBoolean(showTermDocCountError);
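Taken together, the two serialization hunks add an optional-value encoding behind a version gate: 7.15.0+ streams carry a boolean presence flag, then the zlong when present; older streams keep the bare zlong, with `null` written as `0` so that mixed-version clusters keep working during a rolling upgrade. A self-contained sketch of the same pattern using plain `java.io` instead of Elasticsearch's `StreamInput`/`StreamOutput` (and a plain long instead of zlong):

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Presence-flag encoding for a nullable long, with a fallback for peers
// that only understand the old always-present format.
final class OptionalLongCodec {
    static void write(DataOutputStream out, Long value, boolean peerSupportsNull) throws IOException {
        if (peerSupportsNull) {
            out.writeBoolean(value != null);           // presence flag first
            if (value != null) {
                out.writeLong(value);
            }
        } else {
            out.writeLong(value == null ? 0 : value);  // old format: null degrades to 0
        }
    }

    static Long read(DataInputStream in, boolean peerSupportsNull) throws IOException {
        if (peerSupportsNull) {
            return in.readBoolean() ? in.readLong() : null;
        }
        return in.readLong();                          // old peers always send a value
    }
}
```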
@@ -80,7 +98,7 @@ protected int getShardSize() {
     }
 
     @Override
-    public long getDocCountError() {
+    public Long getDocCountError() {
         return docCountError;
     }
 
---
@@ -103,7 +103,7 @@ public int hashCode() {
 
     public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
                      Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
-                     List<Bucket> buckets, long docCountError) {
+                     List<Bucket> buckets, Long docCountError) {
         super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
             otherDocCount, buckets, docCountError);
     }
---
@@ -446,7 +446,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
         }
         return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
             bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
-            otherDocCount, Arrays.asList(topBuckets), 0);
+            otherDocCount, Arrays.asList(topBuckets), null);
     }
 
     @Override
---
@@ -372,7 +372,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
             showTermDocCountError,
             otherDocCount,
             List.of(topBuckets),
-            0
+            null
         );
     }
 
@@ -390,7 +390,7 @@ LongTerms buildEmptyResult() {
             showTermDocCountError,
             0,
             emptyList(),
-            0
+            0L
         );
     }
 }
@@ -454,7 +454,7 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
             showTermDocCountError,
             otherDocCount,
             List.of(topBuckets),
-            0
+            null
         );
     }
 
@@ -472,7 +472,7 @@ DoubleTerms buildEmptyResult() {
             showTermDocCountError,
             0,
             emptyList(),
-            0
+            0L
         );
     }
 }
---
@@ -32,7 +32,7 @@ public abstract class ParsedTerms extends ParsedMultiBucketAggregation<ParsedTer
     protected long sumOtherDocCount;
 
     @Override
-    public long getDocCountError() {
+    public Long getDocCountError() {
         return docCountErrorUpperBound;
     }
 
---
@@ -94,7 +94,7 @@ public int hashCode() {
 
     public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
                        Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
-                       List<Bucket> buckets, long docCountError) {
+                       List<Bucket> buckets, Long docCountError) {
         super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
             shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
---
@@ -227,7 +227,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
             showTermDocCountError,
             otherDocsCount,
             buckets,
-            0
+            null
         );
     }
 
---
@@ -41,7 +41,7 @@ interface Bucket extends MultiBucketsAggregation.Bucket {
     /**
     * Get an upper bound of the error on document counts in this aggregation.
     */
-    long getDocCountError();
+    Long getDocCountError();
 
     /**
     * Return the sum of the document counts of all buckets that did not make
---
@@ -97,7 +97,7 @@ public boolean isMapped() {
 
     @Override
     public final XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
-        return doXContentCommon(builder, params, 0, 0, Collections.emptyList());
+        return doXContentCommon(builder, params, 0L, 0, Collections.emptyList());
     }
 
     @Override
@@ -110,8 +110,8 @@ protected int getShardSize() {
     }
 
     @Override
-    public long getDocCountError() {
-        return 0;
+    public Long getDocCountError() {
+        return 0L;
     }
 
     @Override
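One detail worth noticing across these hunks: the shard-level result builders (`buildResult` in the string and numeric aggregators) now pass `null`, because a raw shard response carries no computed bound yet, while the provably exact cases, the unmapped aggregation above and the empty results (`buildEmptyResult`, `buildEmptyTermsAggregation`), return a definite `0L`:

```java
// Sketch of the convention this diff establishes (names from the diff):
Long shardLevel = null;  // buildResult: bound is derived later, during reduction
Long unmapped = 0L;      // UnmappedTerms: no data at all, so the error is exactly 0
Long empty = 0L;         // buildEmptyResult / buildEmptyTermsAggregation
```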