Skip to content

Commit

Permalink
add java bindings for non timestamp range window
Browse files Browse the repository at this point in the history
  • Loading branch information
wbo4958 committed May 9, 2021
1 parent 2273b6d commit c3fea68
Show file tree
Hide file tree
Showing 6 changed files with 1,316 additions and 806 deletions.
5 changes: 5 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Scalar.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ public int hashCode() {
case UINT32:
case TIMESTAMP_DAYS:
case DECIMAL32:
case DURATION_DAYS:
valueHash = getInt();
break;
case INT64:
Expand All @@ -629,6 +630,10 @@ public int hashCode() {
case TIMESTAMP_MICROSECONDS:
case TIMESTAMP_NANOSECONDS:
case DECIMAL64:
case DURATION_MICROSECONDS:
case DURATION_SECONDS:
case DURATION_MILLISECONDS:
case DURATION_NANOSECONDS:
valueHash = Long.hashCode(getLong());
break;
case FLOAT32:
Expand Down
87 changes: 60 additions & 27 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,10 @@ private static native long[] rollingWindowAggregate(
int[] following,
boolean ignoreNullKeys) throws CudfException;

private static native long[] timeRangeRollingWindowAggregate(long inputTable, int[] keyIndices, int[] timestampIndices, boolean[] isTimesampAscending,
int[] aggColumnsIndices, long[] aggInstances, int[] minPeriods,
int[] preceding, int[] following, boolean[] unboundedPreceding, boolean[] unboundedFollowing,
boolean ignoreNullKeys) throws CudfException;
/**
* Native entry point for range-based rolling window aggregation.
*
* The per-aggregation arguments are parallel arrays: the caller in this file sizes each of them
* to the total number of aggregation operations and fills entry {@code i} for operation {@code i}.
*
* @param inputTable native handle of the table to aggregate over
* @param keyIndices column indices supplied by the enclosing operation
*                   (presumably the group-by key columns - TODO confirm against the JNI side)
* @param orderByIndices index of the orderBy column for each aggregation
* @param isOrderByAscending whether the orderBy column of each aggregation is in ascending order
* @param aggColumnsIndices index of the column each aggregation is applied to
* @param aggInstances native aggregation instance handles, one per aggregation
* @param minPeriods the min-periods setting of each aggregation's window options
* @param preceding native scalar handle of each window's preceding bound; the caller passes 0
*                  when no preceding scalar is set
* @param following native scalar handle of each window's following bound; the caller passes 0
*                  when no following scalar is set
* @param unboundedPreceding whether each window is unbounded on the preceding side
* @param unboundedFollowing whether each window is unbounded on the following side
* @param ignoreNullKeys NOTE(review): presumably whether null keys are ignored when grouping -
*                       confirm against the native implementation
* @return an array of native handles that the caller wraps in a new {@code Table}
* @throws CudfException declared by the native method
*/
private static native long[] rangeRollingWindowAggregate(long inputTable, int[] keyIndices, int[] orderByIndices, boolean[] isOrderByAscending,
int[] aggColumnsIndices, long[] aggInstances, int[] minPeriods,
long[] preceding, long[] following, boolean[] unboundedPreceding, boolean[] unboundedFollowing,
boolean ignoreNullKeys) throws CudfException;

private static native long sortOrder(long inputTable, long[] sortKeys, boolean[] isDescending,
boolean[] areNullsSmallest) throws CudfException;
Expand Down Expand Up @@ -2457,7 +2457,7 @@ public Table aggregateWindows(AggregationOverWindow... windowAggregates) {
}

/**
* Computes time-range-based window aggregation functions on the Table/projection,
* Computes range-based window aggregation functions on the Table/projection,
* based on windows specified in the argument.
*
* This method enables queries such as the following SQL:
Expand Down Expand Up @@ -2506,10 +2506,10 @@ public Table aggregateWindows(AggregationOverWindow... windowAggregates) {
* @param windowAggregates the window-aggregations to be performed
* @return Table instance, with each column containing the result of each aggregation.
* @throws IllegalArgumentException if the window arguments are not of type
* {@link WindowOptions.FrameType#RANGE},
* {@link WindowOptions.FrameType#RANGE}, or if an orderBy column is not of an integral (Boolean-exclusive)
* or timestamp type, i.e. a suitable orderBy column was not specified for the aggregation.
*/
public Table aggregateWindowsOverTimeRanges(AggregationOverWindow... windowAggregates) {
public Table aggregateWindowsOverRanges(AggregationOverWindow... windowAggregates) {
// To improve performance and memory we want to remove duplicate operations
// and also group the operations by column so hopefully cudf can do multiple aggregations
// in a single pass.
Expand All @@ -2521,51 +2521,76 @@ public Table aggregateWindowsOverTimeRanges(AggregationOverWindow... windowAggre
for (int outputIndex = 0; outputIndex < windowAggregates.length; outputIndex++) {
AggregationOverWindow agg = windowAggregates[outputIndex];
if (agg.getWindowOptions().getFrameType() != WindowOptions.FrameType.RANGE) {
throw new IllegalArgumentException("Expected time-range-based window specification. Unexpected window type: "
+ agg.getWindowOptions().getFrameType());
throw new IllegalArgumentException("Expected range-based window specification. Unexpected window type: "
+ agg.getWindowOptions().getFrameType());
}

DType orderByType = operation.table.getColumn(agg.getWindowOptions().getOrderByColumnIndex()).getType();
switch (orderByType.getTypeId()) {
case INT8:
case INT16:
case INT32:
case INT64:
case UINT8:
case UINT16:
case UINT32:
case UINT64:
case TIMESTAMP_MILLISECONDS:
case TIMESTAMP_SECONDS:
case TIMESTAMP_DAYS:
case TIMESTAMP_NANOSECONDS:
case TIMESTAMP_MICROSECONDS:
break;
default:
throw new IllegalArgumentException("Expected range-based window orderBy's " +
"type: integral (Boolean-exclusive) and timestamp");
}

ColumnWindowOps ops = groupedOps.computeIfAbsent(agg.getColumnIndex(), (idx) -> new ColumnWindowOps());
totalOps += ops.add(agg, outputIndex);
}

int[] aggColumnIndexes = new int[totalOps];
int[] timestampColumnIndexes = new int[totalOps];
boolean[] isTimestampOrderAscending = new boolean[totalOps];
int[] orderByColumnIndexes = new int[totalOps];
boolean[] isOrderByOrderAscending = new boolean[totalOps];
long[] aggInstances = new long[totalOps];
long[] aggPrecedingWindows = new long[totalOps];
long[] aggFollowingWindows = new long[totalOps];
try {
int[] aggPrecedingWindows = new int[totalOps];
int[] aggFollowingWindows = new int[totalOps];
boolean[] aggPrecedingWindowsUnbounded = new boolean[totalOps];
boolean[] aggFollowingWindowsUnbounded = new boolean[totalOps];
int[] aggMinPeriods = new int[totalOps];
int opIndex = 0;
for (Map.Entry<Integer, ColumnWindowOps> entry: groupedOps.entrySet()) {
int columnIndex = entry.getKey();
for (AggregationOverWindow operation: entry.getValue().operations()) {
for (AggregationOverWindow op: entry.getValue().operations()) {
aggColumnIndexes[opIndex] = columnIndex;
aggInstances[opIndex] = operation.createNativeInstance();
aggPrecedingWindows[opIndex] = operation.getWindowOptions().getPreceding();
aggFollowingWindows[opIndex] = operation.getWindowOptions().getFollowing();
aggPrecedingWindowsUnbounded[opIndex] = operation.getWindowOptions().isUnboundedPreceding();
aggFollowingWindowsUnbounded[opIndex] = operation.getWindowOptions().isUnboundedFollowing();
aggMinPeriods[opIndex] = operation.getWindowOptions().getMinPeriods();
assert (operation.getWindowOptions().getFrameType() == WindowOptions.FrameType.RANGE);
timestampColumnIndexes[opIndex] = operation.getWindowOptions().getTimestampColumnIndex();
isTimestampOrderAscending[opIndex] = operation.getWindowOptions().isTimestampOrderAscending();
if (operation.getDefaultOutput() != 0) {
aggInstances[opIndex] = op.createNativeInstance();
aggPrecedingWindows[opIndex] = op.getWindowOptions().getPrecedingScalar() ==
null ? 0 : op.getWindowOptions().getPrecedingScalar().getScalarHandle();
aggFollowingWindows[opIndex] = op.getWindowOptions().getFollowingScalar() ==
null ? 0 : op.getWindowOptions().getFollowingScalar().getScalarHandle();
aggPrecedingWindowsUnbounded[opIndex] = op.getWindowOptions().isUnboundedPreceding();
aggFollowingWindowsUnbounded[opIndex] = op.getWindowOptions().isUnboundedFollowing();
aggMinPeriods[opIndex] = op.getWindowOptions().getMinPeriods();
assert (op.getWindowOptions().getFrameType() == WindowOptions.FrameType.RANGE);
orderByColumnIndexes[opIndex] = op.getWindowOptions().getOrderByColumnIndex();
isOrderByOrderAscending[opIndex] = op.getWindowOptions().isOrderByOrderAscending();
if (op.getDefaultOutput() != 0) {
throw new IllegalArgumentException("Operations with a default output are not " +
"supported on time based rolling windows");
}

opIndex++;
}
}
assert opIndex == totalOps : opIndex + " == " + totalOps;

try (Table aggregate = new Table(timeRangeRollingWindowAggregate(
try (Table aggregate = new Table(rangeRollingWindowAggregate(
operation.table.nativeHandle,
operation.indices,
timestampColumnIndexes,
isTimestampOrderAscending,
orderByColumnIndexes,
isOrderByOrderAscending,
aggColumnIndexes,
aggInstances, aggMinPeriods, aggPrecedingWindows, aggFollowingWindows,
aggPrecedingWindowsUnbounded, aggFollowingWindowsUnbounded,
Expand Down Expand Up @@ -2630,6 +2655,14 @@ public ContiguousTable[] contiguousSplitGroups() {
groupByOptions.getKeysDescending(),
groupByOptions.getKeysNullSmallest());
}

/**
* Computes range-based window aggregations by delegating directly to
* {@link #aggregateWindowsOverRanges(AggregationOverWindow...)}.
*
* @param windowAggregates the window-aggregations to be performed; passed through unchanged
* @return the Table returned by {@code aggregateWindowsOverRanges} for the same arguments
* @deprecated use {@link #aggregateWindowsOverRanges(AggregationOverWindow...)} instead
*/
@Deprecated
public Table aggregateWindowsOverTimeRanges(AggregationOverWindow... windowAggregates) {
return aggregateWindowsOverRanges(windowAggregates);
}
}

public static final class TableOperation {
Expand Down
Loading

0 comments on commit c3fea68

Please sign in to comment.