Add Aggregation Execution Context (elastic#85011)
Adds a place to store information during aggregation execution and uses this
context to store the current tsid. This allows us to achieve a 3x improvement in
time series aggregation execution speed. In a follow-up PR, I would like to
remove the inheritance of BucketCollector from Collector and instead try
wrapping it into a collector when needed. This should prevent us from using
the getLeafCollector(LeafReaderContext ctx) method in the wrong context in the future.

Relates to elastic#74660
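
In practice, the change means a leaf collector can ask the context for the current time series id instead of loading _tsid doc values itself. Below is a minimal sketch of the new API shape; TsidReadingCollectorFactory is a hypothetical name used only for illustration, not code from this commit:

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.search.aggregations.AggregationExecutionContext;
    import org.elasticsearch.search.aggregations.LeafBucketCollector;

    import java.io.IOException;

    // Hypothetical sketch: instead of loading _tsid doc values itself, a
    // collector pulls the current time series id from the execution context.
    class TsidReadingCollectorFactory {
        LeafBucketCollector create(AggregationExecutionContext aggCtx) {
            return new LeafBucketCollector() {
                @Override
                public void collect(int doc, long owningBucketOrd) throws IOException {
                    BytesRef tsid = aggCtx.getTsid(); // only valid during collect()
                    if (tsid != null) {
                        // ... bucket the document by tsid ...
                    }
                }
            };
        }
    }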
imotov committed Mar 31, 2022
1 parent 83152b6 commit 736ce7e
Showing 17 changed files with 126 additions and 72 deletions.
==========
@@ -39,6 +39,7 @@
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorBase;
@@ -694,7 +695,7 @@ public InternalAggregation buildEmptyAggregation() {
 public void close() {}

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     return LeafBucketCollector.NO_OP_COLLECTOR;
 }
==========
@@ -8,7 +8,6 @@

 package org.elasticsearch.search.aggregations;

-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;
@@ -77,8 +76,8 @@ public final Aggregator subAggregator(String name) {
 }

 @Override
-public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-    return delegate.getLeafCollector(ctx);
+public final LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
+    return delegate.getLeafCollector(aggCtx);
 }

 @Override
==========
@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.CheckedSupplier;
+
+import java.io.IOException;
+
+/**
+ * Used to preserve contextual information during aggregation execution. It can be used by search executors and parent
+ * aggregations to provide contextual information for the child aggregation during execution such as the currently executed
+ * time series id or the size of the current date histogram bucket. The information provided by this class is highly contextual and
+ * only valid during the {@link LeafBucketCollector#collect} call.
+ */
+public class AggregationExecutionContext {
+
+    private final CheckedSupplier<BytesRef, IOException> tsidProvider;
+    private final LeafReaderContext leafReaderContext;
+
+    public AggregationExecutionContext(LeafReaderContext leafReaderContext, CheckedSupplier<BytesRef, IOException> tsidProvider) {
+        this.leafReaderContext = leafReaderContext;
+        this.tsidProvider = tsidProvider;
+    }
+
+    public LeafReaderContext getLeafReaderContext() {
+        return leafReaderContext;
+    }
+
+    public BytesRef getTsid() throws IOException {
+        return tsidProvider != null ? tsidProvider.get() : null;
+    }
+}
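
To make the contract concrete, here is a minimal sketch of how an executor might construct this context per leaf. TsidAwareLeafExecutor and its doc-values plumbing are illustrative assumptions rather than code from this commit; the real wiring lives in TimeSeriesIndexSearcher further down.

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.SortedDocValues;
    import org.elasticsearch.search.aggregations.AggregationExecutionContext;
    import org.elasticsearch.search.aggregations.BucketCollector;
    import org.elasticsearch.search.aggregations.LeafBucketCollector;

    import java.io.IOException;

    // Illustrative executor-side wiring: build one context per leaf and let the
    // supplier resolve the tsid lazily. The tsids doc values are assumed to be
    // positioned on the current document by the caller.
    class TsidAwareLeafExecutor {
        LeafBucketCollector collectorFor(BucketCollector bucketCollector, LeafReaderContext leaf, SortedDocValues tsids)
            throws IOException {
            AggregationExecutionContext aggCtx = new AggregationExecutionContext(
                leaf,
                () -> tsids.lookupOrd(tsids.ordValue()) // invoked only if getTsid() is called
            );
            return bucketCollector.getLeafCollector(aggCtx);
        }
    }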
==========
@@ -79,7 +79,7 @@ static void badState() {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext reader) {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) {
     badState();
     assert false;
     return null; // unreachable but compiler does not agree
@@ -201,6 +201,12 @@ public Map<String, Object> metadata() {
  */
 protected abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;

+// TODO: Remove this method in refactoring
+protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, AggregationExecutionContext aggCtx)
+    throws IOException {
+    return getLeafCollector(ctx, sub);
+}
+
 /**
  * Collect results for this leaf.
  * <p>
@@ -210,10 +216,10 @@ public Map<String, Object> metadata() {
  * for more details on what this does.
  */
 @Override
-public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-    preGetSubLeafCollectors(ctx);
-    final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
-    return getLeafCollector(ctx, sub);
+public final LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
+    preGetSubLeafCollectors(aggCtx.getLeafReaderContext());
+    final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(aggCtx);
+    return getLeafCollector(aggCtx.getLeafReaderContext(), sub, aggCtx);
 }

 /**
==========
@@ -22,7 +22,7 @@ public abstract class BucketCollector implements Collector {
 public static final BucketCollector NO_OP_COLLECTOR = new BucketCollector() {

     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext reader) {
+    public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) {
         return LeafBucketCollector.NO_OP_COLLECTOR;
     }
@@ -36,14 +36,18 @@ public void postCollection() throws IOException {
         // no-op
     }

     @Override
     public ScoreMode scoreMode() {
         return ScoreMode.COMPLETE_NO_SCORES;
     }
 };

+// TODO: will remove it in a follow up PR
 @Override
-public abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException;
+public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+    return getLeafCollector(new AggregationExecutionContext(ctx, null));
+}
+
+public abstract LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException;

 /**
  * Pre collection callback.
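
The final bridge method above keeps the old Collector entry point alive while funneling everything through the new context. The follow-up idea from the commit message, wrapping a BucketCollector into a plain Lucene Collector instead of inheriting from it, could look roughly like this sketch (BucketCollectorAdapter is a hypothetical name, not part of this commit):

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.Collector;
    import org.apache.lucene.search.LeafCollector;
    import org.apache.lucene.search.ScoreMode;
    import org.elasticsearch.search.aggregations.AggregationExecutionContext;
    import org.elasticsearch.search.aggregations.BucketCollector;

    import java.io.IOException;

    // Hypothetical adapter: instead of BucketCollector extending Collector,
    // wrap it only at the point where Lucene needs a Collector. Any caller
    // going through the adapter is forced to build an AggregationExecutionContext.
    class BucketCollectorAdapter implements Collector {
        private final BucketCollector bucketCollector;

        BucketCollectorAdapter(BucketCollector bucketCollector) {
            this.bucketCollector = bucketCollector;
        }

        @Override
        public LeafCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
            return bucketCollector.getLeafCollector(new AggregationExecutionContext(ctx, null));
        }

        @Override
        public ScoreMode scoreMode() {
            return bucketCollector.scoreMode();
        }
    }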
==========
@@ -8,7 +8,6 @@

 package org.elasticsearch.search.aggregations;

-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.LeafCollector;
@@ -89,9 +88,9 @@ public void postCollection() throws IOException {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     try {
-        LeafBucketCollector leafCollector = collector.getLeafCollector(ctx);
+        LeafBucketCollector leafCollector = collector.getLeafCollector(aggCtx);
         if (false == leafCollector.isNoop()) {
             return leafCollector;
         }
@@ -169,11 +168,11 @@ public String toString() {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     final List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
     for (BucketCollector collector : collectors) {
         try {
-            LeafBucketCollector leafCollector = collector.getLeafCollector(context);
+            LeafBucketCollector leafCollector = collector.getLeafCollector(aggCtx);
             if (false == leafCollector.isNoop()) {
                 leafCollectors.add(leafCollector);
             }
==========
@@ -21,6 +21,7 @@
 import org.apache.lucene.util.packed.PackedLongValues;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -110,7 +111,7 @@ private void clearLeaf() {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     finishLeaf();

     return new LeafBucketCollector() {
@@ -119,7 +120,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
         @Override
         public void collect(int doc, long bucket) throws IOException {
             if (context == null) {
-                context = ctx;
+                context = aggCtx.getLeafReaderContext();
                 docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
                 bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
             }
==========
@@ -8,8 +8,8 @@

 package org.elasticsearch.search.aggregations.bucket;

-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -95,7 +95,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     throw new IllegalStateException(
         "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."
     );
==========
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.MultiBucketCollector;
@@ -77,8 +78,8 @@ public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-    perSegCollector = new PerSegmentCollects(ctx);
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
+    perSegCollector = new PerSegmentCollects(aggCtx.getLeafReaderContext());
     entries.add(perSegCollector);

     // Deferring collector
==========
@@ -11,10 +11,8 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.fielddata.IndexFieldData;
-import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
-import org.elasticsearch.index.fielddata.plain.SortedSetBytesLeafFieldData;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@@ -29,11 +27,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;

 public class TimeSeriesAggregator extends BucketsAggregator {

-    private final IndexFieldData<SortedSetBytesLeafFieldData> tsidFieldData;
     protected final BytesKeyedBucketOrds bucketOrds;
     private final boolean keyed;

@@ -49,10 +45,6 @@ public TimeSeriesAggregator(
 ) throws IOException {
     super(name, factories, context, parent, bucketCardinality, metadata);
     this.keyed = keyed;
-    tsidFieldData = (IndexFieldData<SortedSetBytesLeafFieldData>) Objects.requireNonNull(
-        context.buildFieldContext("_tsid"),
-        "Cannot obtain tsid field"
-    ).indexFieldData();
     bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), bucketCardinality);
 }

@@ -98,20 +90,22 @@ protected void doClose() {

 @Override
 protected LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector sub) throws IOException {
-    final SortedBinaryDocValues tsids = tsidFieldData.load(context).getBytesValues();
+    // TODO: remove this method in a follow up PR
+    throw new UnsupportedOperationException("Shouldn't be here");
+}
+
+protected LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector sub, AggregationExecutionContext aggCtx)
+    throws IOException {
     return new LeafBucketCollectorBase(sub, null) {

         @Override
         public void collect(int doc, long bucket) throws IOException {
-            if (tsids.advanceExact(doc)) {
-                BytesRef newTsid = tsids.nextValue();
-                long bucketOrdinal = bucketOrds.add(bucket, newTsid);
-                if (bucketOrdinal < 0) { // already seen
-                    bucketOrdinal = -1 - bucketOrdinal;
-                    collectExistingBucket(sub, doc, bucketOrdinal);
-                } else {
-                    collectBucket(sub, doc, bucketOrdinal);
-                }
-            }
+            long bucketOrdinal = bucketOrds.add(bucket, aggCtx.getTsid());
+            if (bucketOrdinal < 0) { // already seen
+                bucketOrdinal = -1 - bucketOrdinal;
+                collectExistingBucket(sub, doc, bucketOrdinal);
+            } else {
+                collectBucket(sub, doc, bucketOrdinal);
+            }
         }
     };
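
The bucketOrds.add call above relies on an ordinal-encoding convention: a non-negative return value means a new bucket was allocated, while a negative value means the key was already seen and encodes the existing ordinal as -1 - ordinal. A toy stand-in for BytesKeyedBucketOrds makes the round trip explicit:

    import java.util.HashMap;
    import java.util.Map;

    // Self-contained sketch of the ordinal-encoding convention: add() returns
    // the new ordinal for unseen keys and (-1 - existingOrdinal) otherwise.
    class OrdinalAssigner {
        private final Map<String, Long> ords = new HashMap<>();

        long add(String key) {
            Long existing = ords.get(key);
            if (existing != null) {
                return -1 - existing; // negative: caller decodes with -1 - result
            }
            long ord = ords.size();
            ords.put(key, ord);
            return ord;
        }

        public static void main(String[] args) {
            OrdinalAssigner assigner = new OrdinalAssigner();
            System.out.println(assigner.add("tsid-a")); // 0  (new bucket)
            System.out.println(assigner.add("tsid-b")); // 1  (new bucket)
            long ord = assigner.add("tsid-a");          // -1 (already seen)
            System.out.println(-1 - ord);               // 0  (decoded ordinal)
        }
    }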
==========
@@ -14,7 +14,6 @@
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
@@ -24,6 +23,7 @@
 import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -60,13 +60,18 @@ public void search(Query query, BucketCollector bucketCollector) throws IOException {
 if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) {
     checkCancelled();
 }
-LeafBucketCollector leafCollector = bucketCollector.getLeafCollector(leaf);
 Scorer scorer = weight.scorer(leaf);
 if (scorer != null) {
-    LeafWalker leafWalker = new LeafWalker(leaf, scorer, leafCollector);
+    LeafWalker leafWalker = new LeafWalker(leaf, scorer, bucketCollector, leaf);
     if (leafWalker.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
         leafWalkers.add(leafWalker);
     }
+} else {
+    // Even though we will not walk through this aggregation as a part of normal processing
+    // this is needed to trigger actions in some bucketCollectors that bypass the normal iteration logic
+    // for example, global aggregator triggers a separate iterator that ignores the query but still needs
+    // to know all leaves
+    bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null));
 }
 }
@@ -148,7 +153,7 @@ private void checkCancelled() {
 }

 private static class LeafWalker {
-    private final LeafCollector collector;
+    private final LeafBucketCollector collector;
     private final Bits liveDocs;
     private final DocIdSetIterator iterator;
     private final SortedDocValues tsids;
@@ -158,8 +163,9 @@ private static class LeafWalker {
     int tsidOrd;
     long timestamp;

-    LeafWalker(LeafReaderContext context, Scorer scorer, LeafCollector collector) throws IOException {
-        this.collector = collector;
+    LeafWalker(LeafReaderContext context, Scorer scorer, BucketCollector bucketCollector, LeafReaderContext leaf) throws IOException {
+        AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get);
+        this.collector = bucketCollector.getLeafCollector(aggCtx);
     liveDocs = context.reader().getLiveDocs();
     this.collector.setScorer(scorer);
     iterator = scorer.iterator();
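
The scratch::get method reference above is the heart of the speedup: the walker exposes the current tsid through a CheckedSupplier, so the bytes are only materialized when a collector actually calls getTsid() inside collect(). A sketch of that pattern, assuming a BytesRefBuilder scratch field that the walker refreshes whenever the tsid changes (the field itself sits outside the hunks shown here):

    import org.apache.lucene.util.BytesRef;
    import org.apache.lucene.util.BytesRefBuilder;
    import org.elasticsearch.common.CheckedSupplier;

    import java.io.IOException;

    // Sketch of the lazy-supplier idea: keep the current tsid in a mutable
    // scratch buffer and hand collectors a supplier instead of a copy.
    // Nothing is copied per document unless a collector calls get().
    class TsidScratch {
        private final BytesRefBuilder scratch = new BytesRefBuilder();

        void update(BytesRef currentTsid) {
            scratch.copyBytes(currentTsid); // refresh once per tsid change, not per doc
        }

        CheckedSupplier<BytesRef, IOException> supplier() {
            return scratch::get; // same shape as the LeafWalker code above
        }
    }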
==========
@@ -8,8 +8,8 @@

 package org.elasticsearch.search.profile.aggregation;

-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -87,11 +87,11 @@ public InternalAggregation buildEmptyAggregation() {
 }

 @Override
-public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
     Timer timer = profileBreakdown.getTimer(AggregationTimingType.BUILD_LEAF_COLLECTOR);
     timer.start();
     try {
-        return new ProfilingLeafBucketCollector(delegate.getLeafCollector(ctx), profileBreakdown);
+        return new ProfilingLeafBucketCollector(delegate.getLeafCollector(aggCtx), profileBreakdown);
     } finally {
         timer.stop();
     }