From 0b1c7456ff574fea0598c70a1851fd5f6f97bb6d Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 27 Jan 2021 15:27:19 -0600 Subject: [PATCH] Support TakeOrderedAndProject (#1579) Signed-off-by: Robert (Bobby) Evans --- docs/FAQ.md | 31 +++ docs/configs.md | 1 + docs/supported_ops.md | 31 ++- integration_tests/src/main/python/conftest.py | 2 +- integration_tests/src/main/python/data_gen.py | 8 +- .../src/main/python/sort_test.py | 17 +- .../nvidia/spark/rapids/GpuOverrides.scala | 33 ++- .../spark/rapids/SpillableColumnarBatch.scala | 10 +- .../scala/com/nvidia/spark/rapids/limit.scala | 242 +++++++++++++++++- 9 files changed, 355 insertions(+), 20 deletions(-) diff --git a/docs/FAQ.md b/docs/FAQ.md index 0feb19fdaf6..7f81e09dd47 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -17,6 +17,37 @@ with `collect`, `show` or `write` a new `DataFrame` is constructed causing Spark query. This is why `spark.rapids.sql.enabled` is still respected when running, even if explain shows stale results. +### Why does the plan for the GPU query look different from the CPU query? + +Typically, there is a one to one mapping between CPU stages in a plan and GPU stages. There are a +few places where this is not the case. + +* `WholeStageCodeGen` - The GPU plan typically does not do code generation, and does not support + generating code for an entire stage in the plan. Code generation reduces the cost of processing + data one row at a time. The GPU plan processes the data in a columnar format, so the costs + of processing a batch is amortized over the entire batch of data and code generation is not + needed. + +* `ColumnarToRow` and `RowToColumnar` transitions - The CPU version of Spark plans typically process + data in a row based format. The main exception to this is reading some kinds of columnar data, + like Parquet. Transitioning between the CPU and the GPU also requires transitioning between row + and columnar formatted data. 
+ +* `GpuCoalesceBatches` and `GpuShuffleCoalesce` - Processing data on the GPU scales + sublinearly. That means doubling the data often takes less than double the time. Because of + this we want to process larger batches of data when possible. These operators will try to combine + smaller batches of data into fewer, larger batches to process more efficiently. + +* `SortMergeJoin` - The RAPIDS accelerator does not support sort merge joins yet. For now, we + translate sort merge joins into shuffled hash joins. Because of this there are times when sorts + may be removed or other sorts added to meet the ordering requirements of the query. + +* `TakeOrderedAndProject` - The `TakeOrderedAndProject` operator will take the top N entries in + each task, shuffle the results to a single executor and then take the top N results from that. + The GPU plan often has more metrics than the CPU versions do, and when we tried to combine all of + these operations into a single stage the metrics were confusing to understand. Instead, we split + the single stage up into multiple smaller parts, so the metrics are clearer. + ### What versions of Apache Spark does the RAPIDS Accelerator for Apache Spark support? The RAPIDS Accelerator for Apache Spark requires version 3.0.0 or 3.0.1 of Apache Spark. 
Because the diff --git a/docs/configs.md b/docs/configs.md index b499a4d96f8..6ff58712230 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -271,6 +271,7 @@ Name | Description | Default Value | Notes spark.rapids.sql.exec.ProjectExec|The backend for most select, withColumn and dropColumn statements|true|None| spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None| spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None| +spark.rapids.sql.exec.TakeOrderedAndProjectExec|Take the first limit elements as defined by the sortOrder, and do projection if needed.|true|None| spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None| spark.rapids.sql.exec.CustomShuffleReaderExec|A wrapper of shuffle query stage|true|None| spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 9824842169b..0e6d8b7bd93 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -258,8 +258,8 @@ Accelerator supports are described below. S S* S -NS -NS +S* +S NS NS NS @@ -281,8 +281,8 @@ Accelerator supports are described below. S S* S -NS -NS +S* +S NS NS NS @@ -360,6 +360,29 @@ Accelerator supports are described below. NS +TakeOrderedAndProjectExec +Take the first limit elements as defined by the sortOrder, and do projection if needed. 
+None +S +S +S +S +S +S +S +S +S* +S +S* +S +NS +NS +NS +NS +NS +NS + + UnionExec The backend for the union operator None diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py index 8339f0bb27c..54a3bcdbf3f 100644 --- a/integration_tests/src/main/python/conftest.py +++ b/integration_tests/src/main/python/conftest.py @@ -371,7 +371,7 @@ def setup(self, spark): } if not self.tpcds_format in formats: raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format)) - formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True) + formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True, True) def do_test_query(self, query): spark = get_spark_i_know_what_i_am_doing() diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 42678406053..4919ae0d904 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -221,13 +221,13 @@ def __init__(self, precision=None, scale=None, nullable=True, special_cases=[]): DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale)) DECIMAL_MAX = Decimal(('9'* precision) + 'e' + str(-scale)) super().__init__(DecimalType(precision, scale), nullable=nullable, special_cases=special_cases) - self._scale = scale - self._precision = precision + self.scale = scale + self.precision = precision pattern = "[0-9]{1,"+ str(precision) + "}e" + str(-scale) self.base_strs = sre_yield.AllStrings(pattern, flags=0, charset=sre_yield.CHARSET, max_count=_MAX_CHOICES) def __repr__(self): - return super().__repr__() + '(' + str(self._precision) + ',' + str(self._scale) + ')' + return super().__repr__() + '(' + str(self.precision) + ',' + str(self.scale) + ')' def start(self, rand): strs = self.base_strs diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index 8b2441c43aa..7c1ebd2a81e 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -34,6 +34,14 @@ def test_single_orderby(data_gen, order): lambda spark : unary_op_df(spark, data_gen).orderBy(order), conf = allow_negative_scale_of_decimal_conf) +# SPARK CPU itself has issue with negative scale for take ordered and project +orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)] +@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn) +@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) +def test_single_orderby_with_limit(data_gen, order): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100)) + @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_sort_in_part(data_gen, order): @@ -52,3 +60,10 @@ def test_multi_orderby(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()), conf = allow_negative_scale_of_decimal_conf) + +# SPARK CPU itself has issue with negative scale for take ordered and project +orderable_gens_sort_without_neg_decimal = [n for n in orderable_gens_sort if not (isinstance(n, DecimalGen) and n.scale < 0)] +@pytest.mark.parametrize('data_gen', orderable_gens_sort_without_neg_decimal, ids=idfn) +def test_multi_orderby_with_limit(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()).limit(100)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 1f7ebc214fc..f4ef7374eee 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2355,13 +2355,38 @@ object GpuOverrides { GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(), childPlans.head.convertIfNeeded()) }), + exec[TakeOrderedAndProjectExec]( + "Take the first limit elements as defined by the sortOrder, and do projection if needed.", + ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all), + (takeExec, conf, p, r) => + new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) { + val sortOrder: Seq[BaseExprMeta[SortOrder]] = + takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val projectList: Seq[BaseExprMeta[NamedExpression]] = + takeExec.projectList.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList + + override def convertToGpu(): GpuExec = { + // To avoid metrics confusion we split a single stage up into multiple parts + val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder]) + GpuTopN(takeExec.limit, + so, + projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]), + ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty), + GpuTopN(takeExec.limit, + so, + takeExec.child.output, + childPlans.head.convertIfNeeded()))) + } + }), exec[LocalLimitExec]( "Per-partition limiting of results", - ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, + TypeSig.all), (localLimitExec, conf, p, r) => new SparkPlanMeta[LocalLimitExec](localLimitExec, conf, p, r) { override def convertToGpu(): GpuExec = - GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded()) + GpuLocalLimitExec(localLimitExec.limit, childPlans.head.convertIfNeeded()) }), exec[ArrowEvalPythonExec]( "The backend of the Scalar Pandas UDFs. 
Accelerates the data transfer between the" + @@ -2388,11 +2413,11 @@ object GpuOverrides { }), exec[GlobalLimitExec]( "Limiting of results across partitions", - ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all), (globalLimitExec, conf, p, r) => new SparkPlanMeta[GlobalLimitExec](globalLimitExec, conf, p, r) { override def convertToGpu(): GpuExec = - GpuGlobalLimitExec(globalLimitExec.limit, childPlans(0).convertIfNeeded()) + GpuGlobalLimitExec(globalLimitExec.limit, childPlans.head.convertIfNeeded()) }), exec[CollectLimitExec]( "Reduce to single partition and apply limit", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 9cf66939c32..4b2cb5b72d7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,8 @@ trait SpillableColumnarBatch extends AutoCloseable { * with decompressing the data if necessary. 
*/ def getColumnarBatch(): ColumnarBatch + + def sizeInBytes: Long } /** @@ -57,6 +59,7 @@ class JustRowsColumnarBatch(numRows: Int) extends SpillableColumnarBatch { override def getColumnarBatch(): ColumnarBatch = new ColumnarBatch(Array.empty, numRows) override def close(): Unit = () // NOOP nothing to close + override val sizeInBytes: Long = 0L } /** @@ -82,6 +85,11 @@ class SpillableColumnarBatchImpl (id: TempSpillBufferId, */ def spillId: TempSpillBufferId = id + override lazy val sizeInBytes: Long = + withResource(RapidsBufferCatalog.acquireBuffer(id)) { buff => + buff.size + } + /** * Set a new spill priority. */ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala index c3ceb462e74..17101e1c448 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,15 +19,19 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{NvtxColor, Table} +import ai.rapids.cudf import com.nvidia.spark.rapids.GpuMetricNames._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning} -import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan} -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning, SinglePartition} +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} /** * Helper trait which defines methods that are shared by both @@ -137,4 +141,232 @@ class GpuCollectLimitMeta( ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty), GpuLocalLimitExec(collectLimit.limit, childPlans(0).convertIfNeeded()))) +} + +object GpuTopN extends Arm { + private[this] def getOrderArgs( + sortOrder: Seq[SortOrder], + inputTbl: Table): Seq[Table.OrderByArg] = { + val orderByArgs = if (sortOrder.nonEmpty) { + sortOrder.zipWithIndex.map { case (order, index) => + if (order.isAscending) { + Table.asc(index, order.nullOrdering == NullsFirst) + } else { + Table.desc(index, order.nullOrdering == NullsLast) + } + } + } else { + (0 until inputTbl.getNumberOfColumns).map { index => + Table.asc(index, true) + } + } + orderByArgs + } + + 
private def doGpuSort( + inputTbl: Table, + sortOrder: Seq[SortOrder], + types: Seq[DataType]): ColumnarBatch = { + val orderByArgs = getOrderArgs(sortOrder, inputTbl) + val numSortCols = sortOrder.length + withResource(inputTbl.orderBy(orderByArgs: _*)) { resultTbl => + GpuColumnVector.from(resultTbl, types.toArray, numSortCols, resultTbl.getNumberOfColumns) + } + } + + private[this] def sortBatch( + sortOrder: Seq[SortOrder], + inputBatch: ColumnarBatch, + sortTime: SQLMetric): ColumnarBatch = { + withResource(new NvtxWithMetrics("sort", NvtxColor.DARK_GREEN, sortTime)) { _ => + var outputTypes: Seq[DataType] = Nil + var inputTbl: Table = null + var inputCvs: Seq[GpuColumnVector] = Nil + try { + if (sortOrder.nonEmpty) { + inputCvs = SortUtils.getGpuColVectorsAndBindReferences(inputBatch, sortOrder) + inputTbl = new cudf.Table(inputCvs.map(_.getBase): _*) + outputTypes = sortOrder.map(_.child.dataType) ++ + GpuColumnVector.extractTypes(inputBatch) + } else if (inputBatch.numCols() > 0) { + inputTbl = GpuColumnVector.from(inputBatch) + outputTypes = GpuColumnVector.extractTypes(inputBatch) + } + doGpuSort(inputTbl, sortOrder, outputTypes) + } finally { + inputCvs.safeClose() + if (inputTbl != null) { + inputTbl.close() + } + } + } + } + + private[this] def concatAndClose(a: SpillableColumnarBatch, + b: ColumnarBatch, + concatTime: SQLMetric): ColumnarBatch = { + withResource(new NvtxWithMetrics("readNConcat", NvtxColor.CYAN, concatTime)) { _ => + val dataTypes = GpuColumnVector.extractTypes(b) + val aTable = withResource(a) { a => + withResource(a.getColumnarBatch()) { aBatch => + GpuColumnVector.from(aBatch) + } + } + val ret = withResource(aTable) { aTable => + withResource(b) { b => + withResource(GpuColumnVector.from(b)) { bTable => + Table.concatenate(aTable, bTable) + } + } + } + withResource(ret) { ret => + GpuColumnVector.from(ret, dataTypes) + } + } + } + + private[this] def takeN(batch: ColumnarBatch, count: Int): ColumnarBatch = { + val end = 
Math.min(count, batch.numRows()) + val numColumns = batch.numCols() + closeOnExcept(new Array[ColumnVector](numColumns)) { columns => + val bases = GpuColumnVector.extractBases(batch) + (0 until numColumns).foreach { i => + columns(i) = GpuColumnVector.from(bases(i).subVector(0, end), batch.column(i).dataType()) + } + new ColumnarBatch(columns, end) + } + } + + def apply(limit: Int, + sortOrder: Seq[SortOrder], + batch: ColumnarBatch, + sortTime: SQLMetric): ColumnarBatch = { + withResource(sortBatch(sortOrder, batch, sortTime)) { sorted => + takeN(sorted, limit) + } + } + + def apply(limit: Int, + sortOrder: Seq[SortOrder], + iter: Iterator[ColumnarBatch], + totalTime: SQLMetric, + sortTime: SQLMetric, + concatTime: SQLMetric, + batchTime: SQLMetric, + inputBatches: SQLMetric, + inputRows: SQLMetric, + outputBatches: SQLMetric, + outputRows: SQLMetric): Iterator[ColumnarBatch] = + new Iterator[ColumnarBatch]() { + override def hasNext: Boolean = iter.hasNext + + private[this] var pending: Option[SpillableColumnarBatch] = None + + override def next(): ColumnarBatch = { + while (iter.hasNext) { + val input = iter.next() + withResource(new NvtxWithMetrics("TOP N", NvtxColor.ORANGE, totalTime)) { _ => + inputBatches += 1 + inputRows += input.numRows() + lazy val totalSize = GpuColumnVector.getTotalDeviceMemoryUsed(input) + + pending.map(_.sizeInBytes).getOrElse(0L) + + val runningResult = if (pending.isEmpty) { + withResource(input) { input => + apply(limit, sortOrder, input, sortTime) + } + } else if (totalSize > Integer.MAX_VALUE) { + // The intermediate size is likely big enough we don't want to risk an overflow, + // so sort/slice before we concat and sort/slice again. 
+ val tmp = withResource(input) { input => + apply(limit, sortOrder, input, sortTime) + } + withResource(concatAndClose(pending.get, tmp, concatTime)) { concat => + apply(limit, sortOrder, concat, sortTime) + } + } else { + // The intermediate size looks like we could never overflow the indexes so + // do it the more efficient way and concat first followed by the sort/slice + withResource(concatAndClose(pending.get, input, concatTime)) { concat => + apply(limit, sortOrder, concat, sortTime) + } + } + pending = withResource(new NvtxWithMetrics("make batch", + NvtxColor.RED, batchTime)) { _ => + Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + } + } + } + val ret = pending.get.getColumnarBatch() + pending.get.close() + pending = None + outputBatches += 1 + outputRows += ret.numRows() + ret + } + } +} + +/** + * Take the first limit elements as defined by the sortOrder, and do projection if needed. + * This is logically equivalent to having a Limit operator after a `SortExec` operator, + * or having a `ProjectExec` operator between them. + * Spark's CPU version is called TakeOrderedAndProject rather than TopK because Spark's top + * operator does the opposite in ordering; this GPU version is named GpuTopN. 
+ */ +case class GpuTopN( + limit: Int, + sortOrder: Seq[SortOrder], + projectList: Seq[NamedExpression], + child: SparkPlan) extends GpuExec with UnaryExecNode { + + override def output: Seq[Attribute] = { + projectList.map(_.toAttribute) + } + + override lazy val additionalMetrics: Map[String, SQLMetric] = Map( + NUM_INPUT_ROWS -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_ROWS), + NUM_INPUT_BATCHES -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_BATCHES), + "sortTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "sort time"), + "concatTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "concat time"), + "batchTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "batch time") + ) + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val boundSortExprs = GpuBindReferences.bindReferences(sortOrder, child.output) + val boundProjectExprs = GpuBindReferences.bindGpuReferences(projectList, child.output) + val totalTime = metrics(TOTAL_TIME) + val inputBatches = metrics(NUM_INPUT_BATCHES) + val inputRows = metrics(NUM_INPUT_ROWS) + val outputBatches = metrics(NUM_OUTPUT_BATCHES) + val outputRows = metrics(NUM_OUTPUT_ROWS) + val sortTime = metrics("sortTime") + val concatTime = metrics("concatTime") + val batchTime = metrics("batchTime") + child.executeColumnar().mapPartitions { iter => + val topN = GpuTopN(limit, boundSortExprs, iter, totalTime, sortTime, concatTime, batchTime, + inputBatches, inputRows, outputBatches, outputRows) + if (projectList != child.output) { + topN.map { batch => + GpuProjectExec.projectAndClose(batch, boundProjectExprs, totalTime) + } + } else { + topN + } + } + } + + protected override def doExecute(): RDD[InternalRow] = + throw new IllegalStateException(s"Row-based execution should not occur for $this") + + override def outputOrdering: Seq[SortOrder] = sortOrder + + override def outputPartitioning: Partitioning = SinglePartition + + override def simpleString(maxFields: Int): String = { 
+ val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields) + val outputString = truncatedString(output, "[", ",", "]", maxFields) + + s"GpuTopN(limit=$limit, orderBy=$orderByString, output=$outputString)" + } } \ No newline at end of file