diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 217946f4ecd9..346b8e4a7f7b 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -92,6 +92,7 @@
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
 import org.apache.druid.timeline.SegmentId;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -433,7 +434,8 @@ public void setup()
     String queryName = schemaQuery[1];
 
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
-    query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
+    query = (GroupByQuery) ResourceIdPopulatingQueryRunner.populateResourceId(SCHEMA_QUERY_MAP.get(schemaName)
+                                                                                              .get(queryName));
 
     generator = new DataGenerator(
         schemaInfo.getColumnSchemas(),
@@ -762,12 +764,12 @@ public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableIndex
     //noinspection unchecked
     QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            new SerializingQueryRunner<>(
-                new DefaultObjectMapper(new SmileFactory(), null),
+            new SerializingQueryRunner(
+                toolChest.decorateObjectMapper(new DefaultObjectMapper(new SmileFactory(), null), query),
                 ResultRow.class,
-                toolChest.mergeResults(
+                (queryPlus, responseContext) -> toolChest.mergeResults(
                     factory.mergeRunners(state.executorService, makeMultiRunners(state))
-                )
+                ).run(QueryPlus.wrap(ResourceIdPopulatingQueryRunner.populateResourceId(query)))
             )
         ),
         (QueryToolChest) toolChest
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
index cfdd30af391a..334621ba6007 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java
@@ -28,21 +28,22 @@
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.groupby.ResultRow;
 
-public class SerializingQueryRunner<T> implements QueryRunner<T>
+public class SerializingQueryRunner implements QueryRunner<ResultRow>
 {
   static {
     NullHandling.initializeForTests();
   }
 
   private final ObjectMapper smileMapper;
-  private final QueryRunner<T> baseRunner;
-  private final Class<T> clazz;
+  private final QueryRunner<ResultRow> baseRunner;
+  private final Class<ResultRow> clazz;
 
   public SerializingQueryRunner(
       ObjectMapper smileMapper,
-      Class<T> clazz,
-      QueryRunner<T> baseRunner
+      Class<ResultRow> clazz,
+      QueryRunner<ResultRow> baseRunner
   )
   {
     this.smileMapper = smileMapper;
@@ -51,8 +52,8 @@ public SerializingQueryRunner(
   }
 
   @Override
-  public Sequence<T> run(
-      final QueryPlus<T> queryPlus,
+  public Sequence<ResultRow> run(
+      final QueryPlus<ResultRow> queryPlus,
       final ResponseContext responseContext
   )
   {
@@ -60,7 +61,7 @@ public Sequence<T> run(
         baseRunner.run(queryPlus, responseContext),
         input -> {
           try {
-            return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input), clazz);
+            return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input.getArray()), clazz);
           }
           catch (JsonProcessingException e) {
             throw new RuntimeException(e);
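The runner above now serializes each row's backing Object[] rather than the full ResultRow, so the mapper must be one the toolchest has decorated for the query. A minimal sketch of that round trip, assuming a GroupByQuery `query`, its `toolChest`, and a `resultRow` are in scope (names here are illustrative, not from this patch):

    // illustrative sketch, not code from the patch
    ObjectMapper smileMapper =
        toolChest.decorateObjectMapper(new DefaultObjectMapper(new SmileFactory(), null), query);
    byte[] bytes = smileMapper.writeValueAsBytes(resultRow.getArray());
    ResultRow roundTripped = smileMapper.readValue(bytes, ResultRow.class);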
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java
index 7f8befd09275..1d5e81d3ac65 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java
@@ -19,9 +19,13 @@
 
 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongDouble inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongDouble> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongDouble>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongDouble o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java
index d342608a9e15..1f4f624fef4e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java
@@ -19,9 +19,13 @@
 
 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -107,4 +111,27 @@ public byte[] toBytes(@Nullable SerializablePairLongFloat inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongFloat> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongFloat>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongFloat o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongFloat a, SerializablePairLongFloat b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
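All four pair serdes register the same Hash.Strategy shape, delegating to the pair's own equals/hashCode; that strategy is what lets the grouping machinery treat the complex value as a hashable key. As a rough illustration of the effect, using fastutil directly (assumed usage, not code from this patch):

    // illustrative sketch, not code from the patch
    Hash.Strategy<SerializablePairLongDouble> strategy = new Hash.Strategy<SerializablePairLongDouble>()
    {
      @Override
      public int hashCode(SerializablePairLongDouble o)
      {
        return o.hashCode();
      }

      @Override
      public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b)
      {
        return a.equals(b);
      }
    };
    // a custom-strategy hash set collapses duplicate pair values, which is essentially
    // what grouping on such a dimension requires
    ObjectOpenCustomHashSet<SerializablePairLongDouble> distinct = new ObjectOpenCustomHashSet<>(strategy);
    distinct.add(new SerializablePairLongDouble(1L, 2.0));
    distinct.add(new SerializablePairLongDouble(1L, 2.0)); // no-op: equal under the strategy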
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java
index 7541dfdb2a15..7b8a60040157 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java
@@ -19,9 +19,13 @@
 
 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongLong inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongLong> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongLong>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongLong o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongLong a, SerializablePairLongLong b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
index 2cc60843f9e7..625d597ba8c7 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
@@ -19,11 +19,15 @@
 
 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
@@ -130,7 +134,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
   }
 
   @Override
-  public ObjectStrategy getObjectStrategy()
+  public ObjectStrategy<SerializablePairLongString> getObjectStrategy()
   {
     return new ObjectStrategy<SerializablePairLongString>()
     {
@@ -165,6 +169,29 @@ public byte[] toBytes(SerializablePairLongString val)
     };
   }
 
+  @Override
+  public TypeStrategy<SerializablePairLongString> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongString>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongString o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongString a, SerializablePairLongString b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
+
   @Override
   public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
   {
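With getTypeStrategy() supplying a hash strategy, the type now reports itself as groupable and exposes a concrete value class for deserialization. A hedged sketch of the consumer-side contract, assuming the serde's public no-arg constructor (illustrative only):

    // illustrative sketch, not code from the patch
    TypeStrategy<SerializablePairLongString> strategy =
        new SerializablePairLongStringComplexMetricSerde().getTypeStrategy();
    boolean groupable = strategy.groupable(); // true, since a Hash.Strategy was supplied
    Class<?> clazz = strategy.getClazz();     // SerializablePairLongString.class, used for deserialization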
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index 40dc9715885f..2fb4dfdd4d2a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -282,6 +282,7 @@ public boolean isVectorize()
     return vectorize;
   }
 
+  @SuppressWarnings("unused")
   public boolean isIntermediateResultAsMapCompat()
   {
     return intermediateResultAsMapCompat;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 81ec050ce088..47064fefbe60 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -21,6 +21,8 @@
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.ObjectCodec;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -31,7 +33,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.data.input.Row;
@@ -73,7 +74,10 @@
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.joda.time.DateTime;
 
 import java.io.Closeable;
@@ -103,12 +107,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupByQuery>
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
-      Supplier<GroupByQueryConfig> queryConfigSupplier,
       GroupByQueryMetricsFactory queryMetricsFactory,
       @Merging GroupByResourcesReservationPool groupByResourcesReservationPool
   )
   {
     this.groupingEngine = groupingEngine;
-    this.queryConfig = queryConfigSupplier.get();
     this.queryMetricsFactory = queryMetricsFactory;
     this.groupByResourcesReservationPool = groupByResourcesReservationPool;
   }
@@ -451,12 +451,6 @@ public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final
   {
     final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
 
-    if (resultAsArray && !queryConfig.isIntermediateResultAsMapCompat()) {
-      // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration,
-      // and we can save the overhead of making a copy of the ObjectMapper.
-      return objectMapper;
-    }
-
     // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting.
     final JsonSerializer<ResultRow> serializer = new JsonSerializer<ResultRow>()
     {
@@ -478,16 +472,83 @@ public void serialize(
     // Deserializer that can deserialize either array- or map-based rows.
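+    // Complex dimension values arrive from Jackson as generic maps/lists; the deserializer below
+    // converts them back to the concrete classes reported by each dimension's type strategy.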
     final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
     {
+      final Class<?>[] dimensionClasses = createDimensionClasses();
+      boolean containsComplexDimensions = query.getDimensions()
+                                               .stream()
+                                               .anyMatch(
+                                                   dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX)
+                                               );
+
       @Override
       public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
       {
         if (jp.isExpectedStartObjectToken()) {
           final Row row = jp.readValueAs(Row.class);
-          return ResultRow.fromLegacyRow(row, query);
+          final ResultRow resultRow = ResultRow.fromLegacyRow(row, query);
+          if (containsComplexDimensions) {
+            final List<DimensionSpec> queryDimensions = query.getDimensions();
+            for (int i = 0; i < queryDimensions.size(); ++i) {
+              if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) {
+                final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i;
+                resultRow.set(
+                    dimensionIndexInResultRow,
+                    objectMapper.convertValue(
+                        resultRow.get(dimensionIndexInResultRow),
+                        dimensionClasses[i]
+                    )
+                );
+              }
+            }
+          }
+          return resultRow;
         } else {
-          return ResultRow.of(jp.readValueAs(Object[].class));
+          Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()];
+
+          if (!jp.isExpectedStartArrayToken()) {
+            throw DruidException.defensive("Expected start array token, received [%s]", jp.currentToken());
+          }
+
+          ObjectCodec codec = jp.getCodec();
+
+          jp.nextToken();
+
+          int numObjects = 0;
+          while (jp.currentToken() != JsonToken.END_ARRAY) {
+            if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) {
+              objectArray[numObjects] = codec.readValue(jp, dimensionClasses[numObjects - query.getResultRowDimensionStart()]);
+            } else {
+              objectArray[numObjects] = codec.readValue(jp, Object.class);
+            }
+            jp.nextToken();
+            ++numObjects;
+          }
+          return ResultRow.of(objectArray);
+        }
+      }
+
+      private Class<?>[] createDimensionClasses()
+      {
+        final List<DimensionSpec> queryDimensions = query.getDimensions();
+        final Class<?>[] classes = new Class[queryDimensions.size()];
+        for (int i = 0; i < queryDimensions.size(); ++i) {
+          final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
+          if (dimensionOutputType.is(ValueType.COMPLEX)) {
+            NullableTypeStrategy<?> nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
+            if (!nullableTypeStrategy.groupable()) {
+              throw DruidException.defensive(
+                  "Ungroupable dimension [%s] with type [%s] found in the query.",
+                  queryDimensions.get(i).getDimension(),
+                  dimensionOutputType
+              );
+            }
+            classes[i] = nullableTypeStrategy.getClazz();
+          } else {
+            classes[i] = Object.class;
+          }
+        }
+        return classes;
+      }
+
     };
 
     class GroupByResultRowModule extends SimpleModule
diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
index 044a092a3020..88642a964e8c 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
@@ -153,4 +153,9 @@ public boolean equals(@Nullable T a, @Nullable T b)
     }
     return b != null && delegate.equals(a, b);
   }
+
+  public Class<?> getClazz()
+  {
+    return delegate.getClazz();
+  }
 }
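The objectMapper.convertValue(...) call above is the map-based repair path: a pair value is first materialized as a generic {"lhs": ..., "rhs": ...} map and then rebuilt as its concrete class. A small sketch under that assumption (illustrative, not code from this patch):

    // illustrative sketch, not code from the patch
    ObjectMapper mapper = new DefaultObjectMapper();
    // what generic deserialization yields for a pair value
    Map<String, Object> raw = ImmutableMap.of("lhs", 1L, "rhs", "test");
    SerializablePairLongString pair = mapper.convertValue(raw, SerializablePairLongString.class);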
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
index 267477e52319..b274e55282ea 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
@@ -39,6 +39,8 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
   private final TypeSignature<ValueType> typeSignature;
   @Nullable
   private final Hash.Strategy<T> hashStrategy;
+  @Nullable
+  private final Class<?> clazz;
 
   public ObjectStrategyComplexTypeStrategy(ObjectStrategy<T> objectStrategy, TypeSignature<ValueType> signature)
   {
@@ -54,7 +56,8 @@ public ObjectStrategyComplexTypeStrategy(
     this.objectStrategy = objectStrategy;
     this.typeSignature = signature;
     this.hashStrategy = hashStrategy;
-
+    //noinspection VariableNotUsedInsideIf
+    this.clazz = hashStrategy == null ? null : objectStrategy.getClazz();
   }
 
   @Override
@@ -133,4 +136,13 @@ public boolean equals(T a, T b)
     }
     return hashStrategy.equals(a, b);
   }
+
+  @Override
+  public Class<?> getClazz()
+  {
+    if (clazz == null) {
+      throw DruidException.defensive("hashStrategy not provided");
+    }
+    return clazz;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
index 81aa03778362..c5cff1a0b2f2 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
@@ -193,6 +193,9 @@ default T fromBytes(byte[] value)
  * c. {@link #compare(Object, Object)} must be consistent with equals. Apart from abiding by the definition of
  * {@link Comparator#compare}, it must not return 0 for two objects that are not equals, and converse must also hold,
  * i.e. if the value returned by compare is not zero, then the arguments must not be equal.
+ * <p>
+ * d. {@link #getClazz()} should return the Java class of the values of this type. It is used by the ObjectMapper to
+ * deserialize the values during broker-historical communication and when spilling intermediate results to disk.
  */
 default boolean groupable()
 {
@@ -216,4 +219,12 @@ default boolean equals(T a, T b)
   {
     throw DruidException.defensive("Not implemented. Check groupable() first");
   }
+
+  /**
+   * @see #groupable()
+   */
+  default Class<?> getClazz()
+  {
+    throw DruidException.defensive("Not implemented. It is only implemented for complex types that are groupable().");
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 686e01e86173..526a62c813fb 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -159,6 +159,9 @@ public static AggregationTestHelper createGroupByQueryAggregationTestHelper(
     final Closer closer = Closer.create();
     final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final TestGroupByBuffers groupByBuffers = closer.register(TestGroupByBuffers.createDefault());
+    for (Module mod : jsonModulesToRegister) {
+      mapper.registerModule(mod);
+    }
     final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
         mapper,
         config,
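For orientation before the serde test below: the decorated mappers accept two wire formats for the same logical row. Roughly, for the test query's dimensions and aggregators (illustrative payloads, not taken from the patch):

    // array-based rows (CTX_KEY_ARRAY_RESULT_ROWS = true)
    [946684800000, "foo", {"lhs":1,"rhs":"test"}, 100, 10.0]
    // map-based legacy rows (CTX_KEY_ARRAY_RESULT_ROWS = false)
    {"version":"v1","timestamp":"2000-01-01T00:00:00.000Z","event":{"test":"foo","test2":{"lhs":1,"rhs":"test"},"rows":100,"post":10.0}}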
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index d2b599049944..f43bbce9d978 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -51,6 +51,7 @@
 import org.apache.druid.query.aggregation.SerializablePairLongFloat;
 import org.apache.druid.query.aggregation.SerializablePairLongLong;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.last.LongLastAggregatorFactory;
@@ -645,6 +646,87 @@ public void testResultSerde() throws Exception
     );
   }
 
+  @Test
+  public void testResultSerdeWithComplexDimension() throws Exception
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(ImmutableList.of(
+            DefaultDimensionSpec.of("test"),
+            new DefaultDimensionSpec(
+                "test2",
+                "test2",
+                ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+            )
+        ))
+        .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
+        .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator("post", 10)))
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .build();
+
+    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null);
+
+    final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+    final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper(
+        objectMapper,
+        query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, true))
+    );
+    final ObjectMapper mapsObjectMapper = toolChest.decorateObjectMapper(
+        objectMapper,
+        query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false))
+    );
+
+    final Object[] rowObjects = {
+        DateTimes.of("2000").getMillis(),
+        "foo",
+        new SerializablePairLongString(1L, "test"),
+        100,
+        10.0
+    };
+    final ResultRow resultRow = ResultRow.of(rowObjects);
+
+    // There are no tests with the standard mapper, since it cannot convert the generically deserialized
+    // value back into the Pair class.
+
+    Assert.assertEquals(
+        "array mapper reads arrays",
+        resultRow,
+        arraysObjectMapper.readValue(
+            arraysObjectMapper.writeValueAsBytes(resultRow),
+            ResultRow.class
+        )
+    );
+
+    Assert.assertEquals(
+        "array mapper reads arrays (2)",
+        resultRow,
+        arraysObjectMapper.readValue(
+            StringUtils.format("[%s, \"foo\", {\"lhs\":1,\"rhs\":\"test\"}, 100, 10.0]", DateTimes.of("2000").getMillis()),
+            ResultRow.class
+        )
+    );
+
+    Assert.assertEquals(
+        "map mapper reads arrays",
+        resultRow,
+        mapsObjectMapper.readValue(
+            arraysObjectMapper.writeValueAsBytes(resultRow),
+            ResultRow.class
+        )
+    );
+
+    Assert.assertEquals(
+        "map mapper reads maps",
+        resultRow,
+        mapsObjectMapper.readValue(
+            mapsObjectMapper.writeValueAsBytes(resultRow),
+            ResultRow.class
+        )
+    );
+  }
+
   @Test
   public void testResultSerdeIntermediateResultAsMapCompat() throws Exception
   {
@@ -658,19 +740,7 @@ public void testResultSerdeIntermediateResultAsMapCompat() throws Exception
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
-    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
-        null,
-        () -> new GroupByQueryConfig()
-        {
-          @Override
-          public boolean isIntermediateResultAsMapCompat()
-          {
-            return true;
-          }
-        },
-        null,
-        null
-    );
+    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null, null);
 
     final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
     final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 080d5d5d4ba3..3613246fef65 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -356,13 +356,12 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         bufferPools.getProcessingPool(),
         groupByResourcesReservationPool,
-        TestHelper.makeJsonMapper(),
+        mapper,
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     );
     final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
         groupingEngine,
-        () -> config,
         DefaultGroupByQueryMetricsFactory.instance(),
         groupByResourcesReservationPool
     );
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 494ef763a783..7d3f58ef6000 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -79,6 +79,7 @@
 import org.apache.druid.query.topn.TopNQueryRunnerFactory;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentWrangler;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.join.FrameBasedInlineJoinableFactory;
 import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
@@ -242,13 +243,23 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat
       final Closer closer,
       final Supplier<Integer> minTopNThresholdSupplier
   )
+  {
+    return createQueryRunnerFactoryConglomerate(closer, minTopNThresholdSupplier, TestHelper.makeJsonMapper());
+  }
+
+  public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
+      final Closer closer,
+      final Supplier<Integer> minTopNThresholdSupplier,
+      final ObjectMapper jsonMapper
+  )
   {
     return createQueryRunnerFactoryConglomerate(
         closer,
         getProcessingConfig(
             DEFAULT_NUM_MERGE_BUFFERS
         ),
-        minTopNThresholdSupplier
+        minTopNThresholdSupplier,
+        jsonMapper
     );
   }
@@ -264,11 +275,41 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat
     );
   }
 
+  public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
+      final Closer closer,
+      final DruidProcessingConfig processingConfig,
+      final ObjectMapper jsonMapper
+  )
+  {
+    return createQueryRunnerFactoryConglomerate(
+        closer,
+        processingConfig,
+        () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD,
+        jsonMapper
+    );
+  }
+
   public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
       final Closer closer,
       final DruidProcessingConfig processingConfig,
       final Supplier<Integer> minTopNThresholdSupplier
   )
+  {
+    return createQueryRunnerFactoryConglomerate(
+        closer,
+        processingConfig,
+        minTopNThresholdSupplier,
+        TestHelper.makeJsonMapper()
+    );
+  }
+
+
+  public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
+      final Closer closer,
+      final DruidProcessingConfig processingConfig,
+      final Supplier<Integer> minTopNThresholdSupplier,
+      final ObjectMapper jsonMapper
+  )
   {
     final TestBufferPool testBufferPool = TestBufferPool.offHeap(COMPUTE_BUFFER_SIZE, Integer.MAX_VALUE);
     closer.register(() -> {
@@ -281,7 +322,7 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat
 
     final GroupByQueryRunnerFactory groupByQueryRunnerFactory =
         GroupByQueryRunnerTest.makeQueryRunnerFactory(
-            GroupByQueryRunnerTest.DEFAULT_MAPPER,
+            jsonMapper,
            new GroupByQueryConfig()
            {
            },
diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java
index 74888c89607c..123019dad04f 100644
--- a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java
+++ b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java
@@ -280,9 +280,9 @@ public void configureGuice(DruidInjectorBuilder builder)
   }
 
   @Override
-  public QueryRunnerFactoryConglomerate createCongolmerate(Builder builder, Closer closer)
+  public QueryRunnerFactoryConglomerate createCongolmerate(Builder builder, Closer closer, ObjectMapper jsonMapper)
   {
-    return delegate.createCongolmerate(builder, closer);
+    return delegate.createCongolmerate(builder, closer, jsonMapper);
   }
 
   @Override
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 7e4675efc74e..1975f5589e65 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -62,6 +62,10 @@
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -747,6 +751,134 @@ public void testEarliestAggregators()
     );
   }
 
+  @Test
+  public void testGroupingOnStringSerializablePairLongString()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT COUNT(*) FROM (SELECT string_first_added FROM druid.wikipedia_first_last GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("wikipedia_first_last")
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(new DefaultDimensionSpec(
+                                            "string_first_added",
+                                            "d0",
+                                            ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+                                        ))
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(ImmutableList.of())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{39244L})
+    );
+  }
+
+  @Test
+  public void testGroupingOnStringSerializablePairLongLong()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT COUNT(*) FROM (SELECT long_first_added FROM druid.wikipedia_first_last GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("wikipedia_first_last")
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(new DefaultDimensionSpec(
+                                            "long_first_added",
+                                            "d0",
+                                            ColumnType.ofComplex(SerializablePairLongLongComplexMetricSerde.TYPE_NAME)
+                                        ))
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(ImmutableList.of())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{2315L})
+    );
+  }
+
+  @Test
+  public void testGroupingOnStringSerializablePairLongDouble()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT COUNT(*) FROM (SELECT double_first_added FROM druid.wikipedia_first_last GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("wikipedia_first_last")
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(new DefaultDimensionSpec(
+                                            "double_first_added",
+                                            "d0",
+                                            ColumnType.ofComplex(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME)
+                                        ))
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(ImmutableList.of())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{2315L})
+    );
+  }
+
+  @Test
+  public void testGroupingOnStringSerializablePairLongFloat()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT COUNT(*) FROM (SELECT float_first_added FROM druid.wikipedia_first_last GROUP BY 1)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("wikipedia_first_last")
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(new DefaultDimensionSpec(
+                                            "float_first_added",
+                                            "d0",
+                                            ColumnType.ofComplex(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME)
+                                        ))
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(ImmutableList.of())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .build()
+        ),
+        ImmutableList.of(new Object[]{2315L})
+    );
+  }
+
   @Test
   public void testLatestToLatestByConversion()
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index c64888414b62..d195e0cefb91 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -147,7 +147,8 @@ public interface QueryComponentSupplier extends Closeable
 
   QueryRunnerFactoryConglomerate createCongolmerate(
       Builder builder,
-      Closer closer
+      Closer closer,
+      ObjectMapper jsonMapper
   );
 
   SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
@@ -240,18 +241,21 @@ public void configureGuice(DruidInjectorBuilder builder)
   @Override
   public QueryRunnerFactoryConglomerate createCongolmerate(
       Builder builder,
-      Closer resourceCloser
+      Closer resourceCloser,
+      ObjectMapper jsonMapper
   )
   {
     if (builder.mergeBufferCount == 0) {
       return QueryStackTests.createQueryRunnerFactoryConglomerate(
           resourceCloser,
-          () -> builder.minTopNThreshold
+          () -> builder.minTopNThreshold,
+          jsonMapper
       );
     } else {
       return QueryStackTests.createQueryRunnerFactoryConglomerate(
           resourceCloser,
-          QueryStackTests.getProcessingConfig(builder.mergeBufferCount)
+          QueryStackTests.getProcessingConfig(builder.mergeBufferCount),
+          jsonMapper
       );
     }
   }
@@ -551,7 +555,7 @@ public void configure(Binder binder)
   @LazySingleton
   public QueryRunnerFactoryConglomerate conglomerate()
   {
-    return componentSupplier.createCongolmerate(builder, resourceCloser);
+    return componentSupplier.createCongolmerate(builder, resourceCloser, queryJsonMapper());
   }
 
   @Provides
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index c914532a3b90..f732771991c4 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -51,9 +51,11 @@
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.firstlast.first.StringFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.firstlast.last.LongLastAggregatorFactory;
+import org.apache.druid.query.aggregation.firstlast.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
@@ -655,8 +657,9 @@ public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
             new FloatLastAggregatorFactory("float_last_added", "added", "__time"),
             new FloatLastAggregatorFactory("float_first_added", "added", "__time"),
             new DoubleLastAggregatorFactory("double_last_added", "added", "__time"),
-            new DoubleFirstAggregatorFactory("double_first_added", "added", "__time")
-
+            new DoubleFirstAggregatorFactory("double_first_added", "added", "__time"),
+            new StringFirstAggregatorFactory("string_first_added", "comment", "__time", 1000),
+            new StringLastAggregatorFactory("string_last_added", "comment", "__time", 1000)
         )
         .build()
 )
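The two string aggregations added above are what give wikipedia_first_last a pair-typed column for the new SQL tests to group on. A hedged sketch of the connection, assuming AggregatorFactory.getIntermediateType() (illustrative, not code from this patch):

    // illustrative sketch, not code from the patch
    AggregatorFactory factory = new StringFirstAggregatorFactory("string_first_added", "comment", "__time", 1000);
    ColumnType intermediateType = factory.getIntermediateType(); // COMPLEX<serializablePairLongString>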