Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sorting on complex columns in MSQ #16322

Merged
merged 29 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c46cb4
init
LakshSingla Apr 23, 2024
486d29a
working, almost
LakshSingla Apr 26, 2024
260206b
stuff working
LakshSingla May 3, 2024
b93b53e
Merge branch 'master' into msq-complex-sorting
LakshSingla May 3, 2024
ac8ba6b
tests, checkstyle
LakshSingla May 6, 2024
abbf49b
tests
LakshSingla May 6, 2024
b7ea7b8
more changes
LakshSingla May 6, 2024
f3caaf6
test comments
LakshSingla May 8, 2024
c852c35
changes
LakshSingla May 8, 2024
d2dd402
tests
LakshSingla May 8, 2024
a0d29d4
tests
LakshSingla May 8, 2024
ae87dab
tests
LakshSingla May 8, 2024
4b86b31
better comment
LakshSingla May 8, 2024
1ec5796
tests fix
LakshSingla May 8, 2024
b68948e
tests fix
LakshSingla May 8, 2024
08633f0
tests fix, test framework fix, comments
LakshSingla May 9, 2024
8d49678
Trigger Build
LakshSingla May 9, 2024
76fd60e
Merge branch 'master' into msq-complex-sorting
LakshSingla May 9, 2024
cd29bec
merge fix
LakshSingla May 9, 2024
690c5cf
convert list to array
LakshSingla May 9, 2024
d1d28e1
add back old tests
LakshSingla May 9, 2024
7f27724
preserve old tests, add new tests for complexcol + byte comparable col
LakshSingla May 9, 2024
ae77984
tests
LakshSingla May 10, 2024
2af2977
add benchmarks for nested data
LakshSingla May 10, 2024
b1c61cb
final set, have separate methods
LakshSingla May 10, 2024
eaa0593
some more final changes
LakshSingla May 10, 2024
2e23831
Merge branch 'master' into msq-complex-sorting
LakshSingla May 10, 2024
7356aa2
review comments
LakshSingla May 13, 2024
806b1ec
Update FrameWriterUtils.java
LakshSingla May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test comments
  • Loading branch information
LakshSingla committed May 8, 2024
commit f3caaf60d4b3bcede96f9958e96412fd4c81f913
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -45,6 +53,7 @@
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class MSQComplexGroupByTest extends MSQTestBase
{
Expand Down Expand Up @@ -119,10 +128,106 @@ public void testInsertWithRollupOnNestedData()
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("obj", ColumnType.NESTED_DATA)
.add("cnt", ColumnType.LONG)
.build())
.setExpectedResultRows(ImmutableList.of())
.setExpectedResultRows(ImmutableList.of(
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 500,
"b", ImmutableMap.of(
"x", "e",
"z", ImmutableList.of(1, 2, 3, 4)
),
"v", "a"
)
),
1L
},
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 100,
"b", ImmutableMap.of(
"x", "a",
"y", 1.1,
"z", ImmutableList.of(1, 2, 3, 4)
),
"v", Collections.emptyList()
)
),
2L
},
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 700,
"b", ImmutableMap.of(
"x", "g",
"y", 1.1,
"z", Arrays.asList(9, null, 9, 9)
),
"v", Collections.emptyList()
)
),
1L
},
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 200,
"b", ImmutableMap.of(
"x", "b",
"y", 1.1,
"z", ImmutableList.of(2, 4, 6)
),
"v", Collections.emptyList()
)
),
1L
},
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 600,
"b", ImmutableMap.of(
"x", "f",
"y", 1.1,
"z", ImmutableList.of(6, 7, 8, 9)
),
"v", "b"
)
),
1L
},
new Object[]{
0L,
StructuredData.wrap(
ImmutableMap.of(
"a", 400,
"b", ImmutableMap.of(
"x", "d",
"y", 1.1,
"z", ImmutableList.of(3, 4)
),
"v", Collections.emptyList()
)
),
1L
},
new Object[]{
0L,
StructuredData.wrap(ImmutableMap.of("a", 300)),
1L
}
))
.verifyResults();

}
Expand All @@ -142,11 +247,34 @@ public void testSortingOnNestedData()
+ " )\n"
+ " ORDER BY 1")
.setQueryContext(ImmutableMap.of())
.setExpectedMSQSpec(MSQSpec
.builder()
.query(newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("cnt", "dim1")
.build()
)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("obj", "obj")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build()
)
.setExpectedRowSignature(RowSignature.builder()
.add("obj", ColumnType.NESTED_DATA)
.build())
.setExpectedResultRows(ImmutableList.of())
.setExpectedResultRows(ImmutableList.of(
new Object[]{"{\"a\":500,\"b\":{\"x\":\"e\",\"z\":[1,2,3,4]},\"v\":\"a\"}"},
new Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"v\":[]}"},
new Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"v\":[]}"},
new Object[]{"{\"a\":700,\"b\":{\"x\":\"g\",\"y\":1.1,\"z\":[9,null,9,9]},\"v\":[]}"},
new Object[]{"{\"a\":200,\"b\":{\"x\":\"b\",\"y\":1.1,\"z\":[2,4,6]},\"v\":[]}"},
new Object[]{"{\"a\":600,\"b\":{\"x\":\"f\",\"y\":1.1,\"z\":[6,7,8,9]},\"v\":\"b\"}"},
new Object[]{"{\"a\":400,\"b\":{\"x\":\"d\",\"y\":1.1,\"z\":[3,4]},\"v\":[]}"},
new Object[]{"{\"a\":300}"}
))
.verifyResults();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ public void verifyResults()
.collect(Collectors.toList());

log.info(
"Found rows which are sorted forcefully %s",
"Found rows which are sorted forcefully\n%s",
transformedOutputRows.stream().map(Arrays::deepToString).collect(Collectors.joining("\n"))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public int compare(final byte[] keyArray1, final byte[] keyArray2)
continue;
}

// Index of the first field after the current run, i.e. the next field to be considered once this run has been
// compared. The run being compared in this iteration therefore ends at field (nextField - 1).
final int nextField = fieldsComparedTillNow + runLengthEntry.getRunLength();
final int currentRunEndPosition1 = RowKeyReader.fieldEndPosition(keyArray1, nextField - 1);
final int currentRunEndPosition2 = RowKeyReader.fieldEndPosition(keyArray2, nextField - 1);

if (!runLengthEntry.isByteComparable()) {
// Only complex types are not byte comparable. Nested arrays aren't supported in MSQ
assert runLengthEntry.getRunLength() == 1;
Expand All @@ -128,10 +134,6 @@ public int compare(final byte[] keyArray1, final byte[] keyArray2)
complexTypeName
);

// Index of the next field that will get considered. Excludes the current field that we are comparing right now
final int nextField = fieldsComparedTillNow + 1;
final int currentRunEndPosition1 = RowKeyReader.fieldEndPosition(keyArray1, nextField - 1);
final int currentRunEndPosition2 = RowKeyReader.fieldEndPosition(keyArray2, nextField - 1);

int cmp = FrameReaderUtils.compareComplexTypes(
keyArray1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;

import javax.annotation.concurrent.NotThreadSafe;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +48,7 @@
* Comparison logic in this class is very similar to {@link RowKeyComparator}, but is different because it works
* on Frames instead of byte[].
*/
@NotThreadSafe
public class FrameComparisonWidgetImpl implements FrameComparisonWidget
{
private final Frame frame;
Expand All @@ -58,6 +60,9 @@ public class FrameComparisonWidgetImpl implements FrameComparisonWidget
private final List<FieldReader> keyFieldReaders;
private final int firstFieldPosition;
private final RowKeyComparisonRunLengths rowKeyComparisonRunLengths;
// We memoize the serde instead of fetching it every time from the global map: the global lookup is thread-safe and
// guarded by a ConcurrentHashMap, whereas a FrameComparisonWidget is only accessed from a single thread.
private final Map<String, ComplexMetricSerde> serdeMap = new HashMap<>();

private FrameComparisonWidgetImpl(
final Frame frame,
Expand Down Expand Up @@ -288,9 +293,6 @@ public int compare(final int row, final FrameComparisonWidget otherWidget, final
// Number of fields compared till now, which is equivalent to the index of the field to compare next
int fieldsComparedTillNow = 0;

// Memoize the serde instead of calling ComplexMetrics.getSerdeForType everytime
final Map<String, ComplexMetricSerde> serdeMap = new HashMap<>();

for (RowKeyComparisonRunLengths.RunLengthEntry runLengthEntry : rowKeyComparisonRunLengths.getRunLengthEntries()) {

if (runLengthEntry.getRunLength() <= 0) {
Expand Down Expand Up @@ -322,10 +324,11 @@ public int compare(final int row, final FrameComparisonWidget otherWidget, final
Preconditions.checkArgument(columnType1.equals(columnType2), "Different complex types cannot be compared");

// Use serde for the current implementation.
ComplexMetricSerde serde = Preconditions.checkNotNull(
ComplexMetrics.getSerdeForType(complexTypeName),
"serde for type [%s] not present",
complexTypeName
ComplexMetricSerde serde = serdeMap.computeIfAbsent(
complexTypeName, name ->
Preconditions.checkNotNull(
ComplexMetrics.getSerdeForType(name), "serde for type [%s] not present", complexTypeName
)
);

int nextField = fieldsComparedTillNow + 1;
Expand Down
Loading