Skip to content

Commit

Permalink
Use merge sort for struct types in non-key columns (#4459)
Browse files Browse the repository at this point in the history
* Use merge sort for struct types in non-key columns

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
firestarman authored Jan 11, 2022
1 parent 9c8a96e commit adbbfcd
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table}

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{ArrayType, DataType, MapType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object SortUtils extends Arm {
Expand Down Expand Up @@ -210,6 +211,20 @@ class GpuSorter(
}
}

private[this] lazy val hasNestedInKeyColumns = cpuOrderingInternal.exists ( order =>
DataTypeUtils.isNestedType(order.child.dataType)
)

/** (This can be removed once https://github.com/rapidsai/cudf/issues/8050 is addressed) */
private[this] lazy val hasUnsupportedNestedInRideColumns = {
val keyColumnIndices = cpuOrderingInternal.map(_.child.asInstanceOf[BoundReference].ordinal)
val rideColumnIndices = projectedBatchTypes.indices.toSet -- keyColumnIndices
rideColumnIndices.exists { idx =>
TrampolineUtil.dataTypeExistsRecursively(projectedBatchTypes(idx),
t => t.isInstanceOf[ArrayType] || t.isInstanceOf[MapType])
}
}

/**
* Merge multiple batches together. All of these batches should be the output of
* `appendProjectedColumns` and the output of this will also be in that same format.
Expand All @@ -226,15 +241,10 @@ class GpuSorter(
batches.foreach { cb =>
tabs += GpuColumnVector.from(cb)
}
// In the current version of cudf merge does not work for structs or lists (nested types)
// In the current version of cudf merge does not work for lists and maps.
// This should be fixed by https://github.com/rapidsai/cudf/issues/8050
val hasNested = {
val tab = tabs.head
(0 until tab.getNumberOfColumns).exists { i =>
tab.getColumn(i).getType.isNestedType
}
}
if (hasNested) {
// Nested types in sort key columns is not supported either.
if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
// so as a work around we concatenate all of the data together and then sort it.
// It is slower, but it works
withResource(Table.concatenate(tabs: _*)) { concatenated =>
Expand Down

0 comments on commit adbbfcd

Please sign in to comment.