Commit da1e293
Deserialize dimensions in group by queries to their respective types when reading from their serialized format (#16511)

* init

* tests, pair groupable

* framework change

* tests

* update benchmarks

* comments

* add javadoc for the jsonMapper

* remove extra deserialization

* add special serde for map based result rows

* revert unnecessary change

---------

Co-authored-by: asdf2014 <asdf2014@apache.org>
LakshSingla and asdf2014 committed Jun 14, 2024
1 parent e1926e2 commit da1e293
Showing 19 changed files with 507 additions and 54 deletions.
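
For context on what "deserialize to their respective types" means here: a group-by ResultRow is backed by a plain Object[], and when that array is serialized between services and read back by an undecorated ObjectMapper, Jackson has no idea what the original dimension and aggregator types were. The failure mode is easy to reproduce with nothing but Jackson; the class below is a hypothetical, self-contained demonstration, not code from this commit:

import com.fasterxml.jackson.databind.ObjectMapper;

public class TypeLossDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // A group-by result row is backed by a plain Object[]:
    // [timestamp, dimension value, aggregator value].
    Object[] row = new Object[]{1718323200000L, 42L, new long[]{1L, 2L}};

    byte[] bytes = mapper.writeValueAsBytes(row);
    Object[] read = mapper.readValue(bytes, Object[].class);

    // Without type information the values come back as whatever Jackson
    // guesses: the small long becomes an Integer and the long[] becomes
    // an ArrayList, so downstream code expecting the original types breaks.
    System.out.println(read[1].getClass()); // class java.lang.Integer
    System.out.println(read[2].getClass()); // class java.util.ArrayList
  }
}

The commit addresses this for group-by by having the toolchest-decorated mapper rebuild each dimension in its declared type when rows are read back from their serialized form, as the diffs below show.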
@@ -92,6 +92,7 @@
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
 import org.apache.druid.timeline.SegmentId;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -433,7 +434,8 @@ public void setup()
     String queryName = schemaQuery[1];
 
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
-    query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
+    query = (GroupByQuery) ResourceIdPopulatingQueryRunner.populateResourceId(SCHEMA_QUERY_MAP.get(schemaName)
+                                                                                              .get(queryName));
 
     generator = new DataGenerator(
         schemaInfo.getColumnSchemas(),
@@ -762,12 +764,12 @@ public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableIndex
     //noinspection unchecked
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            new SerializingQueryRunner<>(
-                new DefaultObjectMapper(new SmileFactory(), null),
+            new SerializingQueryRunner(
+                toolChest.decorateObjectMapper(new DefaultObjectMapper(new SmileFactory(), null), query),
                 ResultRow.class,
-                toolChest.mergeResults(
+                (queryPlus, responseContext) -> toolChest.mergeResults(
                     factory.mergeRunners(state.executorService, makeMultiRunners(state))
-                )
+                ).run(QueryPlus.wrap(ResourceIdPopulatingQueryRunner.populateResourceId(query)))
             )
         ),
         (QueryToolChest) toolChest
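
Two things change in the benchmark wiring above: the query is routed through ResourceIdPopulatingQueryRunner.populateResourceId before use (mergeResults now appears to require a resource id on the query), and the smile mapper is first decorated by the toolchest so that rows deserialize back to typed values. A condensed sketch of the invocation, with illustrative variable names such as rawQuery and responseContext that are not from the commit:

// Sketch: give the query a resource id, then run it and materialize the rows.
GroupByQuery query = (GroupByQuery) ResourceIdPopulatingQueryRunner.populateResourceId(rawQuery);
Sequence<ResultRow> results = theRunner.run(QueryPlus.wrap(query), responseContext);
List<ResultRow> rows = results.toList();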
@@ -28,21 +28,22 @@
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.groupby.ResultRow;
 
-public class SerializingQueryRunner<T> implements QueryRunner<T>
+public class SerializingQueryRunner implements QueryRunner<ResultRow>
 {
   static {
     NullHandling.initializeForTests();
   }
 
   private final ObjectMapper smileMapper;
-  private final QueryRunner<T> baseRunner;
-  private final Class<T> clazz;
+  private final QueryRunner<ResultRow> baseRunner;
+  private final Class<ResultRow> clazz;
 
   public SerializingQueryRunner(
       ObjectMapper smileMapper,
-      Class<T> clazz,
-      QueryRunner<T> baseRunner
+      Class<ResultRow> clazz,
+      QueryRunner<ResultRow> baseRunner
   )
   {
     this.smileMapper = smileMapper;
@@ -51,16 +52,16 @@ public SerializingQueryRunner(
   }
 
   @Override
-  public Sequence<T> run(
-      final QueryPlus<T> queryPlus,
+  public Sequence<ResultRow> run(
+      final QueryPlus<ResultRow> queryPlus,
       final ResponseContext responseContext
   )
   {
     return Sequences.map(
         baseRunner.run(queryPlus, responseContext),
         input -> {
           try {
-            return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input), clazz);
+            return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input.getArray()), clazz);
           }
           catch (JsonProcessingException e) {
             throw new RuntimeException(e);
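
The net effect of the rewrite above: the runner is no longer generic, it serializes the row's backing array (input.getArray()) rather than the ResultRow wrapper, and it expects the mapper handed to it to be the one returned by the toolchest's decorateObjectMapper, which knows how to rebuild a typed ResultRow. Condensed to its essence (a sketch, not the actual code):

// Per-row round trip performed inside Sequences.map: write the raw Object[]
// backing the row, then read it back as a ResultRow through the decorated
// mapper, which restores dimension values to their declared types.
byte[] bytes = smileMapper.writeValueAsBytes(input.getArray());
ResultRow typed = JacksonUtils.readValue(smileMapper, bytes, ResultRow.class);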
@@ -19,9 +19,13 @@

 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongDouble inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongDouble> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongDouble>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongDouble o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
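
This getTypeStrategy override is added, nearly verbatim, to all four SerializablePairLong* serdes in this commit. The pattern is always the same: wrap the serde's existing ObjectStrategy in an ObjectStrategyComplexTypeStrategy and supply a fastutil Hash.Strategy that simply delegates to the pair's own hashCode and equals, which is what makes these complex values groupable. A hypothetical generic helper (not in the commit) shows the shared shape:

import it.unimi.dsi.fastutil.Hash;

final class DelegatingHashStrategy
{
  // Hypothetical: one generic Hash.Strategy that all four serdes could share,
  // delegating hashing and equality to the pair objects themselves.
  static <T> Hash.Strategy<T> of()
  {
    return new Hash.Strategy<T>()
    {
      @Override
      public int hashCode(T o)
      {
        return o.hashCode();
      }

      @Override
      public boolean equals(T a, T b)
      {
        return a.equals(b);
      }
    };
  }
}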
@@ -19,9 +19,13 @@

 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -107,4 +111,27 @@ public byte[] toBytes(@Nullable SerializablePairLongFloat inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongFloat> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongFloat>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongFloat o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongFloat a, SerializablePairLongFloat b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
@@ -19,9 +19,13 @@

 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongLong inPair)
       }
     };
   }
+
+  @Override
+  public TypeStrategy<SerializablePairLongLong> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongLong>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongLong o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongLong a, SerializablePairLongLong b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
 }
@@ -19,11 +19,15 @@

 package org.apache.druid.query.aggregation;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
@@ -130,7 +134,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
   }
 
   @Override
-  public ObjectStrategy<?> getObjectStrategy()
+  public ObjectStrategy<SerializablePairLongString> getObjectStrategy()
   {
     return new ObjectStrategy<SerializablePairLongString>()
     {
@@ -165,6 +169,29 @@ public byte[] toBytes(SerializablePairLongString val)
     };
   }
 
+  @Override
+  public TypeStrategy<SerializablePairLongString> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(getTypeName()),
+        new Hash.Strategy<SerializablePairLongString>()
+        {
+          @Override
+          public int hashCode(SerializablePairLongString o)
+          {
+            return o.hashCode();
+          }
+
+          @Override
+          public boolean equals(SerializablePairLongString a, SerializablePairLongString b)
+          {
+            return a.equals(b);
+          }
+        }
+    );
+  }
+
   @Override
   public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
   {
@@ -282,6 +282,7 @@ public boolean isVectorize()
     return vectorize;
   }
 
+  @SuppressWarnings("unused")
   public boolean isIntermediateResultAsMapCompat()
   {
     return intermediateResultAsMapCompat;