Skip to content

Commit

Permalink
Grouping Engine fix when a limit spec with different order by columns…
Browse files Browse the repository at this point in the history
… is applied (#16534)
  • Loading branch information
sreemanamala committed Jun 20, 2024
1 parent 169a8db commit 7ac0862
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,15 +560,20 @@ private boolean canDoLimitPushDown(
return false;
}

/**
* When limit push down is applied, the partial results would be sorted by the ordering specified by the
* limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
* so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
*/
private Ordering<ResultRow> getRowOrderingForPushDown(
final boolean granular,
final DefaultLimitSpec limitSpec
)
public Ordering<ResultRow> getRowOrdering(final boolean granular)
{
return getOrderingAndDimensions(granular).getRowOrdering();
}

public List<String> getDimensionNamesInOrder()
{
return getOrderingAndDimensions(false).getDimensions()
.stream()
.map(DimensionSpec::getOutputName)
.collect(Collectors.toList());
}

public OrderingAndDimensions getOrderingAndDimensions(final boolean granular)
{
final boolean sortByDimsFirst = getContextSortByDimsFirst();

Expand All @@ -577,18 +582,30 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
final List<Boolean> needsReverseList = new ArrayList<>();
final List<ColumnType> dimensionTypes = new ArrayList<>();
final List<StringComparator> comparators = new ArrayList<>();
final List<DimensionSpec> dimensionsInOrder = new ArrayList<>();

for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
DimensionSpec dim = dimensions.get(dimIndex);
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ColumnType type = dimensions.get(dimIndex).getOutputType();
dimensionTypes.add(type);
comparators.add(orderSpec.getDimensionComparator());
/*
* When limit push down is applied, the partial results would be sorted by the ordering specified by the
* limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
* so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
*/
if (isApplyLimitPushDown()) {
DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec;
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, dimensions)) {
for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
DimensionSpec dim = dimensions.get(dimIndex);
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ColumnType type = dimensions.get(dimIndex).getOutputType();
dimensionTypes.add(type);
comparators.add(orderSpec.getDimensionComparator());
dimensionsInOrder.add(dim);
}
}
}
}

Expand All @@ -599,14 +616,16 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
final ColumnType type = dimensions.get(i).getOutputType();
dimensionTypes.add(type);
comparators.add(StringComparators.NATURAL);
dimensionsInOrder.add(dimensions.get(i));
}
}

final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
Ordering<ResultRow> ordering;

if (timeComparator == null) {
return Ordering.from(
(lhs, rhs) -> compareDimsForLimitPushDown(
ordering = Ordering.from(
(lhs, rhs) -> compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -616,9 +635,9 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
)
);
} else if (sortByDimsFirst) {
return Ordering.from(
ordering = Ordering.from(
(lhs, rhs) -> {
final int cmp = compareDimsForLimitPushDown(
final int cmp = compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -634,15 +653,15 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
}
);
} else {
return Ordering.from(
ordering = Ordering.from(
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);

if (timeCompare != 0) {
return timeCompare;
}

return compareDimsForLimitPushDown(
return compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -653,45 +672,8 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
}
);
}
}

public Ordering<ResultRow> getRowOrdering(final boolean granular)
{
if (isApplyLimitPushDown()) {
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
}
}

final boolean sortByDimsFirst = getContextSortByDimsFirst();
final Comparator<ResultRow> timeComparator = getTimeComparator(granular);

if (timeComparator == null) {
return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
} else if (sortByDimsFirst) {
return Ordering.from(
(lhs, rhs) -> {
final int cmp = compareDims(dimensions, lhs, rhs);
if (cmp != 0) {
return cmp;
}

return timeComparator.compare(lhs, rhs);
}
);
} else {
return Ordering.from(
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);

if (timeCompare != 0) {
return timeCompare;
}

return compareDims(dimensions, lhs, rhs);
}
);
}
return new OrderingAndDimensions(ordering, dimensionsInOrder);
}

@Nullable
Expand All @@ -716,25 +698,6 @@ private Comparator<ResultRow> getTimeComparator(boolean granular)
}
}

private int compareDims(List<DimensionSpec> dimensions, ResultRow lhs, ResultRow rhs)
{
final int dimensionStart = getResultRowDimensionStart();

for (int i = 0; i < dimensions.size(); i++) {
DimensionSpec dimension = dimensions.get(i);
final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
lhs.get(dimensionStart + i),
rhs.get(dimensionStart + i),
dimension.getOutputType()
);
if (dimCompare != 0) {
return dimCompare;
}
}

return 0;
}

/**
* Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}.
*/
Expand All @@ -760,12 +723,12 @@ private DateTime computeUniversalTimestamp()
}

/**
* Compares the dimensions for limit pushdown.
* Compares the dimensions.
*
* Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we
* get rid of the StringComparators for array types
*/
private static int compareDimsForLimitPushDown(
private static int compareDims(
final IntList fields,
final List<Boolean> needsReverseList,
final List<ColumnType> dimensionTypes,
Expand Down Expand Up @@ -924,6 +887,28 @@ private static void verifyOutputNames(
}
}

public static class OrderingAndDimensions
{
Ordering<ResultRow> rowOrdering;
List<DimensionSpec> dimensions;

public OrderingAndDimensions(Ordering<ResultRow> rowOrdering, List<DimensionSpec> dimensions)
{
this.rowOrdering = rowOrdering;
this.dimensions = dimensions;
}

public Ordering<ResultRow> getRowOrdering()
{
return rowOrdering;
}

public List<DimensionSpec> getDimensions()
{
return dimensions;
}
}

public static class Builder
{
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,7 @@ public Sequence<ResultRow> processSubtotalsSpec(
processingConfig.intermediateComputeSizeBytes()
);

List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
.collect(Collectors.toList());
List<String> queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder();

// Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
Set<String> aggsAndPostAggs = null;
Expand Down Expand Up @@ -724,7 +723,7 @@ public Sequence<ResultRow> processSubtotalsSpec(
.withLimitSpec(subtotalQueryLimitSpec);

final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne;
if (Utils.isPrefix(subtotalSpec, queryDimNames)) {
if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) {
// Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted
// by subtotalSpec as needed by stream merging.
subtotalsResults.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13823,10 +13823,8 @@ public void testGroupingSetsWithLimitOrderByGran()
.build()
),
ImmutableList.<Object[]>builder().add(
new Object[]{"", null, 2L},
new Object[]{"a", null, 1L},
new Object[]{"", null, 1L},
new Object[]{"a", null, 1L},
new Object[]{"", null, 3L},
new Object[]{"a", null, 2L},
new Object[]{"abc", null, 1L},
new Object[]{NULL_STRING, null, 6L},
new Object[]{"", timestamp("2000-01-01"), 2L},
Expand Down Expand Up @@ -16290,4 +16288,29 @@ public void testGroupingSetsWithAggregateCase()
)
).run();
}

@SqlTestFrameworkConfig.NumMergeBuffers(3)
@Test
public void testGroupingSetsWithDifferentOrderLimitSpec()
{
msqIncompatible();
testBuilder()
.sql(
"SELECT\n"
+ " isNew, isRobot, COUNT(*) AS \"Cnt\"\n"
+ "FROM \"wikipedia\"\n"
+ "GROUP BY GROUPING SETS ((isRobot), (isNew))\n"
+ "ORDER BY 2, 1\n"
+ "limit 100"
)
.expectedResults(
ResultMatchMode.RELAX_NULLS,
ImmutableList.of(
new Object[]{"false", null, 36966L},
new Object[]{"true", null, 2278L},
new Object[]{null, "false", 23824L},
new Object[]{null, "true", 15420L}
)
).run();
}
}

0 comments on commit 7ac0862

Please sign in to comment.