tidy up group by engines after removal of v1 (#15665)
clintropolis authored Jan 11, 2024
1 parent 87fbe42 commit 2118258
Showing 13 changed files with 382 additions and 360 deletions.
org/apache/druid/query/groupby/GroupingEngine.java
@@ -25,14 +25,20 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
@@ -57,18 +63,29 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngine;
import org.apache.druid.query.groupby.epinephelinae.GroupByResultMergeFn;
import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.filter.AllNullColumnSelectorFactory;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -84,6 +101,9 @@
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

/**
* Common code for processing {@link GroupByQuery}.
*/
public class GroupingEngine
{
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
@@ -174,7 +194,7 @@ public Comparator<ResultRow> createResultComparator(Query<ResultRow> queryParam)
*/
public BinaryOperator<ResultRow> createMergeFn(Query<ResultRow> queryParam)
{
return new GroupByBinaryFnV2((GroupByQuery) queryParam);
return new GroupByResultMergeFn((GroupByQuery) queryParam);
}
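
For illustration, the returned merge function combines two partial ResultRows that share the same timestamp and grouping key. A minimal, hypothetical usage sketch — engine, query, partialRow1, and partialRow2 are assumed to exist; only createMergeFn and the BinaryOperator contract come from the code above:

    // Hypothetical usage: the merge fn pairs up and combines the aggregator
    // values of two partial ResultRows for the same grouping key.
    final BinaryOperator<ResultRow> mergeFn = engine.createMergeFn(query);
    final ResultRow merged = mergeFn.apply(partialRow1, partialRow2);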

public GroupByQuery prepareGroupByQuery(GroupByQuery query)
@@ -398,7 +418,7 @@ public QueryRunner<ResultRow> mergeRunners(
final Iterable<QueryRunner<ResultRow>> queryRunners
)
{
return new GroupByMergingQueryRunnerV2(
return new GroupByMergingQueryRunner(
configSupplier.get(),
processingConfig,
queryProcessingPool,
@@ -430,14 +450,71 @@ public Sequence<ResultRow> process(
@Nullable GroupByQueryMetrics groupByQueryMetrics
)
{
return GroupByQueryEngineV2.process(
query,
storageAdapter,
bufferPool,
configSupplier.get().withOverrides(query),
processingConfig,
groupByQueryMetrics
);
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);

if (storageAdapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}

final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}

final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();

try {
final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded(
query.context().getString(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP)
);

final DateTime fudgeTimestamp = fudgeTimestampString == null
? null
: DateTimes.utc(Long.parseLong(fudgeTimestampString));

final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());

final boolean doVectorize = query.context().getVectorize().shouldVectorize(
VectorGroupByEngine.canVectorize(query, storageAdapter, filter)
);

final Sequence<ResultRow> result;

if (doVectorize) {
result = VectorGroupByEngine.process(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
filter,
interval,
querySpecificConfig,
processingConfig,
groupByQueryMetrics
);
} else {
result = GroupByQueryEngine.process(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
querySpecificConfig,
processingConfig,
filter,
interval,
groupByQueryMetrics
);
}

return result.withBaggage(bufferHolder);
}
catch (Throwable e) {
bufferHolder.close();
throw e;
}
}
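
The rewritten process() inlines what GroupByQueryEngineV2.process previously did: take a processing buffer from the pool, choose between the vectorized and non-vectorized engine, and hand ownership of the buffer to the returned sequence. A minimal sketch of that acquire/dispatch/release pattern, with hypothetical helpers (pool, canVectorizeSegment, runVectorized, runNonVectorized) standing in for the real engine calls:

    final ResourceHolder<ByteBuffer> holder = pool.take();
    try {
      // The "vectorize" query context folds the user's preference together
      // with whether the engine can actually vectorize this query.
      final boolean vectorize = query.context().getVectorize()
                                     .shouldVectorize(canVectorizeSegment);
      final Sequence<ResultRow> rows = vectorize
                                       ? runVectorized(holder.get())
                                       : runNonVectorized(holder.get());
      // Success: the sequence now owns the holder and closes it once the
      // downstream consumer finishes iterating.
      return rows.withBaggage(holder);
    }
    catch (Throwable t) {
      // Failure before ownership transfer: close here, or the buffer never
      // returns to the pool.
      holder.close();
      throw t;
    }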

/**
@@ -743,6 +820,89 @@ private Set<String> getAggregatorAndPostAggregatorNames(GroupByQuery query)
return aggsAndPostAggs;
}


/**
* Returns the cardinality of array needed to do array-based aggregation, or -1 if array-based aggregation
* is impossible.
*/
public static int getCardinalityForArrayAggregation(
GroupByQueryConfig querySpecificConfig,
GroupByQuery query,
StorageAdapter storageAdapter,
ByteBuffer buffer
)
{
if (querySpecificConfig.isForceHashAggregation()) {
return -1;
}

final List<DimensionSpec> dimensions = query.getDimensions();
final ColumnCapabilities columnCapabilities;
final int cardinality;

// Find cardinality
if (dimensions.isEmpty()) {
columnCapabilities = null;
cardinality = 1;
} else if (dimensions.size() == 1) {
// Only real columns can use array-based aggregation, since virtual columns cannot currently report their
// cardinality. We need to check if a virtual column exists with the same name, since virtual columns can shadow
// real columns, and we might miss that since we're going directly to the StorageAdapter (which only knows about
// real columns).
if (query.getVirtualColumns().exists(Iterables.getOnlyElement(dimensions).getDimension())) {
return -1;
}
// We cannot support array-based aggregation on array-based grouping, as we do not have all the indexes up front
// to allocate appropriate values.
if (dimensions.get(0).getOutputType().isArray()) {
return -1;
}

final String columnName = Iterables.getOnlyElement(dimensions).getDimension();
columnCapabilities = storageAdapter.getColumnCapabilities(columnName);
cardinality = storageAdapter.getDimensionCardinality(columnName);
} else {
// Cannot use array-based aggregation with more than one dimension.
return -1;
}

// Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality
if (Types.is(columnCapabilities, ValueType.STRING) && cardinality > 0) {
final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs().toArray(new AggregatorFactory[0]);
final long requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity(
cardinality,
aggregatorFactories
);

// Check that all keys and aggregated values can be contained in the buffer
if (requiredBufferCapacity < 0 || requiredBufferCapacity > buffer.capacity()) {
return -1;
} else {
return cardinality;
}
} else {
return -1;
}
}
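
A hedged sketch of a call site (the real caller lives in the epinephelinae grouper-selection code, not in this diff): a non-negative return value means every grouping key maps to a dense array slot, so array-based aggregation is possible; -1 means fall back to hash aggregation. querySpecificConfig, query, storageAdapter, and buffer are assumed to be in scope:

    final int cardinality = GroupingEngine.getCardinalityForArrayAggregation(
        querySpecificConfig, query, storageAdapter, buffer
    );
    if (cardinality >= 0) {
      // Single string dimension with known cardinality: use array-based slots.
    } else {
      // Unknown or unsuitable cardinality: use hash-based aggregation.
    }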

public static void convertRowTypesToOutputTypes(
final List<DimensionSpec> dimensionSpecs,
final ResultRow resultRow,
final int resultRowDimensionStart
)
{
for (int i = 0; i < dimensionSpecs.size(); i++) {
DimensionSpec dimSpec = dimensionSpecs.get(i);
final int resultRowIndex = resultRowDimensionStart + i;
final ColumnType outputType = dimSpec.getOutputType();

resultRow.set(
resultRowIndex,
DimensionHandlerUtils.convertObjectToType(resultRow.get(resultRowIndex), outputType)
);
}
}
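
For example, if a dimension was grouped as the string "123" but its DimensionSpec declares output type LONG, the corresponding row slot is rewritten in place to the long value 123. A hypothetical invocation, where passing 1 as resultRowDimensionStart assumes slot 0 holds the row timestamp:

    // Hypothetical: coerce each grouped value to its declared output type.
    GroupingEngine.convertRowTypesToOutputTypes(dimensionSpecs, resultRow, 1);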

/**
* Wraps the sequence so that, if this query might need a summary row, one is emitted in case the input becomes empty.
*/
org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -77,7 +77,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
* Returns -1 if cardinality + 1 (for null) > Integer.MAX_VALUE. Returns computed required buffer capacity
* otherwise.
*/
static long requiredBufferCapacity(int cardinality, AggregatorFactory[] aggregatorFactories)
public static long requiredBufferCapacity(int cardinality, AggregatorFactory[] aggregatorFactories)
{
final long cardinalityWithMissingValue = computeCardinalityWithMissingValue(cardinality);
// Cardinality should be in the integer range. See DimensionDictionarySelector.
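
Widening requiredBufferCapacity to public is what lets GroupingEngine.getCardinalityForArrayAggregation (above) test whether array-based aggregation fits. A sketch of the guard it enables; cardinality, aggregatorFactories, and buffer are assumed to be in scope:

    // A negative result signals overflow while sizing (cardinality plus the
    // extra null slot exceeded the representable range), so array-based
    // aggregation is rejected, as is any capacity beyond the buffer's size.
    final long required =
        BufferArrayGrouper.requiredBufferCapacity(cardinality, aggregatorFactories);
    final boolean fits = required >= 0 && required <= buffer.capacity();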
org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java (renamed from GroupByMergingQueryRunnerV2.java)
@@ -73,7 +73,7 @@
* using a buffer provided by {@code mergeBufferPool} and a parallel executor provided by {@code exec}. Outputs a
* fully aggregated stream of {@link ResultRow} objects. Does not apply post-aggregators.
*
* The input {@code queryables} are expected to come from a {@link GroupByQueryEngineV2}. This code runs on data
* The input {@code queryables} are expected to come from a {@link GroupByQueryEngine}. This code runs on data
* servers, like Historicals.
*
* This class has some resemblance to {@link GroupByRowProcessor}. See the javadoc of that class for a discussion of
@@ -82,9 +82,9 @@
* Used by
* {@link org.apache.druid.query.groupby.GroupingEngine#mergeRunners(QueryProcessingPool, Iterable)}
*/
public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
public class GroupByMergingQueryRunner implements QueryRunner<ResultRow>
{
private static final Logger log = new Logger(GroupByMergingQueryRunnerV2.class);
private static final Logger log = new Logger(GroupByMergingQueryRunner.class);
private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";

private final GroupByQueryConfig config;
@@ -98,7 +98,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
private final String processingTmpDir;
private final int mergeBufferSize;

public GroupByMergingQueryRunnerV2(
public GroupByMergingQueryRunner(
GroupByQueryConfig config,
DruidProcessingConfig processingConfig,
QueryProcessingPool queryProcessingPool,
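
Aside from the rename, the runner keeps the mergeRunnersUsingChainedExecution context flag (the constant shown above). A hedged sketch of opting a query into chained, sequential execution of the per-segment runners via the generic Query context-override API; the key string comes from the constant, everything else is illustrative:

    // Hypothetical: run the per-segment runners sequentially instead of via
    // the parallel merge path.
    final Query<ResultRow> chained = query.withOverriddenContext(
        ImmutableMap.<String, Object>of("mergeRunnersUsingChainedExecution", true)
    );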