From 4687e2c9d36c2f638cc41c482b1094f7c8d94684 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 22 Feb 2021 08:28:43 -0600 Subject: [PATCH] Add in out of core sort (#1719) Signed-off-by: Robert (Bobby) Evans --- docs/compatibility.md | 13 + docs/configs.md | 1 + .../src/main/python/hash_aggregate_test.py | 7 +- .../nvidia/spark/rapids/GpuColumnVector.java | 145 ++++- .../scala/com/nvidia/spark/rapids/Arm.scala | 11 +- .../com/nvidia/spark/rapids/GpuExec.scala | 40 +- .../nvidia/spark/rapids/GpuExpressions.scala | 10 +- .../nvidia/spark/rapids/GpuGenerateExec.scala | 5 +- .../spark/rapids/GpuRangePartitioning.scala | 9 +- .../com/nvidia/spark/rapids/GpuSortExec.scala | 574 +++++++++++++----- .../spark/rapids/GpuTransitionOverrides.scala | 4 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 24 + .../spark/rapids/RapidsBufferCatalog.scala | 23 +- .../spark/rapids/RapidsBufferStore.scala | 9 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 13 + .../rapids/RapidsDeviceMemoryStore.scala | 30 +- .../nvidia/spark/rapids/RapidsDiskStore.scala | 10 +- .../nvidia/spark/rapids/RapidsGdsStore.scala | 11 +- .../spark/rapids/RapidsHostMemoryStore.scala | 11 +- .../com/nvidia/spark/rapids/SortUtils.scala | 284 ++++++++- .../spark/rapids/SpillableColumnarBatch.scala | 28 +- .../com/nvidia/spark/rapids/aggregate.scala | 4 +- .../spark/rapids/basicPhysicalOperators.scala | 3 + .../scala/com/nvidia/spark/rapids/limit.scala | 84 +-- .../sql/rapids/GpuFileFormatWriter.scala | 10 +- .../execution/GpuShuffleExchangeExec.scala | 13 +- .../sql/rapids/execution/TrampolineUtil.scala | 3 + .../rapids/GpuCoalesceBatchesSuite.scala | 6 +- .../spark/rapids/HashSortOptimizeSuite.scala | 5 +- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 4 +- .../shuffle/RapidsShuffleClientSuite.scala | 8 +- .../shuffle/RapidsShuffleTestHelper.scala | 2 +- .../rapids/SpillableColumnarBatchSuite.scala | 1 + 33 files changed, 1071 insertions(+), 334 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index e4a4a53479e..76601bba2b0 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -27,6 +27,19 @@ Spark's guarantee. It may not be 100% identical if the ordering is ambiguous. In versions of Spark prior to 3.1.0 `-0.0` is always < `0.0` but in 3.1.0 and above this is not true for sorting. For all versions of the plugin `-0.0` == `0.0` for sorting. +Spark's sorting is typically a [stable](https://en.wikipedia.org/wiki/Sorting_algorithm#Stability) +sort. Sort stability cannot be guaranteed in distributed work loads because the order in which +upstream data arrives to a task is not guaranteed. Sort stability is only +guaranteed in one situation which is reading and sorting data from a file using a single +task/partition. The RAPIDS Accelerator does an unstable +[out of core](https://en.wikipedia.org/wiki/External_memory_algorithm) sort by default. This +simply means that the sort algorithm allows for spilling parts of the data if it is larger than +can fit in the GPU's memory, but it does not guarantee ordering of rows when the ordering of the +keys is ambiguous. If you do rely on a stable sort in your processing you can request this by +setting [spark.rapids.sql.stableSort.enabled](configs.md#sql.stableSort.enabled) to `true` and +RAPIDS will try to sort all the data for a given task/partition at once on the GPU. This may change +in the future to allow for a spillable stable sort. + ## Floating Point For most basic floating-point operations like addition, subtraction, multiplication, and division diff --git a/docs/configs.md b/docs/configs.md index cbcb52b29d6..528b684424b 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -83,6 +83,7 @@ Name | Description | Default Value spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647 spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true spark.rapids.sql.shuffle.spillThreads|Number of threads used to spill shuffle data to disk in the background.|6 +spark.rapids.sql.stableSort.enabled|Enable or disable stable sorting. Apache Spark's sorting is typically a stable sort, but sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability then only matters when reading and sorting data from a file using a single task/partition. Because of limitations in the plugin when you enable stable sorting all of the data for a single task will be combined into a single batch before sorting. This currently disables spilling from GPU memory if the data size is too large.|false spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|false diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 161df03f251..fd8deda76e9 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -384,9 +384,8 @@ def test_generic_reductions(data_gen): assert_gpu_and_cpu_are_equal_collect( # Coalesce and sort are to make sure that first and last, which are non-deterministic # become deterministic - lambda spark : binary_op_df(spark, data_gen)\ - .coalesce(1)\ - .sortWithinPartitions('b').selectExpr( + lambda spark : unary_op_df(spark, data_gen)\ + .coalesce(1).selectExpr( 'min(a)', 'max(a)', 'first(a)', diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 009c5da6c1a..46fc9c794c4 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -29,7 +29,9 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; /** @@ -69,6 +71,14 @@ public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector c } } + private static String hexString(byte[] bytes) { + StringBuilder str = new StringBuilder(); + for (byte b : bytes) { + str.append(String.format("%02x", b&0xff)); + } + return str.toString(); + } + /** * Print to standard error the contents of a column. Note that this should never be * called from production code, as it is very slow. Also note that this is not production @@ -88,6 +98,48 @@ public static synchronized void debug(String name, HostColumnVector hostCol) { System.err.println(i + " " + hostCol.getBigDecimal(i)); } } + } else if (DType.STRING.equals(type)) { + for (int i = 0; i < hostCol.getRowCount(); i++) { + if (hostCol.isNull(i)) { + System.err.println(i + " NULL"); + } else { + System.err.println(i + " \"" + hostCol.getJavaString(i) + "\" " + + hexString(hostCol.getUTF8(i))); + } + } + } else if (DType.INT32.equals(type)) { + for (int i = 0; i < hostCol.getRowCount(); i++) { + if (hostCol.isNull(i)) { + System.err.println(i + " NULL"); + } else { + System.err.println(i + " " + hostCol.getInt(i)); + } + } + } else if (DType.INT8.equals(type)) { + for (int i = 0; i < hostCol.getRowCount(); i++) { + if (hostCol.isNull(i)) { + System.err.println(i + " NULL"); + } else { + System.err.println(i + " " + hostCol.getByte(i)); + } + } + } else if (DType.BOOL8.equals(type)) { + for (int i = 0; i < hostCol.getRowCount(); i++) { + if (hostCol.isNull(i)) { + System.err.println(i + " NULL"); + } else { + System.err.println(i + " " + hostCol.getBoolean(i)); + } + } + } else if (DType.TIMESTAMP_MICROSECONDS.equals(type) || + DType.INT64.equals(type)) { + for (int i = 0; i < hostCol.getRowCount(); i++) { + if (hostCol.isNull(i)) { + System.err.println(i + " NULL"); + } else { + System.err.println(i + " " + hostCol.getLong(i)); + } + } } else { System.err.println("TYPE " + type + " NOT SUPPORTED FOR DEBUG PRINT"); } @@ -520,6 +572,17 @@ private static boolean typeConversionAllowed(ColumnView cv, DataType colType) { } } + static boolean typeConversionAllowed(Table table, DataType[] colTypes, int startCol, int endCol) { + final int numColumns = endCol - startCol; + assert numColumns == colTypes.length: "The number of columns and the number of types don't match"; + boolean ret = true; + for (int colIndex = startCol; colIndex < endCol; colIndex++) { + boolean t = typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex - startCol]); + ret = ret && t; + } + return ret; + } + /** * This should only ever be called from an assertion. This is to avoid the performance overhead * of doing the complicated check in production. Sadly this means that we don't get to give a @@ -528,9 +591,7 @@ private static boolean typeConversionAllowed(ColumnView cv, DataType colType) { */ static boolean typeConversionAllowed(Table table, DataType[] colTypes) { final int numColumns = table.getNumberOfColumns(); - if (numColumns != colTypes.length) { - return false; - } + assert numColumns == colTypes.length: "The number of columns and the number of types don't match"; boolean ret = true; for (int colIndex = 0; colIndex < numColumns; colIndex++) { ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]); @@ -545,22 +606,24 @@ static boolean typeConversionAllowed(Table table, DataType[] colTypes) { * both the table that is passed in and the batch returned to be sure that there are no leaks. * * @param table a table of vectors - * @param colTypes List of the column data types in the table passed in + * @param colTypes List of the column data types for the returned batch. It matches startColIndex + * to untilColIndex instead of everything in table. * @param startColIndex index of the first vector you want in the final ColumnarBatch * @param untilColIndex until index of the columns. (ie doesn't include that column num) * @return a ColumnarBatch of the vectors from the table */ public static ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) { assert table != null : "Table cannot be null"; - assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table + - " to " + Arrays.toString(colTypes); + assert typeConversionAllowed(table, colTypes, startColIndex, untilColIndex) : + "Type conversion is not allowed from " + table + " to " + Arrays.toString(colTypes) + + " columns " + startColIndex + " to " + untilColIndex; int numColumns = untilColIndex - startColIndex; ColumnVector[] columns = new ColumnVector[numColumns]; int finalLoc = 0; boolean success = false; try { for (int i = startColIndex; i < untilColIndex; i++) { - columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes[i]); + columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes[i - startColIndex]); finalLoc++; } long rows = table.getRowCount(); @@ -625,6 +688,44 @@ public static ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) { return vectors; } + /** + * Increment the reference count for all columns in the input batch. + */ + public static ColumnarBatch incRefCounts(ColumnarBatch batch) { + for (ai.rapids.cudf.ColumnVector cv: extractBases(batch)) { + cv.incRefCount(); + } + return batch; + } + + /** + * Take the columns from all of the batches passed in and put them in a single batch. The order + * of the columns is preserved. + *
+ * For example if we had
combineColumns({A, B}, {C, D})
The result would be a single + * batch with
{A, B, C, D}
+ */ + public static ColumnarBatch combineColumns(ColumnarBatch ... batches) { + boolean isFirst = true; + int numRows = 0; + ArrayList columns = new ArrayList<>(); + for (ColumnarBatch cb: batches) { + if (isFirst) { + numRows = cb.numRows(); + isFirst = false; + } else { + assert cb.numRows() == numRows : "Rows do not match expected " + numRows + " found " + + cb.numRows(); + } + int numColumns = cb.numCols(); + for (int i = 0; i < numColumns; i++) { + columns.add(cb.column(i)); + } + } + ColumnarBatch ret = new ColumnarBatch(columns.toArray(new ColumnVector[columns.size()]), numRows); + return incRefCounts(ret); + } + /** * Get the underlying Spark compatible columns from the batch. This does not increment any * reference counts so if you want to use these columns after the batch is closed @@ -706,27 +807,43 @@ public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) { WithTableBuffer wtb = (WithTableBuffer) batch.column(0); sum += wtb.getTableBuffer().getLength(); } else { + HashSet found = new HashSet<>(); for (int i = 0; i < batch.numCols(); i++) { - sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize(); + ai.rapids.cudf.ColumnVector cv = ((GpuColumnVector)batch.column(i)).getBase(); + long id = cv.getNativeView(); + if (found.add(id)) { + sum += cv.getDeviceMemorySize(); + } } } } return sum; } - public static long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) { + public static long getTotalDeviceMemoryUsed(GpuColumnVector[] vectors) { long sum = 0; - for (int i = 0; i < cv.length; i++){ - sum += cv[i].getBase().getDeviceMemorySize(); + HashSet found = new HashSet<>(); + for (int i = 0; i < vectors.length; i++) { + ai.rapids.cudf.ColumnVector cv = vectors[i].getBase(); + long id = cv.getNativeView(); + if (found.add(id)) { + sum += cv.getDeviceMemorySize(); + } } return sum; } - public static long getTotalDeviceMemoryUsed(Table tb) { + public static long getTotalDeviceMemoryUsed(Table table) { long sum = 0; - int len = tb.getNumberOfColumns(); + int len = table.getNumberOfColumns(); + // Deduplicate columns that are the same + HashSet found = new HashSet<>(); for (int i = 0; i < len; i++) { - sum += tb.getColumn(i).getDeviceMemorySize(); + ai.rapids.cudf.ColumnVector cv = table.getColumn(i); + long id = cv.getNativeView(); + if (found.add(id)) { + sum += cv.getDeviceMemorySize(); + } } return sum; } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala index d679dabe4ed..27112d7c04a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,15 @@ trait Arm { } } + /** Executes the provided code block and then closes the array buffer of resources */ + def withResource[T <: AutoCloseable, V](r: ArrayBuffer[T])(block: ArrayBuffer[T] => V): V = { + try { + block(r) + } finally { + r.safeClose() + } + } + /** Executes the provided code block and then closes the value if it is AutoCloseable */ def withResourceIfAllowed[T, V](r: T)(block: T => V): V = { try { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 9b333e2f649..a9da4f7d2ea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -16,7 +16,9 @@ package com.nvidia.spark.rapids -import org.apache.spark.SparkContext +import com.nvidia.spark.rapids.StorageTier.{DEVICE, DISK, GDS, HOST, StorageTier} + +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution.SparkPlan @@ -35,7 +37,7 @@ object MetricsLevel { } } -object GpuMetric { +object GpuMetric extends Logging { // Metric names. val BUFFER_TIME = "bufferTime" val GPU_DECODE_TIME = "gpuDecodeTime" @@ -57,6 +59,9 @@ object GpuMetric { val BUILD_DATA_SIZE = "buildDataSize" val BUILD_TIME = "buildTime" val STREAM_TIME = "streamTime" + val SPILL_AMOUNT = "spillData" + val SPILL_AMOUNT_DISK = "spillDisk" + val SPILL_AMOUNT_HOST = "spillHost" // Metric Descriptions. val DESCRIPTION_BUFFER_TIME = "buffer time" @@ -79,6 +84,9 @@ object GpuMetric { val DESCRIPTION_BUILD_DATA_SIZE = "build side size" val DESCRIPTION_BUILD_TIME = "build time" val DESCRIPTION_STREAM_TIME = "stream time" + val DESCRIPTION_SPILL_AMOUNT = "bytes spilled from GPU" + val DESCRIPTION_SPILL_AMOUNT_DISK = "bytes spilled to disk" + val DESCRIPTION_SPILL_AMOUNT_HOST = "bytes spilled to host" def unwrap(input: GpuMetric): SQLMetric = input match { case w :WrappedGpuMetric => w.sqlMetric @@ -103,6 +111,28 @@ object GpuMetric { object DEBUG_LEVEL extends MetricsLevel(0) object MODERATE_LEVEL extends MetricsLevel(1) object ESSENTIAL_LEVEL extends MetricsLevel(2) + + def makeSpillCallback(allMetrics: Map[String, GpuMetric]): RapidsBuffer.SpillCallback = { + val spillAmount = allMetrics(SPILL_AMOUNT) + val disk = allMetrics(SPILL_AMOUNT_DISK) + val host = allMetrics(SPILL_AMOUNT_HOST) + def updateMetrics(from: StorageTier, to: StorageTier, amount: Long): Unit = { + from match { + case DEVICE => + spillAmount += amount + case _ => // ignored + } + to match { + case HOST => + host += amount + case GDS | DISK => + disk += amount + case _ => + logWarning(s"Spill to $to is unsupported in metrics: $amount") + } + } + updateMetrics + } } sealed abstract class GpuMetric extends Serializable { @@ -199,6 +229,12 @@ trait GpuExec extends SparkPlan with Arm { lazy val additionalMetrics: Map[String, GpuMetric] = Map.empty + protected def spillMetrics: Map[String, GpuMetric] = Map( + SPILL_AMOUNT -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_SPILL_AMOUNT), + SPILL_AMOUNT_DISK -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_SPILL_AMOUNT_DISK), + SPILL_AMOUNT_HOST -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_SPILL_AMOUNT_HOST) + ) + /** * Returns true if there is something in the exec that cannot work when batches between * multiple file partitions are combined into a single batch (coalesce). diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index cbbd912748e..68b3a8db906 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.UTF8String object GpuExpressionsUtils extends Arm { - def evaluateBoundExpressions[A <: GpuExpression](cb: ColumnarBatch, - boundExprs: Seq[A]): Seq[GpuColumnVector] = { - withResource(GpuProjectExec.project(cb, boundExprs)) { cb => - (0 until cb.numCols()).map(cb.column(_).asInstanceOf[GpuColumnVector].incRefCount()) - .toArray.toSeq - } - } - def getTrimString(trimStr: Option[Expression]): String = trimStr match { case Some(GpuLiteral(data, StringType)) => if (data == null) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index 439993d7b21..0b457d83db1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector, NvtxColor, Table} -import com.nvidia.spark.rapids.GpuMetric.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME} +import com.nvidia.spark.rapids.GpuMetric.{ESSENTIAL_LEVEL, MODERATE_LEVEL, NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.TaskContext @@ -112,6 +112,9 @@ case class GpuGenerateExec( override def outputPartitioning: Partitioning = child.outputPartitioning + override protected val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL + override protected val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL + override def doExecute(): RDD[InternalRow] = throw new IllegalStateException(s"Row-based execution should not occur for $this") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioning.scala index 02c292a9788..5cad9ad01f1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioning.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -112,7 +112,7 @@ case class GpuRangePartitioning( try { //get Inputs table bound - inputCvs = SortUtils.getGpuColVectorsAndBindReferences(batch, gpuOrdering) + inputCvs = SortUtils.evaluateForSort(batch, gpuOrdering) inputTbl = new Table(inputCvs.map(_.getBase): _*) //sort incoming batch to compare with ranges sortedTbl = inputTbl.orderBy(orderByArgs: _*) @@ -120,9 +120,8 @@ case class GpuRangePartitioning( //get the table for upper bound calculation slicedSortedTbl = new Table(sortColumns: _*) //get the final column batch, remove the sort order sortColumns - val outputTypes = gpuOrdering.map(_.child.dataType) ++ - GpuColumnVector.extractTypes(batch) - finalSortedCb = GpuColumnVector.from(sortedTbl, outputTypes.toArray, + val outputTypes = GpuColumnVector.extractTypes(batch) + finalSortedCb = GpuColumnVector.from(sortedTbl, outputTypes, numSortCols, sortedTbl.getNumberOfColumns) val numRows = finalSortedCb.numRows partitionColumns = GpuColumnVector.extractColumns(finalSortedCb) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala index 4cffa3c3210..3bb9442547f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala @@ -16,40 +16,42 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf -import ai.rapids.cudf.{NvtxColor, NvtxRange, Table} +import java.util.{Comparator, LinkedList, PriorityQueue} + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, NvtxRange, Table} +import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.GpuMetric._ +import com.nvidia.spark.rapids.StorageTier.StorageTier import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, Partitioning, UnspecifiedDistribution} import org.apache.spark.sql.execution.{SortExec, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.vectorized.ColumnarBatch +sealed trait SortExecType extends Serializable + +object OutOfCoreSort extends SortExecType +object FullSortSingleBatch extends SortExecType +object SortEachBatch extends SortExecType + class GpuSortMeta( sort: SortExec, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], rule: DataFromReplacementRule) extends SparkPlanMeta[SortExec](sort, conf, parent, rule) { - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { GpuSortExec(childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]], sort.global, - childPlans(0).convertIfNeeded()) - - override def tagPlanForGpu(): Unit = { - if (GpuOverrides.isAnyStringLit(sort.sortOrder)) { - willNotWorkOnGpu("string literal values are not supported in a sort") - } - val sortOrderDataTypes = sort.sortOrder.map(_.dataType) - if (sortOrderDataTypes.exists(dtype => - dtype.isInstanceOf[ArrayType] || dtype.isInstanceOf[StructType] - || dtype.isInstanceOf[MapType])) { - willNotWorkOnGpu("Nested types in Sort Order are not supported") - } + childPlans.head.convertIfNeeded(), + if (conf.stableSort) FullSortSingleBatch else OutOfCoreSort) } } @@ -57,213 +59,459 @@ case class GpuSortExec( sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan, - coalesceGoal: CoalesceGoal = RequireSingleBatch, - testSpillFrequency: Int = 0) + sortType: SortExecType) extends UnaryExecNode with GpuExec { - private val sparkSortOrder = sortOrder - - override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(coalesceGoal) + override def childrenCoalesceGoal: Seq[CoalesceGoal] = sortType match { + case FullSortSingleBatch => Seq(RequireSingleBatch) + case OutOfCoreSort | SortEachBatch => Seq(null) + case t => throw new IllegalArgumentException(s"Unexpected Sort Type $t") + } override def output: Seq[Attribute] = child.output - override def outputOrdering: Seq[SortOrder] = sparkSortOrder + override def outputOrdering: Seq[SortOrder] = sortOrder // sort performed is local within a given partition so will retain // child operator's partitioning override def outputPartitioning: Partitioning = child.outputPartitioning override def requiredChildDistribution: Seq[Distribution] = - if (global) OrderedDistribution(sparkSortOrder) :: Nil else UnspecifiedDistribution :: Nil + if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - // Eventually this might change, but for now we will produce a single batch, which is the same - // as what we require from our input. - override def outputBatching: CoalesceGoal = RequireSingleBatch + override def outputBatching: CoalesceGoal = sortType match { + // We produce a single batch if we know that our input will be a single batch + case FullSortSingleBatch => RequireSingleBatch + case _ => null + } override def doExecute(): RDD[InternalRow] = throw new IllegalStateException(s"Row-based execution should not occur for $this") - override lazy val additionalMetrics = Map( - SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME), - PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)) + override lazy val additionalMetrics: Map[String, GpuMetric] = { + val required = Map( + SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME), + PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)) + if (sortType == OutOfCoreSort) { + required ++ spillMetrics + } else { + required + } + } + + private [this] lazy val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val sorter = new GpuSorter(sortOrder, output) + val sortTime = gpuLongMetric(SORT_TIME) val peakDevMemory = gpuLongMetric(PEAK_DEVICE_MEMORY) - - val crdd = child.executeColumnar() - crdd.mapPartitions { cbIter => - val sorter = createBatchGpuSorter() - val sortedIterator = sorter.sort(cbIter) - sortTime += sorter.getSortTimeNanos - peakDevMemory += sorter.getPeakMemoryUsage - sortedIterator + val totalTime = gpuLongMetric(TOTAL_TIME) + val outputBatch = gpuLongMetric(NUM_OUTPUT_BATCHES) + val outputRows = gpuLongMetric(NUM_OUTPUT_ROWS) + val outOfCore = sortType == OutOfCoreSort + child.executeColumnar().mapPartitions { cbIter => + if (outOfCore) { + val cpuOrd = new LazilyGeneratedOrdering(sorter.cpuOrdering) + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) + val iter = GpuOutOfCoreSortIterator(cbIter, sorter, cpuOrd, + // To avoid divide by zero errors, underflow and overflow issues in tests + // that want the targetSize to be 0, we set it to something more reasonable + math.max(16 * 1024, targetSize), totalTime, sortTime, outputBatch, outputRows, + peakDevMemory, spillCallback) + TaskContext.get().addTaskCompletionListener(_ -> iter.close()) + iter + } else { + GpuSortEachBatchIterator(cbIter, sorter, totalTime, sortTime, outputBatch, outputRows, + peakDevMemory) + } } } +} - private def createBatchGpuSorter(): GpuColumnarBatchSorter = { - val boundSortExprs = GpuBindReferences.bindReferences(sortOrder, output) - new GpuColumnarBatchSorter(boundSortExprs, this, coalesceGoal == RequireSingleBatch) +case class GpuSortEachBatchIterator( + iter: Iterator[ColumnarBatch], + sorter: GpuSorter, + totalTime: GpuMetric = NoopMetric, + sortTime: GpuMetric = NoopMetric, + outputBatches: GpuMetric = NoopMetric, + outputRows: GpuMetric = NoopMetric, + peakDevMemory: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Arm { + override def hasNext: Boolean = iter.hasNext + + override def next(): ColumnarBatch = { + withResource(iter.next()) { cb => + withResource(new NvtxWithMetrics("sort total", NvtxColor.WHITE, totalTime)) { _ => + val ret = sorter.fullySortBatch(cb, sortTime, peakDevMemory) + outputBatches += 1 + outputRows += ret.numRows() + ret + } + } } } -class GpuColumnarBatchSorter( - sortOrder: Seq[SortOrder], - exec: GpuExec, - singleBatchOnly: Boolean, - shouldUpdateMetrics: Boolean = true) extends Serializable { - - private var totalSortTimeNanos = 0L - private var maxDeviceMemory = 0L - private var haveSortedBatch = false - private val numSortCols = sortOrder.length - private val totalTimeMetric : Option[GpuMetric] = initMetric(TOTAL_TIME) - private val outputBatchesMetric : Option[GpuMetric] = initMetric(NUM_OUTPUT_BATCHES) - private val outputRowsMetric : Option[GpuMetric] = initMetric(NUM_OUTPUT_ROWS) - - private def initMetric(metricName: String): Option[GpuMetric] = if (shouldUpdateMetrics) { - Some(exec.gpuLongMetric(metricName)) - } else { - None +/** + * Holds data for the out of core sort. It includes the batch of data and the first row in that + * batch so we can sort the batches. + */ +case class OutOfCoreBatch(buffer: SpillableColumnarBatch, + firstRow: UnsafeRow) extends AutoCloseable { + override def close(): Unit = buffer.close() +} + +/** + * Data that the out of core sort algorithm has not finished sorting. This acts as a priority + * queue with each batch sorted by the first row in that batch. + */ +class Pending(cpuOrd: LazilyGeneratedOrdering) extends AutoCloseable { + private val pending = new PriorityQueue[OutOfCoreBatch](new Comparator[OutOfCoreBatch]() { + override def compare(a: OutOfCoreBatch, b: OutOfCoreBatch): Int = + cpuOrd.compare(a.firstRow, b.firstRow) + }) + private var pendingSize = 0L + def add(buffer: SpillableColumnarBatch, row: UnsafeRow): Unit = { + pending.add(OutOfCoreBatch(buffer, row)) + pendingSize += buffer.sizeInBytes } - def getSortTimeNanos: Long = totalSortTimeNanos + def storedSize: Long = pendingSize - def getPeakMemoryUsage: Long = maxDeviceMemory + def size(): Int = pending.size() - def sort(batchIter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { + def poll(): OutOfCoreBatch = { + val ret = pending.poll() + if (ret != null) { + pendingSize -= ret.buffer.sizeInBytes + } + ret + } - // Sort order shouldn't be empty for Sort exec, in any other case empty sort order - // translates to an ascending sort on all columns with nulls as smallest - new Iterator[ColumnarBatch] { - var resultBatch: Option[ColumnarBatch] = None + def peek(): OutOfCoreBatch = pending.peek() - TaskContext.get().addTaskCompletionListener[Unit](_ => closeBatch()) + def isEmpty: Boolean = pending.isEmpty - private def closeBatch(): Unit = resultBatch.foreach(_.close()) + override def close(): Unit = pending.forEach(_.close()) +} - private def loadNextBatch(): Option[ColumnarBatch] = { - if (batchIter.hasNext) { - if (singleBatchOnly && haveSortedBatch) { - throw new UnsupportedOperationException("Expected single batch to sort") - } - haveSortedBatch = true - val inputBatch = batchIter.next() - try { - if (inputBatch.numCols() > 0) { - Some(sortBatch(inputBatch)) - } else { - Some(new ColumnarBatch(Array.empty, inputBatch.numRows())) - } - } finally { - inputBatch.close() - } - } else { - None +/** + * Sorts incoming batches of data spilling if needed. + *
+ * The algorithm for this is a modified version of an external merge sort with multiple passes for + * large data. + * https://en.wikipedia.org/wiki/External_sorting#External_merge_sort + *
+ * The main difference is that we cannot stream the data when doing a merge sort. So, we instead + * divide the data into batches that are small enough that we can do a merge sort on N batches + * and still fit the output within the target batch size. When merging batches instead of + * individual rows we cannot assume that all of the resulting data is globally sorted. Hopefully, + * most of it is globally sorted but we have to use the first row from the next pending batch to + * determine the cutoff point between globally sorted data and data that still needs to be merged + * with other batches. The globally sorted portion is put into a sorted queue while the rest of + * the merged data is split and put back into a pending queue. The process repeats until we have + * enough data to output. + */ +case class GpuOutOfCoreSortIterator( + iter: Iterator[ColumnarBatch], + sorter: GpuSorter, + cpuOrd: LazilyGeneratedOrdering, + targetSize: Long, + totalTime: GpuMetric, + sortTime: GpuMetric, + outputBatches: GpuMetric, + outputRows: GpuMetric, + peakDevMemory: GpuMetric, + spillCallback: RapidsBuffer.SpillCallback) extends Iterator[ColumnarBatch] + with Arm with AutoCloseable { + + // There are so many places where we might hit a new peak that it gets kind of complex + // so we are picking a few places that are likely to increase the amount of memory used. + // and we update this temporary value. Only when we output a batch do we put this into the + // peakDevMemory metric + private var peakMemory = 0L + + // A priority queue of data that is not merged yet. + private val pending = new Pending(cpuOrd) + + // data that has been determined to be globally sorted and is waiting to be output. + private val sorted = new LinkedList[SpillableColumnarBatch]() + // how much data, in bytes, that is stored in `sorted` + private var sortedSize = 0L + + override def hasNext: Boolean = !sorted.isEmpty || !pending.isEmpty || iter.hasNext + + // Use types for the UnsafeProjection otherwise we need to have CPU BoundAttributeReferences + // used for converting between columnar data and rows (to get the first row in each batch). + private lazy val unsafeProjection = UnsafeProjection.create(sorter.projectedBatchTypes) + // Used for converting between rows and columns when we have to put a cuttoff on the GPU + // to know how much of the data after a merge sort is fully sorted. + private lazy val converters = new GpuRowToColumnConverter( + TrampolineUtil.fromAttributes(sorter.projectedBatchSchema)) + + /** + * Convert the boundaries (first rows for each batch) into unsafe rows for use later on. + */ + private def convertBoundaries(tab: Table): Array[UnsafeRow] = { + import scala.collection.JavaConverters._ + val cb = withResource(new NvtxRange("COPY BOUNDARIES", NvtxColor.PURPLE)) { _ => + new ColumnarBatch( + GpuColumnVector.extractColumns(tab, sorter.projectedBatchTypes).map(_.copyToHost()), + tab.getRowCount.toInt) + } + withResource(cb) { cb => + withResource(new NvtxRange("TO UNSAFE ROW", NvtxColor.RED)) { _ => + cb.rowIterator().asScala.map(unsafeProjection).map(_.copy().asInstanceOf[UnsafeRow]).toArray + } + } + } + + /** + * A rather complex function. It will take a sorted table (either the output of a regular sort or + * a merge sort), split it up, and place the split portions into the proper queues. If + * sortedOffset >= 0 then everything below that offset is considered to be fully sorted and is + * placed in the sorted queue. Everything else is spilt into smaller batches as determined by + * this function and placed in the pending priority queue to be merge sorted. + */ + private final def splitAfterSortAndSave(sortedTbl: Table, sortedOffset: Int = -1): Unit = { + var memUsed: Long = GpuColumnVector.getTotalDeviceMemoryUsed(sortedTbl) + // We need to figure out how to split up the data into reasonable batches. We could try and do + // something really complicated and figure out how much data get per batch, but in practice + // we really only expect to see one or two batches worth of data come in, so lets optimize + // for that case and set the targetBatchSize to always be 1/8th the targetSize. + val targetBatchSize = targetSize / 8 + val rows = sortedTbl.getRowCount.toInt + val memSize = GpuColumnVector.getTotalDeviceMemoryUsed(sortedTbl) + val averageRowSize = memSize.toDouble/rows + // Protect ourselves from large rows when there are small targetSizes + val targetRowCount = Math.max((targetBatchSize/averageRowSize).toInt, 1024) + + if (sortedOffset == rows - 1) { + // The entire thing is sorted + withResource(sortedTbl.contiguousSplit()) { splits => + assert(splits.length == 1) + memUsed += splits.head.getBuffer.getLength + closeOnExcept( + GpuColumnVectorFromBuffer.from(splits.head, sorter.projectedBatchTypes)) { cb => + val sp = SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback) + sortedSize += sp.sizeInBytes + sorted.add(sp) } } + } else { + val splitIndexes = if (sortedOffset >= 0) { + sortedOffset until rows by targetRowCount + } else { + targetRowCount until rows by targetRowCount + } + // Get back the first row so we can sort the batches + val gatherIndexes = if (sortedOffset >= 0) { + // The first batch is sorted so don't gather a row for it + splitIndexes + } else { + Seq(0) ++ splitIndexes + } - private def sortBatch(inputBatch: ColumnarBatch): ColumnarBatch = { - val nvtxRange = initNvtxRange - try { - var outputTypes: Seq[DataType] = Nil - var inputTbl: Table = null - var inputCvs: Seq[GpuColumnVector] = Nil - try { - if (sortOrder.nonEmpty) { - inputCvs = SortUtils.getGpuColVectorsAndBindReferences(inputBatch, sortOrder) - inputTbl = new cudf.Table(inputCvs.map(_.getBase): _*) - outputTypes = sortOrder.map(_.child.dataType) ++ - GpuColumnVector.extractTypes(inputBatch) - } else if (inputBatch.numCols() > 0) { - inputTbl = GpuColumnVector.from(inputBatch) - outputTypes = GpuColumnVector.extractTypes(inputBatch) - } - val orderByArgs = getOrderArgs(inputTbl) - val startTimestamp = System.nanoTime() - val batch = doGpuSort(inputTbl, orderByArgs, outputTypes) - updateMetricValues(inputTbl, startTimestamp, batch) - batch - } finally { - inputCvs.foreach(_.close()) - if (inputTbl != null) { - inputTbl.close() + val boundaries = + withResource(new NvtxRange("boundaries", NvtxColor.ORANGE)) { _ => + withResource(ColumnVector.fromInts(gatherIndexes: _*)) { gatherMap => + withResource(sortedTbl.gather(gatherMap)) { boundariesTab => + // Hopefully this is minor but just in case... + memUsed += gatherMap.getDeviceMemorySize + + GpuColumnVector.getTotalDeviceMemoryUsed(boundariesTab) + convertBoundaries(boundariesTab) } } - } finally { - nvtxRange.close() } - } - override def hasNext: Boolean = { - if (resultBatch.isDefined) { - true + withResource(sortedTbl.contiguousSplit(splitIndexes: _*)) { splits => + memUsed += splits.map(_.getBuffer.getLength).sum + val stillPending = if (sortedOffset >= 0) { + closeOnExcept( + GpuColumnVectorFromBuffer.from(splits.head, sorter.projectedBatchTypes)) { cb => + val sp = SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback) + sortedSize += sp.sizeInBytes + sorted.add(sp) + } + splits.slice(1, splits.length) } else { - resultBatch = loadNextBatch() - resultBatch.isDefined + splits } - } - override def next(): ColumnarBatch = { - if (!hasNext) { - throw new NoSuchElementException + assert(boundaries.length == stillPending.length) + stillPending.zip(boundaries).foreach { + case (ct: ContiguousTable, lower: UnsafeRow) => + if (ct.getRowCount > 0) { + closeOnExcept( + GpuColumnVectorFromBuffer.from(ct, sorter.projectedBatchTypes)) { cb => + pending.add(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY, + spillCallback), lower) + } + } else { + ct.close() + } } - val ret = resultBatch.get - resultBatch = None - ret } } + peakMemory = Math.max(peakMemory, memUsed) } - private def getOrderArgs(inputTbl: Table): Seq[Table.OrderByArg] = { - val orderByArgs = if (sortOrder.nonEmpty) { - sortOrder.zipWithIndex.map { case (order, index) => - if (order.isAscending) { - Table.asc(index, order.nullOrdering == NullsFirst) - } else { - Table.desc(index, order.nullOrdering == NullsLast) + /** + * First pass through the data. Read in all of the batches, sort each batch and split them up into + * smaller chunks for later merge sorting. + */ + private final def firstPassReadBatches(): Unit = { + while(iter.hasNext) { + var memUsed = 0L + val sortedTbl = withResource(iter.next()) { batch => + memUsed += GpuColumnVector.getTotalDeviceMemoryUsed(batch) + withResource(new NvtxWithMetrics("initial sort", NvtxColor.CYAN, totalTime)) { _ => + sorter.appendProjectedAndSort(batch, sortTime) } } - } else { - (0 until inputTbl.getNumberOfColumns).map { index => - Table.asc(index, true) + memUsed += GpuColumnVector.getTotalDeviceMemoryUsed(sortedTbl) + peakMemory = Math.max(peakMemory, memUsed) + withResource(new NvtxWithMetrics("split input batch", NvtxColor.CYAN, totalTime)) { _ => + withResource(sortedTbl) { sortedTbl => + val rows = sortedTbl.getRowCount.toInt + // filter out empty batches + if (rows > 0) { + splitAfterSortAndSave(sortedTbl) + } + } } } - orderByArgs } - private def updateMetricValues(inputTbl: Table, startTimestamp: Long, - batch: ColumnarBatch): Unit = { - if (shouldUpdateMetrics) { - totalSortTimeNanos += System.nanoTime - startTimestamp - outputBatchesMetric.get += 1 - outputRowsMetric.get += batch.numRows - val devMemUsed = GpuColumnVector.getTotalDeviceMemoryUsed(inputTbl) + - GpuColumnVector.getTotalDeviceMemoryUsed(batch) - maxDeviceMemory = scala.math.max(maxDeviceMemory, devMemUsed) + /** + * Merge sort enough data that we can output a batch and put it in the sorted queue. + * @return if we can determine that we can return a final batch without putting it in the sorted + * queue then optionally return it. + */ + private final def mergeSortEnoughToOutput(): Option[ColumnarBatch] = { + var memUsed = 0L + // Now get enough sorted data to return + while (!pending.isEmpty && sortedSize < targetSize) { + // Keep going until we have enough data to return + var bytesLeftToFetch = targetSize + val mergedBatch = withResource(ArrayBuffer[SpillableColumnarBatch]()) { pendingSort => + while (!pending.isEmpty && + (bytesLeftToFetch - pending.peek().buffer.sizeInBytes >= 0 || pendingSort.isEmpty)) { + val buffer = pending.poll().buffer + pendingSort += buffer + bytesLeftToFetch -= buffer.sizeInBytes + } + withResource(ArrayBuffer[ColumnarBatch]()) { batches => + pendingSort.foreach { tmp => + val batch = tmp.getColumnarBatch() + memUsed += GpuColumnVector.getTotalDeviceMemoryUsed(batch) + batches += batch + } + if (batches.size == 1) { + // Single batch no need for a merge sort + GpuColumnVector.incRefCounts(batches.head) + } else { + val ret = sorter.mergeSort(batches.toArray, sortTime) + memUsed += GpuColumnVector.getTotalDeviceMemoryUsed(ret) + ret + } + } + } + // We now have closed the input tables to merge sort so reset the memory used + // we will ignore the upper bound and the single column because it should not have much + // of an impact, but if we wanted to be exact we would look at those too. + peakMemory = Math.max(peakMemory, memUsed) + withResource(mergedBatch) { mergedBatch => + // First we want figure out what is fully sorted from what is not + val sortSplitOffset = if (pending.isEmpty) { + // No need to split it + mergedBatch.numRows() - 1 + } else { + // The data is only fully sorted if there is nothing pending that is smaller than it + // so get the next "smallest" row that is pending. + val cutoff = pending.peek().firstRow + val builders = new GpuColumnarBatchBuilder( + TrampolineUtil.fromAttributes(sorter.projectedBatchSchema), 1, null) + converters.convert(cutoff, builders) + withResource(builders.build(1)) { cutoffCb => + withResource(sorter.upperBound(mergedBatch, cutoffCb)) { result => + withResource(result.copyToHost()) { hostResult => + assert(hostResult.getRowCount == 1) + hostResult.getInt(0) + } + } + } + } + if (sortSplitOffset == mergedBatch.numRows() - 1 && sorted.isEmpty && + (GpuColumnVector.getTotalDeviceMemoryUsed(mergedBatch) >= targetSize || + pending.isEmpty)) { + // This is a special case where we have everything we need to output already so why + // bother with another contig split just to put it into the queue + return Some(GpuColumnVector.incRefCounts(mergedBatch)) + } + withResource(GpuColumnVector.from(mergedBatch)) { mergedTbl => + splitAfterSortAndSave(mergedTbl, sortSplitOffset) + } + } } + None } - private def initNvtxRange = { - if (shouldUpdateMetrics) { - new NvtxWithMetrics("sort batch", NvtxColor.WHITE, totalTimeMetric.get) - } else { - new NvtxRange("sort batch", NvtxColor.WHITE) + /** + * Take data from the sorted queue and return a final batch that can be returned + * @return a batch that can be returned. + */ + private final def concatOutput(): ColumnarBatch = { + // combine all the sorted data into a single batch + withResource(ArrayBuffer[Table]()) { tables => + var totalBytes = 0L + while(!sorted.isEmpty && (tables.isEmpty || + (totalBytes + sorted.peek().sizeInBytes) < targetSize)) { + withResource(sorted.pop()) { tmp => + sortedSize -= tmp.sizeInBytes + totalBytes += tmp.sizeInBytes + withResource(tmp.getColumnarBatch()) { batch => + tables += GpuColumnVector.from(batch) + } + } + } + var memUsed = totalBytes + val ret = if (tables.length == 1) { + // We cannot concat a single table + sorter.removeProjectedColumns(tables.head) + } else { + withResource(Table.concatenate(tables: _*)) { combined => + // ignore the output of removing the columns because it is just dropping columns + // so it will be smaller than this with not added memory + memUsed += GpuColumnVector.getTotalDeviceMemoryUsed(combined) + sorter.removeProjectedColumns(combined) + } + } + peakMemory = Math.max(peakMemory, memUsed) + ret } } - private def doGpuSort( - tbl: Table, - orderByArgs: Seq[Table.OrderByArg], - types: Seq[DataType]): ColumnarBatch = { - var resultTbl: cudf.Table = null - try { - resultTbl = tbl.orderBy(orderByArgs: _*) - GpuColumnVector.from(resultTbl, types.toArray, numSortCols, resultTbl.getNumberOfColumns) - } finally { - if (resultTbl != null) { - resultTbl.close() - } + override def next(): ColumnarBatch = { + if (sorter.projectedBatchSchema.isEmpty) { + // special case, no columns just rows + iter.next() } + if (pending.isEmpty && sorted.isEmpty) { + firstPassReadBatches() + } + withResource(new NvtxWithMetrics("Sort next output batch", NvtxColor.CYAN, totalTime)) { _ => + val ret = mergeSortEnoughToOutput().getOrElse(concatOutput()) + outputBatches += 1 + outputRows += ret.numRows() + peakDevMemory.set(Math.max(peakMemory, peakDevMemory.value)) + ret + } + } + + override def close(): Unit = { + sorted.forEach(_.close()) + pending.close() } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 70cf9f39ecc..d00ddb23e87 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -337,10 +337,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { plan match { case _: GpuHashJoin => val sortOrder = getOptimizedSortOrder(plan) - GpuSortExec(sortOrder, false, plan, TargetSize(conf.gpuTargetBatchSizeBytes)) + GpuSortExec(sortOrder, false, plan, SortEachBatch) case _: GpuHashAggregateExec => val sortOrder = getOptimizedSortOrder(plan) - GpuSortExec(sortOrder, false, plan, TargetSize(conf.gpuTargetBatchSizeBytes)) + GpuSortExec(sortOrder, false, plan, SortEachBatch) case p => if (p.outputOrdering.isEmpty) { plan.withNewChildren(plan.children.map(insertHashOptimizeSorts)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 7b418d2a804..d52d25d5434 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -58,6 +58,26 @@ object StorageTier extends Enumeration { val GDS: StorageTier = Value(3, "GPUDirect Storage") } +object RapidsBuffer { + /** + * Callback type for when a batch is spilled from one storage tier to another. This is + * intended to only be used for metrics gathering in parts of the GPU plan that can spill. + * No GPU memory should ever be allocated from this callback, blocking in this function + * is strongly discouraged. It should be as light weight as possible. It takes three arguments + *
    + *
  • from the storage tier the data is being spilled from.
  • + *
  • to the storage tier the data is being spilled to.
  • + *
  • amount the amount of data in bytes that is spilled.
  • + *
+ */ + type SpillCallback = (StorageTier, StorageTier, Long) => Unit + + /** + * A default NOOP callback for when a buffer is spilled + */ + def defaultSpillCallback(to: StorageTier, from: StorageTier, amount: Long): Unit = () +} + /** Interface provided by all types of RAPIDS buffers */ trait RapidsBuffer extends AutoCloseable { /** The buffer identifier for this buffer. */ @@ -72,6 +92,8 @@ trait RapidsBuffer extends AutoCloseable { /** The storage tier for this buffer */ val storageTier: StorageTier + val spillCallback: RapidsBuffer.SpillCallback + /** * Get the columnar batch within this buffer. The caller must have * successfully acquired the buffer beforehand. @@ -169,4 +191,6 @@ sealed class DegenerateRapidsBuffer( override def setSpillPriority(priority: Long): Unit = {} override def close(): Unit = {} + + override val spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index deb832eca4f..ad9eaee836e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -187,26 +187,32 @@ object RapidsBufferCatalog extends Logging with Arm { * @param contigBuffer device memory buffer backing the table * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addTable( id: RapidsBufferId, table: Table, contigBuffer: DeviceMemoryBuffer, tableMeta: TableMeta, - initialSpillPriority: Long): Unit = - deviceStorage.addTable(id, table, contigBuffer, tableMeta, initialSpillPriority) + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = + deviceStorage.addTable(id, table, contigBuffer, tableMeta, initialSpillPriority, spillCallback) /** * Adds a contiguous table to the device storage, taking ownership of the table. * @param id buffer ID to associate with this buffer - * @param contigTable contiguos table to track in device storage + * @param contigTable contiguous table to track in device storage * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addContiguousTable( id: RapidsBufferId, contigTable: ContiguousTable, - initialSpillPriority: Long): Unit = - deviceStorage.addContiguousTable(id, contigTable, initialSpillPriority) + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = + deviceStorage.addContiguousTable(id, contigTable, initialSpillPriority, spillCallback) /** * Adds a buffer to the device storage, taking ownership of the buffer. @@ -214,13 +220,16 @@ object RapidsBufferCatalog extends Logging with Arm { * @param buffer buffer that will be owned by the store * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addBuffer( id: RapidsBufferId, buffer: DeviceMemoryBuffer, tableMeta: TableMeta, - initialSpillPriority: Long): Unit = - deviceStorage.addBuffer(id, buffer, tableMeta, initialSpillPriority) + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = + deviceStorage.addBuffer(id, buffer, tableMeta, initialSpillPriority, spillCallback) /** * Lookup the buffer that corresponds to the specified buffer ID and acquire it. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 127fb80ed0b..1e618d611de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} +import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.internal.Logging @@ -38,10 +39,12 @@ object RapidsBufferStore { * @param catalog catalog to register this store */ abstract class RapidsBufferStore( - val name: String, + val tier: StorageTier, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends AutoCloseable with Logging { + val name: String = tier.toString + private class BufferTracker { private[this] val comparator: Comparator[RapidsBufferBase] = (o1: RapidsBufferBase, o2: RapidsBufferBase) => @@ -230,6 +233,7 @@ abstract class RapidsBufferStore( val newBuffer = try { logDebug(s"Spilling $buffer ${buffer.id} to ${spillStore.name} " + s"total mem=${buffers.getTotalBytes}") + buffer.spillCallback(buffer.storageTier, spillStore.tier, buffer.size) spillStore.copyBuffer(buffer, stream) } finally { buffer.close() @@ -246,7 +250,8 @@ abstract class RapidsBufferStore( override val id: RapidsBufferId, override val size: Long, override val meta: TableMeta, - initialSpillPriority: Long) extends RapidsBuffer with Arm { + initialSpillPriority: Long, + override val spillCallback: RapidsBuffer.SpillCallback) extends RapidsBuffer with Arm { private[this] var isValid = true protected[this] var refcount = 0 private[this] var spillPriority: Long = initialSpillPriority diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 8c5743ae40c..c9c52cf0b0c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -431,6 +431,17 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled") + .doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " + + "sort, but sort stability cannot be guaranteed in distributed work loads because the " + + "order in which upstream data arrives to a task is not guaranteed. Sort stability then " + + "only matters when reading and sorting data from a file using a single task/partition. " + + "Because of limitations in the plugin when you enable stable sorting all of the data " + + "for a single task will be combined into a single batch before sorting. This currently " + + "disables spilling from GPU memory if the data size is too large.") + .booleanConf + .createWithDefault(false) + // METRICS val METRICS_LEVEL = conf("spark.rapids.sql.metrics.level") @@ -1039,6 +1050,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val exportColumnarRdd: Boolean = get(EXPORT_COLUMNAR_RDD) + lazy val stableSort: Boolean = get(STABLE_SORT) + lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS) lazy val incompatDateFormats: Boolean = get(INCOMPATIBLE_DATE_FORMATS) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 677813c34cd..57f2c15ac6c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * @param catalog catalog to register this store */ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) - extends RapidsBufferStore("GPU", catalog) { + extends RapidsBufferStore(StorageTier.DEVICE, catalog) { override protected def createBuffer( other: RapidsBuffer, stream: Cuda.Stream): RapidsBufferBase = { @@ -42,20 +42,24 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog * @param contigBuffer device memory buffer backing the table * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addTable( id: RapidsBufferId, table: Table, contigBuffer: DeviceMemoryBuffer, tableMeta: TableMeta, - initialSpillPriority: Long): Unit = { + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = { val buffer = new RapidsDeviceMemoryBuffer( id, contigBuffer.getLength, tableMeta, Some(table), contigBuffer, - initialSpillPriority) + initialSpillPriority, + spillCallback) try { logDebug(s"Adding table for: [id=$id, size=${buffer.size}, " + s"meta_id=${buffer.meta.bufferMeta.id}, meta_size=${buffer.meta.bufferMeta.size}]") @@ -75,11 +79,14 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog * @param id buffer ID to associate with this buffer * @param contigTable contiguous table to track in storage * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addContiguousTable( id: RapidsBufferId, contigTable: ContiguousTable, - initialSpillPriority: Long): Unit = { + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = { val contigBuffer = contigTable.getBuffer val size = contigBuffer.getLength val meta = MetaUtils.buildTableMeta(id.tableId, contigTable) @@ -90,7 +97,8 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog meta, None, contigBuffer, - initialSpillPriority) + initialSpillPriority, + spillCallback) try { logDebug(s"Adding table for: [id=$id, size=${buffer.size}, " + s"uncompressed=${buffer.meta.bufferMeta.uncompressedSize}, " + @@ -109,19 +117,23 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog * @param buffer buffer that will be owned by the store * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ def addBuffer( id: RapidsBufferId, buffer: DeviceMemoryBuffer, tableMeta: TableMeta, - initialSpillPriority: Long): Unit = { + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback): Unit = { val buff = new RapidsDeviceMemoryBuffer( id, buffer.getLength, tableMeta, None, buffer, - initialSpillPriority) + initialSpillPriority, + spillCallback) try { logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + s"uncompressed=${buff.meta.bufferMeta.uncompressedSize}, " + @@ -141,7 +153,9 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog meta: TableMeta, table: Option[Table], contigBuffer: DeviceMemoryBuffer, - spillPriority: Long) extends RapidsBufferBase(id, size, meta, spillPriority) { + spillPriority: Long, + override val spillCallback: RapidsBuffer.SpillCallback) + extends RapidsBufferBase(id, size, meta, spillPriority, spillCallback) { override val storageTier: StorageTier = StorageTier.DEVICE override protected def releaseResources(): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 5c613554c6f..399eeee1639 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import org.apache.spark.sql.rapids.RapidsDiskBlockManager class RapidsDiskStore( diskBlockManager: RapidsDiskBlockManager, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) - extends RapidsBufferStore("disk", catalog) { + extends RapidsBufferStore(StorageTier.DISK, catalog) { private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File] override def createBuffer( @@ -58,7 +58,7 @@ class RapidsDiskStore( } logDebug(s"Spilled to $path $fileOffset:${incoming.size}") new this.RapidsDiskBuffer(id, fileOffset, incoming.size, incoming.meta, - incoming.getSpillPriority) + incoming.getSpillPriority, incoming.spillCallback) } finally { incomingBuffer.close() } @@ -91,7 +91,9 @@ class RapidsDiskStore( fileOffset: Long, size: Long, meta: TableMeta, - spillPriority: Long) extends RapidsBufferBase(id, size, meta, spillPriority) { + spillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback) + extends RapidsBufferBase(id, size, meta, spillPriority, spillCallback) { private[this] var hostBuffer: Option[HostMemoryBuffer] = None override val storageTier: StorageTier = StorageTier.DISK diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 9238970753a..2dfeed5511a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch class RapidsGdsStore( diskBlockManager: RapidsDiskBlockManager, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) - extends RapidsBufferStore("gds", catalog) with Arm { + extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm { private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File] override def createBuffer( @@ -59,7 +59,8 @@ class RapidsGdsStore( 0 } logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS") - new RapidsGdsBuffer(id, fileOffset, other.size, other.meta, other.getSpillPriority) + new RapidsGdsBuffer(id, fileOffset, other.size, other.meta, other.getSpillPriority, + other.spillCallback) } } @@ -68,7 +69,9 @@ class RapidsGdsStore( fileOffset: Long, size: Long, meta: TableMeta, - spillPriority: Long) extends RapidsBufferBase(id, size, meta, spillPriority) { + spillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback) + extends RapidsBufferBase(id, size, meta, spillPriority, spillCallback) { override val storageTier: StorageTier = StorageTier.GDS // TODO(rongou): cache this buffer to avoid repeated reads from disk. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index b23729083e0..5022e8f2772 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil class RapidsHostMemoryStore( maxSize: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) - extends RapidsBufferStore("host", catalog) { + extends RapidsBufferStore(StorageTier.HOST, catalog) { private[this] val pool = HostMemoryBuffer.allocate(maxSize, false) private[this] val addressAllocator = new AddressSpaceAllocator(maxSize) private[this] var haveLoggedMaxExceeded = false @@ -101,7 +101,8 @@ class RapidsHostMemoryStore( other.meta, other.getSpillPriority, hostBuffer, - isPinned) + isPinned, + other.spillCallback) } def numBytesFree: Long = maxSize - currentSize @@ -117,7 +118,9 @@ class RapidsHostMemoryStore( meta: TableMeta, spillPriority: Long, buffer: HostMemoryBuffer, - isInternalPoolAllocated: Boolean) extends RapidsBufferBase(id, size, meta, spillPriority) { + isInternalPoolAllocated: Boolean, + spillCallback: RapidsBuffer.SpillCallback) + extends RapidsBufferBase(id, size, meta, spillPriority, spillCallback) { override val storageTier: StorageTier = StorageTier.HOST override def getMemoryBuffer: MemoryBuffer = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala index 84b1f946633..cb6645db2fc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,20 +16,33 @@ package com.nvidia.spark.rapids +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.nvidia.spark.rapids.GpuExpressionsUtils.evaluateBoundExpressions +import ai.rapids.cudf.{ColumnVector, NvtxColor, Table} -import org.apache.spark.sql.catalyst.expressions.{NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch -object SortUtils { +object SortUtils extends Arm { + private [this] def evaluateBoundExpressions[A <: GpuExpression](cb: ColumnarBatch, + boundExprs: Seq[A]): Seq[GpuColumnVector] = { + withResource(GpuProjectExec.project(cb, boundExprs)) { cb => + (0 until cb.numCols()).map(cb.column(_).asInstanceOf[GpuColumnVector].incRefCount()) + // because this processing has a side effect (inc ref count) we want to force + // the data to execute now, instead of lazily. To do this we first convert it + // to an array and then back to a sequence again. Seq does not have a force method + .toArray.toSeq + } + } + /* * This function takes the input batch and the bound sort order references and * evaluates each column in case its an expression. It then appends the original columns * after the sort key columns. The sort key columns will be dropped after sorting. */ - def getGpuColVectorsAndBindReferences(batch: ColumnarBatch, + def evaluateForSort(batch: ColumnarBatch, boundInputReferences: Seq[SortOrder]): Seq[GpuColumnVector] = { val sortCvs = new ArrayBuffer[GpuColumnVector](boundInputReferences.length) val childExprs = boundInputReferences.map(_.child.asInstanceOf[GpuExpression]) @@ -46,4 +59,265 @@ object SortUtils { (order.isAscending && order.nullOrdering == NullsFirst) || (!order.isAscending && order.nullOrdering == NullsLast) } + + @scala.annotation.tailrec + def extractReference(exp: Expression): Option[GpuBoundReference] = exp match { + case r: GpuBoundReference => Some(r) + case a: Alias => extractReference(a.child) + case _ => None + } + + def getOrder(order: SortOrder, index: Int): Table.OrderByArg = + if (order.isAscending) { + Table.asc(index, order.nullOrdering == NullsFirst) + } else { + Table.desc(index, order.nullOrdering == NullsLast) + } +} + +/** + * A class that provides convenience methods for sorting batches of data. A Spark SortOrder + * typically will just reference a single column using an AttributeReference. This is the simplest + * situation so we just need to bind the attribute references to where they go, but it is possible + * that some computation can be done in the SortOrder. This would be a situation like sorting + * strings by their length instead of in lexicographical order. Because cudf does not support this + * directly we instead go through the SortOrder instances that are a part of this sorter and find + * the ones that require computation. We then do the sort in a few stages first we compute any + * needed columns from the SortOrder instances that require some computation, and add them to the + * original batch. The method `appendProjectedColumns` does this. This then provides a number of + * methods that can be used to operate on a batch that has these new columns added to it. These + * include sorting, merge sorting, and finding bounds. These can be combined in various ways to + * do different algorithms. When you are done with these different operations you can drop the + * temporary columns that were added, just for computation, using `removeProjectedColumns`. + * Some times you may want to pull data back to the CPU and sort rows there too. We provide + * `cpuOrders` that lets you do this on rows that have had the extra ordering columns added to them. + * This also provides `fullySortBatch` as an optimization. If all you want to do is sort a batch + * you don't want to have to sort the temp columns too, and this provide that. + * @param sortOrder The unbound sorting order requested (Should be converted to the GPU) + * @param inputSchema The schema of the input data + */ +class GpuSorter( + sortOrder: Seq[SortOrder], + inputSchema: Array[Attribute]) extends Arm with Serializable { + + /** + * A class that provides convenience methods for sorting batches of data + * @param sortOrder The unbound sorting order requested (Should be converted to the GPU) + * @param inputSchema The schema of the input data + */ + def this(sortOrder: Seq[SortOrder], inputSchema: Seq[Attribute]) = + this(sortOrder, inputSchema.toArray) + + private[this] val boundSortOrder = GpuBindReferences.bindReferences(sortOrder, inputSchema.toSeq) + + private[this] val numInputColumns = inputSchema.length + + /** + * A sort order that the CPU can use to sort data that is the output of `appendProjectedColumns`. + * You cannot use the regular sort order directly because it has been translated to the GPU when + * computation is needed. + */ + def cpuOrdering: Seq[SortOrder] = cpuOrderingInternal.toSeq + + private[this] lazy val (sortOrdersThatNeedComputation, cudfOrdering, cpuOrderingInternal) = { + val sortOrdersThatNeedsComputation = mutable.ArrayBuffer[SortOrder]() + val cpuOrdering = mutable.ArrayBuffer[SortOrder]() + val cudfOrdering = mutable.ArrayBuffer[Table.OrderByArg]() + var newColumnIndex = numInputColumns + // Remove duplicates in the ordering itself because + // there is no need to do it twice. + boundSortOrder.distinct.foreach { so => + SortUtils.extractReference(so.child) match { + case Some(ref) => + cudfOrdering += SortUtils.getOrder(so, ref.ordinal) + // It is a bound GPU reference so we have to translate it to the CPU + cpuOrdering += ShimLoader.getSparkShims.sortOrder( + BoundReference(ref.ordinal, ref.dataType, ref.nullable), + so.direction, so.nullOrdering) + case None => + val index = newColumnIndex + newColumnIndex += 1 + cudfOrdering += SortUtils.getOrder(so, index) + sortOrdersThatNeedsComputation += so + // We already did the computation so instead of trying to translate + // the computation back to the CPU too, just use the existing columns. + cpuOrdering += ShimLoader.getSparkShims.sortOrder( + BoundReference(index, so.dataType, so.nullable), + so.direction, so.nullOrdering) + } + } + (sortOrdersThatNeedsComputation.toArray, cudfOrdering.toArray, cpuOrdering.toArray) + } + + /** + * A schema for columns that are computed as a part of sorting. The name of this field should be + * ignored as it is a temporary implementation detail. The order and types of them are what + * matter. + */ + private[this] lazy val computationSchema: Seq[Attribute] = + sortOrdersThatNeedComputation.map { so => + AttributeReference("SORT_TMP", so.dataType, so.nullable)() + } + + /** + * Some SortOrder instances require adding temporary columns which is done as a part of + * the `appendProjectedColumns` method. This is the schema for the result of that method. + */ + lazy val projectedBatchSchema: Seq[Attribute] = inputSchema ++ computationSchema + /** + * The types and order for the columns returned by `appendProjectedColumns` + */ + lazy val projectedBatchTypes: Array[DataType] = projectedBatchSchema.map(_.dataType).toArray + /** + * The original input types without any temporary columns added to them needed for sorting. + */ + lazy val originalTypes: Array[DataType] = inputSchema.map(_.dataType) + + /** + * Append any columns to the batch that need to be materialized for sorting to work. + * @param inputBatch the batch to add columns to + * @return the batch with columns added + */ + final def appendProjectedColumns(inputBatch: ColumnarBatch): ColumnarBatch = { + if (sortOrdersThatNeedComputation.isEmpty) { + GpuColumnVector.incRefCounts(inputBatch) + } else { + withResource(GpuProjectExec.project(inputBatch, + sortOrdersThatNeedComputation.map(_.child))) { extra => + GpuColumnVector.combineColumns(inputBatch, extra) + } + } + } + + /** + * Find the upper bounds on data that is the output of `appendProjectedColumns`. Be careful + * because a batch with no columns/only rows will cause errors and should be special cased. + * @param findIn the data to look in for upper bounds + * @param find the data to look for and get the upper bound for + * @return the rows where the insertions would happen. + */ + def upperBound(findIn: ColumnarBatch, find: ColumnarBatch): ColumnVector = { + withResource(GpuColumnVector.from(findIn)) { findInTbl => + withResource(GpuColumnVector.from(find)) { findTbl => + findInTbl.upperBound(findTbl, cudfOrdering: _*) + } + } + } + + /** + * Sort a batch of data that is the output of `appendProjectedColumns`. Be careful because + * a batch with no columns/only rows will cause errors and should be special cased. + * @param inputBatch the batch to sort + * @param sortTime metric for the sort time + * @return a sorted table. + */ + final def sort(inputBatch: ColumnarBatch, sortTime: GpuMetric): Table = { + withResource(new NvtxWithMetrics("sort", NvtxColor.DARK_GREEN, sortTime)) { _ => + withResource(GpuColumnVector.from(inputBatch)) { toSortTbl => + toSortTbl.orderBy(cudfOrdering: _*) + } + } + } + + /** + * Merge multiple batches together. All of these batches should be the output of + * `appendProjectedColumns` and the output of this will also be in that same format. + * @param batches the batches to sort + * @param sortTime metric for the time spent doing the merge sort + * @return the sorted data. + */ + final def mergeSort(batches: Array[ColumnarBatch], sortTime: GpuMetric): ColumnarBatch = { + withResource(new NvtxWithMetrics("merge sort", NvtxColor.DARK_GREEN, sortTime)) { _ => + if (batches.length == 1) { + GpuColumnVector.incRefCounts(batches.head) + } else { + val merged = withResource(ArrayBuffer[Table]()) { tabs => + batches.foreach { cb => + tabs += GpuColumnVector.from(cb) + } + Table.merge(tabs.toArray, cudfOrdering: _*) + } + withResource(merged) { merged => + GpuColumnVector.from(merged, projectedBatchTypes) + } + } + } + } + + /** + * Get the sort order for a batch of data that is the output of `appendProjectedColumns`. + * Be careful because a batch with no columns/only rows will cause errors and should be special + * cased. + * @param inputBatch the batch to sort + * @param sortTime metric for the sort time (really the sort order time here) + * @return a gather map column + */ + final def computeSortOrder(inputBatch: ColumnarBatch, sortTime: GpuMetric): ColumnVector = { + withResource(new NvtxWithMetrics("sort_order", NvtxColor.DARK_GREEN, sortTime)) { _ => + withResource(GpuColumnVector.from(inputBatch)) { toSortTbl => + toSortTbl.sortOrder(cudfOrdering: _*) + } + } + } + + /** + * Convert a sorted table into a ColumnarBatch and drop any columns added by + * appendProjectedColumns + * @param input the table to convert + * @return the ColumnarBatch + */ + final def removeProjectedColumns(input: Table): ColumnarBatch = + GpuColumnVector.from(input, originalTypes, 0, numInputColumns) + + /** + * Append any columns needed for sorting the batch and sort it. Be careful because + * a batch with no columns/only rows will cause errors and should be special cased. + * @param inputBatch the batch to sort + * @param sortTime metric for the sort time + * @return a sorted table. + */ + final def appendProjectedAndSort(inputBatch: ColumnarBatch, sortTime: GpuMetric): Table = { + withResource(appendProjectedColumns(inputBatch)) { toSort => + sort(toSort, sortTime) + } + } + + /** + * Sort a batch start to finish. Add any projected columns that are needed to sort, sort the + * data, and drop the added columns. + * @param inputBatch the batch to sort + * @param sortTime metric for the amount of time taken to sort. + * @param peakDevMemory metric for the peak memory usage + * @return the sorted batch + */ + final def fullySortBatch( + inputBatch: ColumnarBatch, + sortTime: GpuMetric, + peakDevMemory: GpuMetric): ColumnarBatch = { + if (inputBatch.numCols() == 0) { + // Special case + return new ColumnarBatch(Array.empty, inputBatch.numRows()) + } + + var peakMem = 0L + val sortedTbl = withResource(appendProjectedColumns(inputBatch)) { toSort => + // inputBatch is completely contained in toSort, so don't need to add it too + peakMem += GpuColumnVector.getTotalDeviceMemoryUsed(toSort) + // We are going to skip gathering the computed columns + // In cases where we don't need the computed columns again this can save some time + withResource(computeSortOrder(toSort, sortTime)) { gatherMap => + withResource(GpuColumnVector.from(inputBatch)) { inputTable => + withResource(new NvtxWithMetrics("gather", NvtxColor.DARK_GREEN, sortTime)) { _ => + inputTable.gather(gatherMap) + } + } + } + } + withResource(sortedTbl) { sortedTbl => + peakMem += GpuColumnVector.getTotalDeviceMemoryUsed(sortedTbl) + peakDevMemory.set(Math.max(peakDevMemory.value, peakMem)) + // We don't need to remove any projected columns, because they were never gathered + GpuColumnVector.from(sortedTbl, originalTypes) + } + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 62789eddd83..f02e9212f89 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids +import com.nvidia.spark.rapids.StorageTier.StorageTier + import org.apache.spark.sql.rapids.TempSpillBufferId import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -125,8 +127,15 @@ object SpillableColumnarBatch extends Arm { /** * Create a new SpillableColumnarBatch. * @note This takes over ownership of batch, and batch should not be used after this. + * @param batch the batch to make spillable + * @param priority the initial spill priority of this batch + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. */ - def apply(batch: ColumnarBatch, priority: Long): SpillableColumnarBatch = { + def apply(batch: ColumnarBatch, + priority: Long, + spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback) + : SpillableColumnarBatch = { val numRows = batch.numRows() if (batch.numCols() <= 0) { // We consumed it @@ -135,7 +144,7 @@ object SpillableColumnarBatch extends Arm { } else { val types = GpuColumnVector.extractTypes(batch) val id = TempSpillBufferId() - addBatch(id, batch, priority) + addBatch(id, batch, priority, spillCallback) new SpillableColumnarBatchImpl(id, numRows, types) } } @@ -143,17 +152,20 @@ object SpillableColumnarBatch extends Arm { private[this] def addBatch( id: RapidsBufferId, batch: ColumnarBatch, - initialSpillPriority: Long): Unit = { + initialSpillPriority: Long, + spillCallback: RapidsBuffer.SpillCallback): Unit = { withResource(batch) { batch => val numColumns = batch.numCols() if (GpuCompressedColumnVector.isBatchCompressed(batch)) { val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector] val buff = cv.getTableBuffer buff.incRefCount() - RapidsBufferCatalog.addBuffer(id, buff, cv.getTableMeta, initialSpillPriority) + RapidsBufferCatalog.addBuffer(id, buff, cv.getTableMeta, initialSpillPriority, + spillCallback) } else if (GpuPackedTableColumn.isBatchPacked(batch)) { val cv = batch.column(0).asInstanceOf[GpuPackedTableColumn] - RapidsBufferCatalog.addContiguousTable(id, cv.getContiguousTable, initialSpillPriority) + RapidsBufferCatalog.addContiguousTable(id, cv.getContiguousTable, initialSpillPriority, + spillCallback) } else if (numColumns > 0 && (0 until numColumns) .forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) { @@ -161,12 +173,14 @@ object SpillableColumnarBatch extends Arm { val table = GpuColumnVector.from(batch) val buff = cv.getBuffer buff.incRefCount() - RapidsBufferCatalog.addTable(id, table, buff, cv.getTableMeta, initialSpillPriority) + RapidsBufferCatalog.addTable(id, table, buff, cv.getTableMeta, initialSpillPriority, + spillCallback) } else { withResource(GpuColumnVector.from(batch)) { tmpTable => withResource(tmpTable.contiguousSplit()) { contigTables => require(contigTables.length == 1, "Unexpected number of contiguous spit tables") - RapidsBufferCatalog.addContiguousTable(id, contigTables.head, initialSpillPriority) + RapidsBufferCatalog.addContiguousTable(id, contigTables.head, initialSpillPriority, + spillCallback) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index e2df614c47f..b3b0fda3500 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -446,7 +446,9 @@ case class GpuHashAggregateExec( // batch sizes would go over a threshold, we'd spill the aggregatedCb, // and perform aggregation on the new batch (which would need to be merged, with the // spilled aggregates) - concatCvs = concatenateBatches(aggregatedInputCb, aggregatedCb, concatTime) + // Please note that in order for first/last to work properly we have to maintain + // the order of the input batches. + concatCvs = concatenateBatches(aggregatedCb, aggregatedInputCb, concatTime) aggregatedCb.close() aggregatedCb = null aggregatedInputCb.close() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index fcf55be2316..60bfdf42298 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -199,6 +199,9 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range override val output: Seq[Attribute] = range.output + override protected val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL + override protected val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL + override def outputOrdering: Seq[SortOrder] = range.outputOrdering override def outputPartitioning: Partitioning = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala index 894d8e1735c..f9a26b4b143 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala @@ -19,17 +19,15 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{NvtxColor, Table} -import ai.rapids.cudf import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning, SinglePartition} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} /** @@ -59,7 +57,7 @@ trait GpuBaseLimitExec extends LimitExec with GpuExec { val crdd = child.executeColumnar() crdd.mapPartitions { cbIter => new Iterator[ColumnarBatch] { - var remainingLimit = limit + private var remainingLimit = limit override def hasNext: Boolean = remainingLimit > 0 && cbIter.hasNext @@ -138,69 +136,11 @@ class GpuCollectLimitMeta( override def convertToGpu(): GpuExec = GpuGlobalLimitExec(collectLimit.limit, ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty), - GpuLocalLimitExec(collectLimit.limit, childPlans(0).convertIfNeeded()))) + GpuLocalLimitExec(collectLimit.limit, childPlans.head.convertIfNeeded()))) } object GpuTopN extends Arm { - private[this] def getOrderArgs( - sortOrder: Seq[SortOrder], - inputTbl: Table): Seq[Table.OrderByArg] = { - val orderByArgs = if (sortOrder.nonEmpty) { - sortOrder.zipWithIndex.map { case (order, index) => - if (order.isAscending) { - Table.asc(index, order.nullOrdering == NullsFirst) - } else { - Table.desc(index, order.nullOrdering == NullsLast) - } - } - } else { - (0 until inputTbl.getNumberOfColumns).map { index => - Table.asc(index, true) - } - } - orderByArgs - } - - private def doGpuSort( - inputTbl: Table, - sortOrder: Seq[SortOrder], - types: Seq[DataType]): ColumnarBatch = { - val orderByArgs = getOrderArgs(sortOrder, inputTbl) - val numSortCols = sortOrder.length - withResource(inputTbl.orderBy(orderByArgs: _*)) { resultTbl => - GpuColumnVector.from(resultTbl, types.toArray, numSortCols, resultTbl.getNumberOfColumns) - } - } - - private[this] def sortBatch( - sortOrder: Seq[SortOrder], - inputBatch: ColumnarBatch, - sortTime: GpuMetric): ColumnarBatch = { - withResource(new NvtxWithMetrics("sort", NvtxColor.DARK_GREEN, sortTime)) { _ => - var outputTypes: Seq[DataType] = Nil - var inputTbl: Table = null - var inputCvs: Seq[GpuColumnVector] = Nil - try { - if (sortOrder.nonEmpty) { - inputCvs = SortUtils.getGpuColVectorsAndBindReferences(inputBatch, sortOrder) - inputTbl = new cudf.Table(inputCvs.map(_.getBase): _*) - outputTypes = sortOrder.map(_.child.dataType) ++ - GpuColumnVector.extractTypes(inputBatch) - } else if (inputBatch.numCols() > 0) { - inputTbl = GpuColumnVector.from(inputBatch) - outputTypes = GpuColumnVector.extractTypes(inputBatch) - } - doGpuSort(inputTbl, sortOrder, outputTypes) - } finally { - inputCvs.safeClose() - if (inputTbl != null) { - inputTbl.close() - } - } - } - } - private[this] def concatAndClose(a: SpillableColumnarBatch, b: ColumnarBatch, concatTime: GpuMetric): ColumnarBatch = { @@ -237,16 +177,16 @@ object GpuTopN extends Arm { } def apply(limit: Int, - sortOrder: Seq[SortOrder], + sorter: GpuSorter, batch: ColumnarBatch, sortTime: GpuMetric): ColumnarBatch = { - withResource(sortBatch(sortOrder, batch, sortTime)) { sorted => + withResource(sorter.fullySortBatch(batch, sortTime, NoopMetric)) { sorted => takeN(sorted, limit) } } def apply(limit: Int, - sortOrder: Seq[SortOrder], + sorter: GpuSorter, iter: Iterator[ColumnarBatch], totalTime: GpuMetric, sortTime: GpuMetric, @@ -271,22 +211,22 @@ object GpuTopN extends Arm { val runningResult = if (pending.isEmpty) { withResource(input) { input => - apply(limit, sortOrder, input, sortTime) + apply(limit, sorter, input, sortTime) } } else if (totalSize > Integer.MAX_VALUE) { // The intermediate size is likely big enough we don't want to risk an overflow, // so sort/slice before we concat and sort/slice again. val tmp = withResource(input) { input => - apply(limit, sortOrder, input, sortTime) + apply(limit, sorter, input, sortTime) } withResource(concatAndClose(pending.get, tmp, concatTime)) { concat => - apply(limit, sortOrder, concat, sortTime) + apply(limit, sorter, concat, sortTime) } } else { // The intermediate size looks like we could never overflow the indexes so // do it the more efficient way and concat first followed by the sort/slice withResource(concatAndClose(pending.get, input, concatTime)) { concat => - apply(limit, sortOrder, concat, sortTime) + apply(limit, sorter, concat, sortTime) } } pending = @@ -330,7 +270,7 @@ case class GpuTopN( ) override def doExecuteColumnar(): RDD[ColumnarBatch] = { - val boundSortExprs = GpuBindReferences.bindReferences(sortOrder, child.output) + val sorter = new GpuSorter(sortOrder, child.output) val boundProjectExprs = GpuBindReferences.bindGpuReferences(projectList, child.output) val totalTime = gpuLongMetric(TOTAL_TIME) val inputBatches = gpuLongMetric(NUM_INPUT_BATCHES) @@ -340,7 +280,7 @@ case class GpuTopN( val sortTime = gpuLongMetric(SORT_TIME) val concatTime = gpuLongMetric(CONCAT_TIME) child.executeColumnar().mapPartitions { iter => - val topN = GpuTopN(limit, boundSortExprs, iter, totalTime, sortTime, concatTime, + val topN = GpuTopN(limit, sorter, iter, totalTime, sortTime, concatTime, inputBatches, inputRows, outputBatches, outputRows) if (projectList != child.output) { topN.map { batch => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index dede47a3797..111326df953 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -200,10 +200,16 @@ object GpuFileFormatWriter extends Logging { val orderingExpr = GpuBindReferences.bindReferences( requiredOrdering .map(attr => sparkShims.sortOrder(attr, Ascending)), outputSpec.outputColumns) + val sortType = if (RapidsConf.STABLE_SORT.get(plan.conf)) { + FullSortSingleBatch + } else { + OutOfCoreSort + } GpuSortExec( orderingExpr, global = false, - child = empty2NullPlan).executeColumnar() + child = empty2NullPlan, + sortType = sortType).executeColumnar() } // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala index e9fedbde46d..d43c433c577 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala @@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} @@ -176,11 +176,14 @@ object GpuShuffleExchangeExec { * task when indeterminate tasks re-run. */ val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) { - val sorter = new GpuColumnarBatchSorter(Seq.empty[SortOrder], - null, false, false) + val shim = ShimLoader.getSparkShims + val boundReferences = outputAttributes.zipWithIndex.map { case (attr, index) => + shim.sortOrder(GpuBoundReference(index, attr.dataType, attr.nullable), Ascending) + // Force the sequence to materialize so we don't have issues with serializing too much + }.toArray.toSeq + val sorter = new GpuSorter(boundReferences, outputAttributes) rdd.mapPartitions { cbIter => - val sortedIterator = sorter.sort(cbIter) - sortedIterator + GpuSortEachBatchIterator(cbIter, sorter) } } else { rdd diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index db978716c64..23034d4181f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -23,6 +23,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.InputMetrics import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, IdentityBroadcastMode} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode @@ -40,6 +41,8 @@ object TrampolineUtil { def structTypeMerge(left: DataType, right: DataType): DataType = StructType.merge(left, right) + def fromAttributes(attrs: Seq[Attribute]): StructType = StructType.fromAttributes(attrs) + def jsonValue(dataType: DataType): JsonAST.JValue = dataType.jsonValue /** Get a human-readable string, e.g.: "4.0 MiB", for a value in bytes. */ diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index f9a1fc20254..338d36f6e06 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -68,14 +68,14 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "1") .set(RapidsConf.GPU_BATCH_SIZE_BYTES.key, "1") + .set(RapidsConf.STABLE_SORT.key, "true") .set("spark.sql.shuffle.partitions", "1") withGpuSparkSession(spark => { val df = longsCsvDf(spark) - // currently, GpuSortExec requires a single batch but this is likely to change in the - // future, making this test invalid + // GpuSortExec requires a single batch if out of core sore is disabled. val df2 = df .sort(df.col("longs")) @@ -283,6 +283,8 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { // a query stage that runs on the CPU, wrapped in a CPU Exchange, with a ColumnarToRow // transition inserted .set("spark.sql.adaptive.enabled", "false") + // Disable out of core sort so a single batch is required + .set(RapidsConf.STABLE_SORT.key, "true") val dir = Files.createTempDirectory("spark-rapids-test").toFile val path = new File(dir, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala index dc92c07e825..aff94f9707d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala @@ -58,10 +58,9 @@ class HashSortOptimizeSuite extends FunSuite { val gse = sortNode.get.asInstanceOf[GpuSortExec] assert(gse.children.length == 1) assert(gse.global == false) - assert(gse.coalesceGoal.isInstanceOf[TargetSize]) + assert(gse.sortType == SortEachBatch) val sortChild = gse.children.head - assert(sortChild.isInstanceOf[GpuCoalesceBatches]) - assertResult(joinNode) { sortChild.children.head } + assertResult(joinNode) { sortChild } } test("sort inserted after broadcast hash join") { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 6ba766436e9..5159166f25c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -201,7 +201,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { } class MockSpillStore(catalog: RapidsBufferCatalog) - extends RapidsBufferStore("mock spill store", catalog) { + extends RapidsBufferStore(StorageTier.HOST, catalog) { val spilledBuffers = new ArrayBuffer[RapidsBufferId] override protected def createBuffer(b: RapidsBuffer, s: Cuda.Stream): RapidsBufferBase = { @@ -210,7 +210,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { } class MockRapidsBuffer(id: RapidsBufferId, size: Long, meta: TableMeta, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) { + extends RapidsBufferBase(id, size, meta, spillPriority, RapidsBuffer.defaultSpillCallback) { override protected def releaseResources(): Unit = {} override val storageTier: StorageTier = StorageTier.HOST diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala index e8b2c8282f9..13e27c9355e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala @@ -238,7 +238,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { verify(client, times(1)).track(any[DeviceMemoryBuffer](), tmCaptor.capture()) verifyTableMeta(tableMeta, tmCaptor.getValue.asInstanceOf[TableMeta]) verify(mockStorage, times(1)) - .addBuffer(any(), dmbCaptor.capture(), any(), any()) + .addBuffer(any(), dmbCaptor.capture(), any(), any(), any()) val receivedBuff = dmbCaptor.getValue.asInstanceOf[DeviceMemoryBuffer] assertResult(tableMeta.bufferMeta().size())(receivedBuff.getLength) @@ -297,7 +297,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { verify(client, times(1)).track(any[DeviceMemoryBuffer](), tmCaptor.capture()) verifyTableMeta(tableMeta, tmCaptor.getValue.asInstanceOf[TableMeta]) verify(mockStorage, times(1)) - .addBuffer(any(), dmbCaptor.capture(), any(), any()) + .addBuffer(any(), dmbCaptor.capture(), any(), any(), any()) verify(mockCatalog, times(1)).removeBuffer(any()) val receivedBuff = dmbCaptor.getValue.asInstanceOf[DeviceMemoryBuffer] @@ -359,7 +359,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { } verify(mockStorage, times(5)) - .addBuffer(any(), dmbCaptor.capture(), any(), any()) + .addBuffer(any(), dmbCaptor.capture(), any(), any(), any()) assertResult(totalExpectedSize)( dmbCaptor.getAllValues().toArray().map(_.asInstanceOf[DeviceMemoryBuffer].getLength).sum) @@ -421,7 +421,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { } verify(mockStorage, times(20)) - .addBuffer(any(), dmbCaptor.capture(), any(), any()) + .addBuffer(any(), dmbCaptor.capture(), any(), any(), any()) assertResult(totalExpectedSize)( dmbCaptor.getAllValues().toArray().map(_.asInstanceOf[DeviceMemoryBuffer].getLength).sum) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala index e39da9276da..9871ad2acb5 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala @@ -138,7 +138,7 @@ class RapidsShuffleTestHelper extends FunSuite testMetricsUpdater = spy(new TestShuffleMetricsUpdater) val dmbCaptor = ArgumentCaptor.forClass(classOf[DeviceMemoryBuffer]) - when(mockStorage.addBuffer(any(), dmbCaptor.capture(), any(), any())).thenAnswer(_ => { + when(mockStorage.addBuffer(any(), dmbCaptor.capture(), any(), any(), any())).thenAnswer(_ => { buffersToClose.append(dmbCaptor.getValue.asInstanceOf[DeviceMemoryBuffer]) }) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index 1fa92e48e8b..ff31aef0860 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -52,5 +52,6 @@ class SpillableColumnarBatchSuite extends FunSuite with Arm { override def setSpillPriority(priority: Long): Unit = {} override def close(): Unit = {} override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = null + override val spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback } }