Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable reordering of window operators #16482

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
window operator reordering
  • Loading branch information
sreemanamala committed May 21, 2024
commit a972698edb4c945a2d12cbeee7f4a4a592497282
148 changes: 112 additions & 36 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/**
* Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
Expand Down Expand Up @@ -123,45 +124,14 @@ public static Windowing fromCalciteStuff(
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");

ArrayList<OperatorFactory> ops = new ArrayList<>();
List<WindowGroupProcessorWrapper> wrapperObjs = new ArrayList<>();

final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
int outputNameCounter = 0;

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

for (int i = 0; i < window.groups.size(); ++i) {
final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);

final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}
for (Window.Group windowGroup : window.groups) {
final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature);

// Add aggregations.
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
Expand Down Expand Up @@ -232,10 +202,48 @@ public static Windowing fromCalciteStuff(
throw new ISE("No processors from Window[%s], why was this code called?", window);
}

ops.add(new WindowOperatorFactory(
wrapperObjs.add(new WindowGroupProcessorWrapper(windowGroup, new WindowOperatorFactory(
kgyrtkirk marked this conversation as resolved.
Show resolved Hide resolved
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
));
)));
}

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

Collections.sort(wrapperObjs);
ArrayList<OperatorFactory> ops = new ArrayList<>();
for (WindowGroupProcessorWrapper wrapperObj : wrapperObjs) {
final WindowGroup group = new WindowGroup(window, wrapperObj.getGroup(), sourceRowSignature);
final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}

ops.add(wrapperObj.getProcessorOperatorFactory());
}

// Apply windowProject, if present.
Expand Down Expand Up @@ -270,6 +278,74 @@ public static Windowing fromCalciteStuff(
}
}

/**
* A wrapper object which stores {@link org.apache.calcite.rel.core.Window.Group}
* along with its computed {@link WindowOperatorFactory}
*
* this allows us to sort the window groups in order to optimise the order of operators we would need to compute
* without losing the aggregate column name information (which is part of the computed WindowOperatorFactory)
*/
private static class WindowGroupProcessorWrapper implements Comparable<WindowGroupProcessorWrapper>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of implementing Comparable - a static field can be added to define the comparision logic ; that will provide a place to name the ordering ; and also a place to put usefull notes into its apidocs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified. Now we can have multiple comparators.
I thought that we would not be requiring multiple, at the end we might have to only use the comparator that would optimise us to use minimal operators.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really like to suggest a better name for it - but don't really have any good suggestions.... WindowedCalculation ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to WindowComputationProcessor. let me know your thoughts on this name

{
private final Window.Group group;
private final OperatorFactory processorOperatorFactory;

public WindowGroupProcessorWrapper(Window.Group group, OperatorFactory processorOperatorFactory)
{
this.group = group;
this.processorOperatorFactory = processorOperatorFactory;
}

public Window.Group getGroup()
{
return group;
}

public OperatorFactory getProcessorOperatorFactory()
{
return processorOperatorFactory;
}

@Override
public int compareTo(WindowGroupProcessorWrapper o)
{
// Need to work on this method to optimise the order in which we need to process based on the partitions
// currently just moves the empty windows to the front
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it moves the empty to the front; however the order of the remaining elements are undefined - I wonder if that could cause different behaviour between jvms
something like a move-to-front logic might not have that problem

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the order of remaining elements would be as is, since in all other cases we return 0 to look like they are same. Would that not be sufficient?

if (this.group.keys.isEmpty() && o.group.keys.isEmpty()) {
return 0;
}
if (this.group.keys.isEmpty()) {
return -1;
}
if (o.group.keys.isEmpty()) {
return 1;
}
return 0;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WindowGroupProcessorWrapper wrapper = (WindowGroupProcessorWrapper) o;
return Objects.equals(group, wrapper.group) && Objects.equals(
processorOperatorFactory,
wrapper.processorOperatorFactory
);
}

@Override
public int hashCode()
{
return Objects.hash(group, processorOperatorFactory);
}
}

private final RowSignature signature;

public Windowing(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
type: "operatorValidation"

sql: |
SELECT
m1,
m2,
SUM(m1) OVER(PARTITION BY m2) as sum1,
SUM(m2) OVER() as sum2
from druid.numfoo
GROUP BY m1,m2

expectedOperators:
- type: "naivePartition"
partitionColumns: [ ]
Comment on lines +13 to +14
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessarily right now - but I think a check should be added to the NaivePartitioningOperator:

  • if the partitioningColumns is empty
  • it may only get at most 1 RACs
    I wonder if we've already violated that or not - but probably not...

- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
- type: "naiveSort"
columns:
- column: "_d1"
direction: "ASC"
- type: "naivePartition"
partitionColumns: [ "_d1" ]
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
expectedResults:
- [1.0, 1.0, 1.0, 21.0]
- [2.0, 2.0, 2.0, 21.0]
- [3.0, 3.0, 3.0, 21.0]
- [4.0, 4.0, 4.0, 21.0]
- [5.0, 5.0, 5.0, 21.0]
- [6.0, 6.0, 6.0, 21.0]
Loading