diff --git a/docs/FAQ.md b/docs/FAQ.md
index 0feb19fdaf6..7f81e09dd47 100644
--- a/docs/FAQ.md
+++ b/docs/FAQ.md
@@ -17,6 +17,37 @@ with `collect`, `show` or `write` a new `DataFrame` is constructed causing Spark
query. This is why `spark.rapids.sql.enabled` is still respected when running, even if explain shows
stale results.
+### Why does the plan for the GPU query look different from the CPU query?
+
+Typically, there is a one-to-one mapping between CPU stages in a plan and GPU stages. There are a
+few places where this is not the case.
+
+* `WholeStageCodeGen` - The GPU plan typically does not do code generation, and does not support
+ generating code for an entire stage in the plan. Code generation reduces the cost of processing
+ data one row at a time. The GPU plan processes the data in a columnar format, so the cost
+ of processing a batch is amortized over the entire batch of data and code generation is not
+ needed.
+
+* `ColumnarToRow` and `RowToColumnar` transitions - CPU versions of Spark plans typically process
+ data in a row-based format. The main exception to this is reading some kinds of columnar data,
+ like Parquet. Transitioning between the CPU and the GPU also requires transitioning between row
+ and columnar formatted data.
+
+* `GpuCoalesceBatches` and `GpuShuffleCoalesce` - Processing data on the GPU scales
+ sublinearly. That means doubling the data often takes less than double the time. Because of
+ this we want to process larger batches of data when possible. These operators will try to combine
+ smaller batches of data into fewer, larger batches to process more efficiently.
+
+* `SortMergeJoin` - The RAPIDS accelerator does not support sort merge joins yet. For now, we
+ translate sort merge joins into shuffled hash joins. Because of this there are times when sorts
+ may be removed or other sorts added to meet the ordering requirements of the query.
+
+* `TakeOrderedAndProject` - The `TakeOrderedAndProject` operator will take the top N entries in
+ each task, shuffle the results to a single executor and then take the top N results from that.
+ The GPU plan often has more metrics than the CPU versions do, and when we tried to combine all of
+ these operations into a single stage the metrics were confusing to understand. Instead, we split
+ the single stage up into multiple smaller parts, so the metrics are clearer.
+
### What versions of Apache Spark does the RAPIDS Accelerator for Apache Spark support?
The RAPIDS Accelerator for Apache Spark requires version 3.0.0 or 3.0.1 of Apache Spark. Because the
diff --git a/docs/configs.md b/docs/configs.md
index b499a4d96f8..6ff58712230 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -271,6 +271,7 @@ Name | Description | Default Value | Notes
spark.rapids.sql.exec.ProjectExec|The backend for most select, withColumn and dropColumn statements|true|None|
spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None|
spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None|
+spark.rapids.sql.exec.TakeOrderedAndProjectExec|Take the first limit elements as defined by the sortOrder, and do projection if needed.|true|None|
spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
spark.rapids.sql.exec.CustomShuffleReaderExec|A wrapper of shuffle query stage|true|None|
spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 511a3611803..855699fa191 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -258,8 +258,8 @@ Accelerator supports are described below.
S |
S* |
S |
-NS |
-NS |
+S* |
+S |
NS |
NS |
NS |
@@ -281,8 +281,8 @@ Accelerator supports are described below.
S |
S* |
S |
-NS |
-NS |
+S* |
+S |
NS |
NS |
NS |
@@ -360,6 +360,29 @@ Accelerator supports are described below.
NS |
+TakeOrderedAndProjectExec |
+Take the first limit elements as defined by the sortOrder, and do projection if needed. |
+None |
+S |
+S |
+S |
+S |
+S |
+S |
+S |
+S |
+S* |
+S |
+S* |
+S |
+NS |
+NS |
+NS |
+NS |
+NS |
+NS |
+
+
UnionExec |
The backend for the union operator |
None |
diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 8339f0bb27c..54a3bcdbf3f 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -371,7 +371,7 @@ def setup(self, spark):
}
if not self.tpcds_format in formats:
raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format))
- formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True)
+ formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True, True)
def do_test_query(self, query):
spark = get_spark_i_know_what_i_am_doing()
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 42678406053..4919ae0d904 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -221,13 +221,13 @@ def __init__(self, precision=None, scale=None, nullable=True, special_cases=[]):
DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
DECIMAL_MAX = Decimal(('9'* precision) + 'e' + str(-scale))
super().__init__(DecimalType(precision, scale), nullable=nullable, special_cases=special_cases)
- self._scale = scale
- self._precision = precision
+ self.scale = scale
+ self.precision = precision
pattern = "[0-9]{1,"+ str(precision) + "}e" + str(-scale)
self.base_strs = sre_yield.AllStrings(pattern, flags=0, charset=sre_yield.CHARSET, max_count=_MAX_CHOICES)
def __repr__(self):
- return super().__repr__() + '(' + str(self._precision) + ',' + str(self._scale) + ')'
+ return super().__repr__() + '(' + str(self.precision) + ',' + str(self.scale) + ')'
def start(self, rand):
strs = self.base_strs
diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 8b2441c43aa..7c1ebd2a81e 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -34,6 +34,14 @@ def test_single_orderby(data_gen, order):
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = allow_negative_scale_of_decimal_conf)
+# Spark on the CPU itself has issues with negative-scale decimals in TakeOrderedAndProject
+orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
+@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
+@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
+def test_single_orderby_with_limit(data_gen, order):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
+
@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_sort_in_part(data_gen, order):
@@ -52,3 +60,10 @@ def test_multi_orderby(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()),
conf = allow_negative_scale_of_decimal_conf)
+
+# Spark on the CPU itself has issues with negative-scale decimals in TakeOrderedAndProject
+orderable_gens_sort_without_neg_decimal = [n for n in orderable_gens_sort if not (isinstance(n, DecimalGen) and n.scale < 0)]
+@pytest.mark.parametrize('data_gen', orderable_gens_sort_without_neg_decimal, ids=idfn)
+def test_multi_orderby_with_limit(data_gen):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()).limit(100))
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index a111a8e6993..26885ec6af4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2355,13 +2355,38 @@ object GpuOverrides {
GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(),
childPlans.head.convertIfNeeded())
}),
+ exec[TakeOrderedAndProjectExec](
+ "Take the first limit elements as defined by the sortOrder, and do projection if needed.",
+ ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
+ (takeExec, conf, p, r) =>
+ new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
+ val sortOrder: Seq[BaseExprMeta[SortOrder]] =
+ takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+ val projectList: Seq[BaseExprMeta[NamedExpression]] =
+ takeExec.projectList.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+ override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList
+
+ override def convertToGpu(): GpuExec = {
+ // To avoid metrics confusion we split a single stage up into multiple parts
+ val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
+ GpuTopN(takeExec.limit,
+ so,
+ projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
+ ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty),
+ GpuTopN(takeExec.limit,
+ so,
+ takeExec.child.output,
+ childPlans.head.convertIfNeeded())))
+ }
+ }),
exec[LocalLimitExec](
"Per-partition limiting of results",
- ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
+ ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL,
+ TypeSig.all),
(localLimitExec, conf, p, r) =>
new SparkPlanMeta[LocalLimitExec](localLimitExec, conf, p, r) {
override def convertToGpu(): GpuExec =
- GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded())
+ GpuLocalLimitExec(localLimitExec.limit, childPlans.head.convertIfNeeded())
}),
exec[ArrowEvalPythonExec](
"The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" +
@@ -2388,11 +2413,11 @@ object GpuOverrides {
}),
exec[GlobalLimitExec](
"Limiting of results across partitions",
- ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
+ ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(globalLimitExec, conf, p, r) =>
new SparkPlanMeta[GlobalLimitExec](globalLimitExec, conf, p, r) {
override def convertToGpu(): GpuExec =
- GpuGlobalLimitExec(globalLimitExec.limit, childPlans(0).convertIfNeeded())
+ GpuGlobalLimitExec(globalLimitExec.limit, childPlans.head.convertIfNeeded())
}),
exec[CollectLimitExec](
"Reduce to single partition and apply limit",
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
index 9cf66939c32..4b2cb5b72d7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,6 +44,8 @@ trait SpillableColumnarBatch extends AutoCloseable {
* with decompressing the data if necessary.
*/
def getColumnarBatch(): ColumnarBatch
+
+ def sizeInBytes: Long
}
/**
@@ -57,6 +59,7 @@ class JustRowsColumnarBatch(numRows: Int) extends SpillableColumnarBatch {
override def getColumnarBatch(): ColumnarBatch =
new ColumnarBatch(Array.empty, numRows)
override def close(): Unit = () // NOOP nothing to close
+ override val sizeInBytes: Long = 0L
}
/**
@@ -82,6 +85,11 @@ class SpillableColumnarBatchImpl (id: TempSpillBufferId,
*/
def spillId: TempSpillBufferId = id
+ override lazy val sizeInBytes: Long =
+ withResource(RapidsBufferCatalog.acquireBuffer(id)) { buff =>
+ buff.size
+ }
+
/**
* Set a new spill priority.
*/
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
index c3ceb462e74..17101e1c448 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,15 +19,19 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{NvtxColor, Table}
+import ai.rapids.cudf
import com.nvidia.spark.rapids.GpuMetricNames._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
-import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, NullsFirst, NullsLast, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
/**
* Helper trait which defines methods that are shared by both
@@ -137,4 +141,232 @@ class GpuCollectLimitMeta(
ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty),
GpuLocalLimitExec(collectLimit.limit, childPlans(0).convertIfNeeded())))
+}
+
+object GpuTopN extends Arm {
+ private[this] def getOrderArgs(
+ sortOrder: Seq[SortOrder],
+ inputTbl: Table): Seq[Table.OrderByArg] = {
+ val orderByArgs = if (sortOrder.nonEmpty) {
+ sortOrder.zipWithIndex.map { case (order, index) =>
+ if (order.isAscending) {
+ Table.asc(index, order.nullOrdering == NullsFirst)
+ } else {
+ Table.desc(index, order.nullOrdering == NullsLast)
+ }
+ }
+ } else {
+ (0 until inputTbl.getNumberOfColumns).map { index =>
+ Table.asc(index, true)
+ }
+ }
+ orderByArgs
+ }
+
+ private def doGpuSort(
+ inputTbl: Table,
+ sortOrder: Seq[SortOrder],
+ types: Seq[DataType]): ColumnarBatch = {
+ val orderByArgs = getOrderArgs(sortOrder, inputTbl)
+ val numSortCols = sortOrder.length
+ withResource(inputTbl.orderBy(orderByArgs: _*)) { resultTbl =>
+ GpuColumnVector.from(resultTbl, types.toArray, numSortCols, resultTbl.getNumberOfColumns)
+ }
+ }
+
+ private[this] def sortBatch(
+ sortOrder: Seq[SortOrder],
+ inputBatch: ColumnarBatch,
+ sortTime: SQLMetric): ColumnarBatch = {
+ withResource(new NvtxWithMetrics("sort", NvtxColor.DARK_GREEN, sortTime)) { _ =>
+ var outputTypes: Seq[DataType] = Nil
+ var inputTbl: Table = null
+ var inputCvs: Seq[GpuColumnVector] = Nil
+ try {
+ if (sortOrder.nonEmpty) {
+ inputCvs = SortUtils.getGpuColVectorsAndBindReferences(inputBatch, sortOrder)
+ inputTbl = new cudf.Table(inputCvs.map(_.getBase): _*)
+ outputTypes = sortOrder.map(_.child.dataType) ++
+ GpuColumnVector.extractTypes(inputBatch)
+ } else if (inputBatch.numCols() > 0) {
+ inputTbl = GpuColumnVector.from(inputBatch)
+ outputTypes = GpuColumnVector.extractTypes(inputBatch)
+ }
+ doGpuSort(inputTbl, sortOrder, outputTypes)
+ } finally {
+ inputCvs.safeClose()
+ if (inputTbl != null) {
+ inputTbl.close()
+ }
+ }
+ }
+ }
+
+ private[this] def concatAndClose(a: SpillableColumnarBatch,
+ b: ColumnarBatch,
+ concatTime: SQLMetric): ColumnarBatch = {
+ withResource(new NvtxWithMetrics("readNConcat", NvtxColor.CYAN, concatTime)) { _ =>
+ val dataTypes = GpuColumnVector.extractTypes(b)
+ val aTable = withResource(a) { a =>
+ withResource(a.getColumnarBatch()) { aBatch =>
+ GpuColumnVector.from(aBatch)
+ }
+ }
+ val ret = withResource(aTable) { aTable =>
+ withResource(b) { b =>
+ withResource(GpuColumnVector.from(b)) { bTable =>
+ Table.concatenate(aTable, bTable)
+ }
+ }
+ }
+ withResource(ret) { ret =>
+ GpuColumnVector.from(ret, dataTypes)
+ }
+ }
+ }
+
+ private[this] def takeN(batch: ColumnarBatch, count: Int): ColumnarBatch = {
+ val end = Math.min(count, batch.numRows())
+ val numColumns = batch.numCols()
+ closeOnExcept(new Array[ColumnVector](numColumns)) { columns =>
+ val bases = GpuColumnVector.extractBases(batch)
+ (0 until numColumns).foreach { i =>
+ columns(i) = GpuColumnVector.from(bases(i).subVector(0, end), batch.column(i).dataType())
+ }
+ new ColumnarBatch(columns, end)
+ }
+ }
+
+ def apply(limit: Int,
+ sortOrder: Seq[SortOrder],
+ batch: ColumnarBatch,
+ sortTime: SQLMetric): ColumnarBatch = {
+ withResource(sortBatch(sortOrder, batch, sortTime)) { sorted =>
+ takeN(sorted, limit)
+ }
+ }
+
+ def apply(limit: Int,
+ sortOrder: Seq[SortOrder],
+ iter: Iterator[ColumnarBatch],
+ totalTime: SQLMetric,
+ sortTime: SQLMetric,
+ concatTime: SQLMetric,
+ batchTime: SQLMetric,
+ inputBatches: SQLMetric,
+ inputRows: SQLMetric,
+ outputBatches: SQLMetric,
+ outputRows: SQLMetric): Iterator[ColumnarBatch] =
+ new Iterator[ColumnarBatch]() {
+ override def hasNext: Boolean = iter.hasNext
+
+ private[this] var pending: Option[SpillableColumnarBatch] = None
+
+ override def next(): ColumnarBatch = {
+ while (iter.hasNext) {
+ val input = iter.next()
+ withResource(new NvtxWithMetrics("TOP N", NvtxColor.ORANGE, totalTime)) { _ =>
+ inputBatches += 1
+ inputRows += input.numRows()
+ lazy val totalSize = GpuColumnVector.getTotalDeviceMemoryUsed(input) +
+ pending.map(_.sizeInBytes).getOrElse(0L)
+
+ val runningResult = if (pending.isEmpty) {
+ withResource(input) { input =>
+ apply(limit, sortOrder, input, sortTime)
+ }
+ } else if (totalSize > Integer.MAX_VALUE) {
+ // The intermediate size is likely big enough we don't want to risk an overflow,
+ // so sort/slice before we concat and sort/slice again.
+ val tmp = withResource(input) { input =>
+ apply(limit, sortOrder, input, sortTime)
+ }
+ withResource(concatAndClose(pending.get, tmp, concatTime)) { concat =>
+ apply(limit, sortOrder, concat, sortTime)
+ }
+ } else {
+ // The intermediate size looks like we could never overflow the indexes so
+ // do it the more efficient way and concat first followed by the sort/slice
+ withResource(concatAndClose(pending.get, input, concatTime)) { concat =>
+ apply(limit, sortOrder, concat, sortTime)
+ }
+ }
+ pending = withResource(new NvtxWithMetrics("make batch",
+ NvtxColor.RED, batchTime)) { _ =>
+ Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+ }
+ }
+ }
+ val ret = pending.get.getColumnarBatch()
+ pending.get.close()
+ pending = None
+ outputBatches += 1
+ outputRows += ret.numRows()
+ ret
+ }
+ }
+}
+
+/**
+ * Take the first limit elements as defined by the sortOrder, and do projection if needed.
+ * This is logically equivalent to having a Limit operator after a `SortExec` operator,
+ * or having a `ProjectExec` operator between them.
+ * This is the GPU replacement for Spark's `TakeOrderedAndProjectExec`, which could have been
+ * named TopK, but Spark's top operator does the opposite in ordering so it was named TakeOrdered.
+ */
+case class GpuTopN(
+ limit: Int,
+ sortOrder: Seq[SortOrder],
+ projectList: Seq[NamedExpression],
+ child: SparkPlan) extends GpuExec with UnaryExecNode {
+
+ override def output: Seq[Attribute] = {
+ projectList.map(_.toAttribute)
+ }
+
+ override lazy val additionalMetrics: Map[String, SQLMetric] = Map(
+ NUM_INPUT_ROWS -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_ROWS),
+ NUM_INPUT_BATCHES -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_BATCHES),
+ "sortTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "sort time"),
+ "concatTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "concat time"),
+ "batchTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "batch time")
+ )
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val boundSortExprs = GpuBindReferences.bindReferences(sortOrder, child.output)
+ val boundProjectExprs = GpuBindReferences.bindGpuReferences(projectList, child.output)
+ val totalTime = metrics(TOTAL_TIME)
+ val inputBatches = metrics(NUM_INPUT_BATCHES)
+ val inputRows = metrics(NUM_INPUT_ROWS)
+ val outputBatches = metrics(NUM_OUTPUT_BATCHES)
+ val outputRows = metrics(NUM_OUTPUT_ROWS)
+ val sortTime = metrics("sortTime")
+ val concatTime = metrics("concatTime")
+ val batchTime = metrics("batchTime")
+ child.executeColumnar().mapPartitions { iter =>
+ val topN = GpuTopN(limit, boundSortExprs, iter, totalTime, sortTime, concatTime, batchTime,
+ inputBatches, inputRows, outputBatches, outputRows)
+ if (projectList != child.output) {
+ topN.map { batch =>
+ GpuProjectExec.projectAndClose(batch, boundProjectExprs, totalTime)
+ }
+ } else {
+ topN
+ }
+ }
+ }
+
+ protected override def doExecute(): RDD[InternalRow] =
+ throw new IllegalStateException(s"Row-based execution should not occur for $this")
+
+ override def outputOrdering: Seq[SortOrder] = sortOrder
+
+ override def outputPartitioning: Partitioning = SinglePartition
+
+ override def simpleString(maxFields: Int): String = {
+ val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields)
+ val outputString = truncatedString(output, "[", ",", "]", maxFields)
+
+ s"GpuTopN(limit=$limit, orderBy=$orderByString, output=$outputString)"
+ }
}
\ No newline at end of file