Skip to content

Commit

Permalink
expression virtual column indexes (#15585)
Browse files Browse the repository at this point in the history
* ExpressionVirtualColumn + indexes = bff. Expression virtual columns can now use indexes of the underlying columns similar to how expression filters
  • Loading branch information
clintropolis committed Jan 4, 2024
1 parent 8e95cea commit f19ece1
Show file tree
Hide file tree
Showing 24 changed files with 1,042 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@

package org.apache.druid.benchmark.query;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
Expand Down Expand Up @@ -73,6 +80,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Benchmark that tests various SQL queries.
Expand Down Expand Up @@ -202,7 +210,9 @@ public String getFormatString()
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 38,39: array element filtering
"SELECT string1, long1 FROM foo WHERE ARRAY_CONTAINS(\"multi-string3\", 100) GROUP BY 1,2",
"SELECT string1, long1 FROM foo WHERE ARRAY_OVERLAP(\"multi-string3\", ARRAY[100, 200]) GROUP BY 1,2"
"SELECT string1, long1 FROM foo WHERE ARRAY_OVERLAP(\"multi-string3\", ARRAY[100, 200]) GROUP BY 1,2",
// 40: regex filtering
"SELECT string4, COUNT(*) FROM foo WHERE REGEXP_EXTRACT(string1, '^1') IS NOT NULL OR REGEXP_EXTRACT('Z' || string2, '^Z2') IS NOT NULL GROUP BY 1"
);

@Param({"5000000"})
Expand Down Expand Up @@ -262,7 +272,8 @@ public String getFormatString()
"36",
"37",
"38",
"39"
"39",
"40"
})
private String query;

Expand Down Expand Up @@ -290,10 +301,15 @@ public void setup()
log.info("Starting benchmark setup using cacheDir[%s], rows[%,d], schema[%s].", segmentGenerator.getCacheDir(), rowsPerSegment, schema);
final QueryableIndex index;
if ("auto".equals(schema)) {
List<DimensionSchema> columnSchemas = schemaInfo.getDimensionsSpec()
.getDimensions()
.stream()
.map(x -> new AutoTypeColumnSchema(x.getName(), null))
.collect(Collectors.toList());
index = segmentGenerator.generate(
dataSegment,
schemaInfo,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
DimensionsSpec.builder().setDimensions(columnSchemas).build(),
TransformSpec.NONE,
IndexSpec.DEFAULT,
Granularities.NONE,
Expand All @@ -313,7 +329,7 @@ public void setup()
index
);
closer.register(walker);

final ObjectMapper jsonMapper = CalciteTests.getJsonMapper();
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
Expand All @@ -323,7 +339,7 @@ public void setup()
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
jsonMapper,
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper(),
Expand All @@ -340,6 +356,35 @@ public void setup()
catch (Throwable ignored) {
// the show must go on
}
final String sql = QUERIES.get(Integer.parseInt(query));

try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, "EXPLAIN PLAN FOR " + sql, ImmutableMap.of("useNativeQueryExplain", true))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] planResult = resultSequence.toList().get(0);
log.info("Native query plan:\n" +
jsonMapper.writerWithDefaultPrettyPrinter()
.writeValueAsString(jsonMapper.readValue((String) planResult[0], List.class))
);
}
catch (JsonMappingException e) {
throw new RuntimeException(e);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, ImmutableMap.of())) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Yielder<Object[]> yielder = Yielders.each(resultSequence);
int rowCounter = 0;
while (!yielder.isDone()) {
rowCounter++;
yielder.next(yielder.get());
}
log.info("Total result row count:" + rowCounter);
}
}

@TearDown(Level.Trial)
Expand Down
49 changes: 49 additions & 0 deletions processing/src/main/java/org/apache/druid/math/expr/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.index.semantic.DictionaryEncodedValueIndex;
import org.apache.druid.segment.serde.NoIndexesColumnIndexSupplier;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand Down Expand Up @@ -185,6 +193,47 @@ default <T> ExprVectorProcessor<T> asVectorProcessor(VectorInputBindingInspector
throw Exprs.cannotVectorize(this);
}

@Nullable
default ColumnIndexSupplier asColumnIndexSupplier(ColumnSelector columnSelector, @Nullable ColumnType outputType)
{
final Expr.BindingAnalysis details = analyzeInputs();
if (details.getRequiredBindings().size() == 1) {
// Single-column expression. We can use bitmap indexes if this column has an index and the expression can
// map over the values of the index.
final String column = Iterables.getOnlyElement(details.getRequiredBindings());

final ColumnHolder holder = columnSelector.getColumnHolder(column);
if (holder == null) {
// column doesn't exist, no index supplier
return null;
}
final ColumnCapabilities capabilities = holder.getCapabilities();
final ColumnIndexSupplier delegateIndexSupplier = holder.getIndexSupplier();
final DictionaryEncodedValueIndex<?> delegateRawIndex = delegateIndexSupplier.as(
DictionaryEncodedValueIndex.class
);

final ExpressionType inputType = ExpressionType.fromColumnTypeStrict(capabilities);
final ColumnType outType;
if (outputType == null) {
outType = ExpressionType.toColumnType(getOutputType(InputBindings.inspectorForColumn(column, inputType)));
} else {
outType = outputType;
}

if (delegateRawIndex != null && outputType != null) {
return new ExpressionPredicateIndexSupplier(
this,
column,
inputType,
outType,
delegateRawIndex
);
}
}
return NoIndexesColumnIndexSupplier.getInstance();
}


/**
* Decorates the {@link CacheKeyBuilder} for the default implementation of {@link #getCacheKey()}. The default cache
Expand Down
Loading

0 comments on commit f19ece1

Please sign in to comment.