Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable reordering of window operators #16482

Merged
merged 7 commits into from
May 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor
  • Loading branch information
sreemanamala committed May 28, 2024
commit 78d064a42ffe846b6fab0d38c3df9d9673992aef
184 changes: 86 additions & 98 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,105 +127,7 @@ public static Windowing fromCalciteStuff(

final List<WindowComputationProcessor> windowGroupProcessors = new ArrayList<>();
final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
computeProcessorsAndOutputColumns(
partialQuery,
plannerContext,
sourceRowSignature,
rexBuilder,
virtualColumnRegistry,
window,
windowGroupProcessors,
windowOutputColumns
);

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

// sort the processors to optimise the order of window operators
// currently we are moving the empty groups to the front
windowGroupProcessors.sort(WindowComputationProcessor.MOVE_EMPTY_GROUPS_FIRST);

ArrayList<OperatorFactory> ops = new ArrayList<>();
for (WindowComputationProcessor windowComputationProcessor : windowGroupProcessors) {
final WindowGroup group = windowComputationProcessor.getGroup();
final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}

ops.add(windowComputationProcessor.getProcessorOperatorFactory());
}

// Apply windowProject, if present.
if (partialQuery.getWindowProject() != null) {
// We know windowProject is a mapping due to the isMapping() check in DruidRules.
// check anyway as defensive programming.
Preconditions.checkArgument(partialQuery.getWindowProject().isMapping());
final Mappings.TargetMapping mapping = Preconditions.checkNotNull(
Project.getPartialMapping(
partialQuery.getWindowProject().getInput().getRowType().getFieldCount(),
partialQuery.getWindowProject().getProjects()
),
"mapping for windowProject[%s]",
partialQuery.getWindowProject()
);

final List<String> windowProjectOutputColumns = new ArrayList<>();
for (int i = 0; i < mapping.size(); i++) {
windowProjectOutputColumns.add(windowOutputColumns.get(mapping.getSourceOpt(i)));
}

return new Windowing(
RowSignatures.fromRelDataType(windowProjectOutputColumns, partialQuery.getWindowProject().getRowType()),
ops
);
} else {
// No windowProject.
return new Windowing(
RowSignatures.fromRelDataType(windowOutputColumns, window.getRowType()),
ops
);
}
}

/**
* Computes the {@link WindowComputationProcessor} and output column name
* corresponding to each {@link org.apache.calcite.rel.core.Window.Group}
*/
private static void computeProcessorsAndOutputColumns(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
final RexBuilder rexBuilder,
final VirtualColumnRegistry virtualColumnRegistry,
Window window,
List<WindowComputationProcessor> windowGroupProcessors,
List<String> windowOutputColumns
)
{
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
int outputNameCounter = 0;

Expand Down Expand Up @@ -307,6 +209,92 @@ private static void computeProcessorsAndOutputColumns(
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
)));
}

List<OperatorFactory> ops = computeWindowOperations(partialQuery, sourceRowSignature, windowGroupProcessors);

// Apply windowProject, if present.
if (partialQuery.getWindowProject() != null) {
// We know windowProject is a mapping due to the isMapping() check in DruidRules.
// check anyway as defensive programming.
Preconditions.checkArgument(partialQuery.getWindowProject().isMapping());
final Mappings.TargetMapping mapping = Preconditions.checkNotNull(
Project.getPartialMapping(
partialQuery.getWindowProject().getInput().getRowType().getFieldCount(),
partialQuery.getWindowProject().getProjects()
),
"mapping for windowProject[%s]",
partialQuery.getWindowProject()
);

final List<String> windowProjectOutputColumns = new ArrayList<>();
for (int i = 0; i < mapping.size(); i++) {
windowProjectOutputColumns.add(windowOutputColumns.get(mapping.getSourceOpt(i)));
}

return new Windowing(
RowSignatures.fromRelDataType(windowProjectOutputColumns, partialQuery.getWindowProject().getRowType()),
ops
);
} else {
// No windowProject.
return new Windowing(
RowSignatures.fromRelDataType(windowOutputColumns, window.getRowType()),
ops
);
}
}

/**
* Computes the list of operators that are to be applied in an optimised order
*/
private static List<OperatorFactory> computeWindowOperations(
final PartialDruidQuery partialQuery,
final RowSignature sourceRowSignature,
List<WindowComputationProcessor> windowGroupProcessors
)
{
final List<OperatorFactory> ops = new ArrayList<>();
// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

// sort the processors to optimise the order of window operators
// currently we are moving the empty groups to the front
windowGroupProcessors.sort(WindowComputationProcessor.MOVE_EMPTY_GROUPS_FIRST);

for (WindowComputationProcessor windowComputationProcessor : windowGroupProcessors) {
kgyrtkirk marked this conversation as resolved.
Show resolved Hide resolved
final WindowGroup group = windowComputationProcessor.getGroup();
final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}

ops.add(windowComputationProcessor.getProcessorOperatorFactory());
}

return ops;
}

private static class WindowComputationProcessor
Expand Down
Loading