diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala index ce2c5c63e07..b9e405b23cf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,8 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.types.{ArrayType, DataType, MapType} import org.apache.spark.sql.vectorized.ColumnarBatch object SortUtils extends Arm { @@ -210,6 +211,20 @@ class GpuSorter( } } + private[this] lazy val hasNestedInKeyColumns = cpuOrderingInternal.exists ( order => + DataTypeUtils.isNestedType(order.child.dataType) + ) + + /** (This can be removed once https://github.com/rapidsai/cudf/issues/8050 is addressed) */ + private[this] lazy val hasUnsupportedNestedInRideColumns = { + val keyColumnIndices = cpuOrderingInternal.map(_.child.asInstanceOf[BoundReference].ordinal) + val rideColumnIndices = projectedBatchTypes.indices.toSet -- keyColumnIndices + rideColumnIndices.exists { idx => + TrampolineUtil.dataTypeExistsRecursively(projectedBatchTypes(idx), + t => t.isInstanceOf[ArrayType] || t.isInstanceOf[MapType]) + } + } + /** * Merge multiple batches together. All of these batches should be the output of * `appendProjectedColumns` and the output of this will also be in that same format. @@ -226,15 +241,10 @@ class GpuSorter( batches.foreach { cb => tabs += GpuColumnVector.from(cb) } - // In the current version of cudf merge does not work for structs or lists (nested types) + // In the current version of cudf merge does not work for lists and maps. // This should be fixed by https://github.com/rapidsai/cudf/issues/8050 - val hasNested = { - val tab = tabs.head - (0 until tab.getNumberOfColumns).exists { i => - tab.getColumn(i).getType.isNestedType - } - } - if (hasNested) { + // Nested types in sort key columns is not supported either. + if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) { // so as a work around we concatenate all of the data together and then sort it. // It is slower, but it works withResource(Table.concatenate(tabs: _*)) { concatenated =>