Add in out of core sort (NVIDIA#1719)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Feb 22, 2021
1 parent 1bbf094 commit 4687e2c
Showing 33 changed files with 1,071 additions and 334 deletions.
13 changes: 13 additions & 0 deletions docs/compatibility.md
@@ -27,6 +27,19 @@ Spark's guarantee. It may not be 100% identical if the ordering is ambiguous.
In versions of Spark prior to 3.1.0 `-0.0` is always < `0.0` but in 3.1.0 and above this is
not true for sorting. For all versions of the plugin `-0.0` == `0.0` for sorting.

Spark's sorting is typically a [stable](https://en.wikipedia.org/wiki/Sorting_algorithm#Stability)
sort. Sort stability cannot be guaranteed in distributed workloads because the order in which
upstream data arrives at a task is not guaranteed. Sort stability is only guaranteed in one
situation: reading and sorting data from a file using a single task/partition. The RAPIDS
Accelerator does an unstable
[out of core](https://en.wikipedia.org/wiki/External_memory_algorithm) sort by default. This
simply means that the sort algorithm allows for spilling parts of the data if it is larger than
can fit in the GPU's memory, but it does not guarantee the ordering of rows when the ordering of
the keys is ambiguous. If you rely on a stable sort in your processing you can request it by
setting [spark.rapids.sql.stableSort.enabled](configs.md#sql.stableSort.enabled) to `true`, and
RAPIDS will try to sort all the data for a given task/partition at once on the GPU. This may
change in the future to allow for a spillable stable sort.
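
For example, a minimal sketch of requesting a stable sort for an existing `SparkSession` named
`spark` (the same setting can also be passed at launch time with
`--conf spark.rapids.sql.stableSort.enabled=true`):

```scala
// Request a stable sort from the RAPIDS Accelerator for this session. With this setting the
// plugin sorts all of a task's data in a single GPU batch, so spilling is currently disabled.
spark.conf.set("spark.rapids.sql.stableSort.enabled", "true")
```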

## Floating Point

For most basic floating-point operations like addition, subtraction, multiplication, and division
1 change: 1 addition & 0 deletions docs/configs.md
@@ -83,6 +83,7 @@ Name | Description | Default Value
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
<a name="sql.shuffle.spillThreads"></a>spark.rapids.sql.shuffle.spillThreads|Number of threads used to spill shuffle data to disk in the background.|6
<a name="sql.stableSort.enabled"></a>spark.rapids.sql.stableSort.enabled|Enable or disable stable sorting. Apache Spark's sorting is typically a stable sort, but sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability then only matters when reading and sorting data from a file using a single task/partition. Because of limitations in the plugin when you enable stable sorting all of the data for a single task will be combined into a single batch before sorting. This currently disables spilling from GPU memory if the data size is too large.|false
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false
<a name="sql.variableFloatAgg.enabled"></a>spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|false

7 changes: 3 additions & 4 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -384,9 +384,8 @@ def test_generic_reductions(data_gen):
assert_gpu_and_cpu_are_equal_collect(
# Coalesce and sort are to make sure that first and last, which are non-deterministic
# become deterministic
lambda spark : binary_op_df(spark, data_gen)\
.coalesce(1)\
.sortWithinPartitions('b').selectExpr(
lambda spark : unary_op_df(spark, data_gen)\
.coalesce(1).selectExpr(
'min(a)',
'max(a)',
'first(a)',
145 changes: 131 additions & 14 deletions sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
@@ -29,7 +29,9 @@
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/**
@@ -69,6 +71,14 @@ public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector c
}
}

private static String hexString(byte[] bytes) {
StringBuilder str = new StringBuilder();
for (byte b : bytes) {
str.append(String.format("%02x", b&0xff));
}
return str.toString();
}

/**
* Print to standard error the contents of a column. Note that this should never be
* called from production code, as it is very slow. Also note that this is not production
@@ -88,6 +98,48 @@ public static synchronized void debug(String name, HostColumnVector hostCol) {
System.err.println(i + " " + hostCol.getBigDecimal(i));
}
}
} else if (DType.STRING.equals(type)) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
} else {
System.err.println(i + " \"" + hostCol.getJavaString(i) + "\" " +
hexString(hostCol.getUTF8(i)));
}
}
} else if (DType.INT32.equals(type)) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
} else {
System.err.println(i + " " + hostCol.getInt(i));
}
}
} else if (DType.INT8.equals(type)) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
} else {
System.err.println(i + " " + hostCol.getByte(i));
}
}
} else if (DType.BOOL8.equals(type)) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
} else {
System.err.println(i + " " + hostCol.getBoolean(i));
}
}
} else if (DType.TIMESTAMP_MICROSECONDS.equals(type) ||
DType.INT64.equals(type)) {
for (int i = 0; i < hostCol.getRowCount(); i++) {
if (hostCol.isNull(i)) {
System.err.println(i + " NULL");
} else {
System.err.println(i + " " + hostCol.getLong(i));
}
}
} else {
System.err.println("TYPE " + type + " NOT SUPPORTED FOR DEBUG PRINT");
}
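
For illustration, a minimal sketch of calling these debug helpers, assuming a device column
`deviceCol` is in scope and the caller mixes in the `Arm` trait for `withResource`:

```scala
// Print the device column directly (slow; development use only).
GpuColumnVector.debug("sort-keys", deviceCol)

// Or copy it to the host first and print the host column, closing the copy when done.
withResource(deviceCol.copyToHost()) { hostCol =>
  GpuColumnVector.debug("sort-keys-host", hostCol)
}
```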
@@ -520,6 +572,17 @@ private static boolean typeConversionAllowed(ColumnView cv, DataType colType) {
}
}

static boolean typeConversionAllowed(Table table, DataType[] colTypes, int startCol, int endCol) {
final int numColumns = endCol - startCol;
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
boolean ret = true;
for (int colIndex = startCol; colIndex < endCol; colIndex++) {
boolean t = typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex - startCol]);
ret = ret && t;
}
return ret;
}

/**
* This should only ever be called from an assertion. This is to avoid the performance overhead
* of doing the complicated check in production. Sadly this means that we don't get to give a
@@ -528,9 +591,7 @@ private static boolean typeConversionAllowed(ColumnView cv, DataType colType) {
*/
static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
final int numColumns = table.getNumberOfColumns();
if (numColumns != colTypes.length) {
return false;
}
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
boolean ret = true;
for (int colIndex = 0; colIndex < numColumns; colIndex++) {
ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]);
@@ -545,22 +606,24 @@ static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
* both the table that is passed in and the batch returned to be sure that there are no leaks.
*
* @param table a table of vectors
* @param colTypes List of the column data types in the table passed in
* @param colTypes List of the column data types for the returned batch. It matches startColIndex
* to untilColIndex instead of everything in table.
* @param startColIndex index of the first vector you want in the final ColumnarBatch
* @param untilColIndex until index of the columns. (ie doesn't include that column num)
* @return a ColumnarBatch of the vectors from the table
*/
public static ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
assert table != null : "Table cannot be null";
assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table +
" to " + Arrays.toString(colTypes);
assert typeConversionAllowed(table, colTypes, startColIndex, untilColIndex) :
"Type conversion is not allowed from " + table + " to " + Arrays.toString(colTypes) +
" columns " + startColIndex + " to " + untilColIndex;
int numColumns = untilColIndex - startColIndex;
ColumnVector[] columns = new ColumnVector[numColumns];
int finalLoc = 0;
boolean success = false;
try {
for (int i = startColIndex; i < untilColIndex; i++) {
columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes[i]);
columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes[i - startColIndex]);
finalLoc++;
}
long rows = table.getRowCount();
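
As a usage sketch with assumed column types, a caller could slice a four-column table into
separate key and value batches; note that the `DataType`s describe only the sliced columns, not
the whole table:

```scala
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType}

// `table` is an ai.rapids.cudf.Table with four columns, assumed to be in scope.
val keyTypes: Array[DataType] = Array(IntegerType, StringType)
val valueTypes: Array[DataType] = Array(LongType, DoubleType)

// Columns [0, 2) become the key batch and columns [2, 4) become the value batch.
val keys = GpuColumnVector.from(table, keyTypes, 0, 2)
val values = GpuColumnVector.from(table, valueTypes, 2, 4)
// Each batch holds the table's columns with incremented ref counts, so the table and
// both batches still need to be closed by their owners.
```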
@@ -625,6 +688,44 @@ public static ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
return vectors;
}

/**
* Increment the reference count for all columns in the input batch.
*/
public static ColumnarBatch incRefCounts(ColumnarBatch batch) {
for (ai.rapids.cudf.ColumnVector cv: extractBases(batch)) {
cv.incRefCount();
}
return batch;
}

/**
* Take the columns from all of the batches passed in and put them in a single batch. The order
* of the columns is preserved.
* <br/>
* For example if we had <pre>combineColumns({A, B}, {C, D})</pre> The result would be a single
* batch with <pre>{A, B, C, D}</pre>
*/
public static ColumnarBatch combineColumns(ColumnarBatch ... batches) {
boolean isFirst = true;
int numRows = 0;
ArrayList<ColumnVector> columns = new ArrayList<>();
for (ColumnarBatch cb: batches) {
if (isFirst) {
numRows = cb.numRows();
isFirst = false;
} else {
assert cb.numRows() == numRows : "Rows do not match expected " + numRows + " found " +
cb.numRows();
}
int numColumns = cb.numCols();
for (int i = 0; i < numColumns; i++) {
columns.add(cb.column(i));
}
}
ColumnarBatch ret = new ColumnarBatch(columns.toArray(new ColumnVector[columns.size()]), numRows);
return incRefCounts(ret);
}
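
A usage sketch with assumed inputs: combining a batch of key columns with a batch of value
columns that have the same row count.

```scala
// `keysBatch` and `valuesBatch` are hypothetical ColumnarBatches with matching row counts.
val combined = GpuColumnVector.combineColumns(keysBatch, valuesBatch)
try {
  process(combined)  // hypothetical downstream work
} finally {
  // combineColumns incremented the reference counts of the underlying columns, so the combined
  // batch is closed here while the input batches are still closed by their owners.
  combined.close()
}
```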

/**
* Get the underlying Spark compatible columns from the batch. This does not increment any
* reference counts so if you want to use these columns after the batch is closed
@@ -706,27 +807,43 @@ public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
WithTableBuffer wtb = (WithTableBuffer) batch.column(0);
sum += wtb.getTableBuffer().getLength();
} else {
HashSet<Long> found = new HashSet<>();
for (int i = 0; i < batch.numCols(); i++) {
sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize();
ai.rapids.cudf.ColumnVector cv = ((GpuColumnVector)batch.column(i)).getBase();
long id = cv.getNativeView();
if (found.add(id)) {
sum += cv.getDeviceMemorySize();
}
}
}
}
return sum;
}

public static long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
public static long getTotalDeviceMemoryUsed(GpuColumnVector[] vectors) {
long sum = 0;
for (int i = 0; i < cv.length; i++){
sum += cv[i].getBase().getDeviceMemorySize();
HashSet<Long> found = new HashSet<>();
for (int i = 0; i < vectors.length; i++) {
ai.rapids.cudf.ColumnVector cv = vectors[i].getBase();
long id = cv.getNativeView();
if (found.add(id)) {
sum += cv.getDeviceMemorySize();
}
}
return sum;
}

public static long getTotalDeviceMemoryUsed(Table tb) {
public static long getTotalDeviceMemoryUsed(Table table) {
long sum = 0;
int len = tb.getNumberOfColumns();
int len = table.getNumberOfColumns();
// Deduplicate columns that are the same
HashSet<Long> found = new HashSet<>();
for (int i = 0; i < len; i++) {
sum += tb.getColumn(i).getDeviceMemorySize();
ai.rapids.cudf.ColumnVector cv = table.getColumn(i);
long id = cv.getNativeView();
if (found.add(id)) {
sum += cv.getDeviceMemorySize();
}
}
return sum;
}
11 changes: 10 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -49,6 +49,15 @@ trait Arm {
}
}

/** Executes the provided code block and then closes the array buffer of resources */
def withResource[T <: AutoCloseable, V](r: ArrayBuffer[T])(block: ArrayBuffer[T] => V): V = {
try {
block(r)
} finally {
r.safeClose()
}
}
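
A minimal usage sketch, assuming the caller mixes in `Arm` and that `moreInput`, `nextBatch()`,
and `mergeAll()` are hypothetical helpers: every batch added to the buffer is closed when the
block exits, even if an exception is thrown part way through.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.vectorized.ColumnarBatch

withResource(new ArrayBuffer[ColumnarBatch]()) { pending =>
  while (moreInput) {        // hypothetical input condition
    pending += nextBatch()   // hypothetical producer of GPU batches
  }
  mergeAll(pending)          // hypothetical consumer; runs before the batches are closed
}
```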

/** Executes the provided code block and then closes the value if it is AutoCloseable */
def withResourceIfAllowed[T, V](r: T)(block: T => V): V = {
try {
40 changes: 38 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -16,7 +16,9 @@

package com.nvidia.spark.rapids

import org.apache.spark.SparkContext
import com.nvidia.spark.rapids.StorageTier.{DEVICE, DISK, GDS, HOST, StorageTier}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.SparkPlan
@@ -35,7 +37,7 @@ object MetricsLevel {
}
}

object GpuMetric {
object GpuMetric extends Logging {
// Metric names.
val BUFFER_TIME = "bufferTime"
val GPU_DECODE_TIME = "gpuDecodeTime"
@@ -57,6 +59,9 @@ object GpuMetric {
val BUILD_DATA_SIZE = "buildDataSize"
val BUILD_TIME = "buildTime"
val STREAM_TIME = "streamTime"
val SPILL_AMOUNT = "spillData"
val SPILL_AMOUNT_DISK = "spillDisk"
val SPILL_AMOUNT_HOST = "spillHost"

// Metric Descriptions.
val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -79,6 +84,9 @@ object GpuMetric {
val DESCRIPTION_BUILD_DATA_SIZE = "build side size"
val DESCRIPTION_BUILD_TIME = "build time"
val DESCRIPTION_STREAM_TIME = "stream time"
val DESCRIPTION_SPILL_AMOUNT = "bytes spilled from GPU"
val DESCRIPTION_SPILL_AMOUNT_DISK = "bytes spilled to disk"
val DESCRIPTION_SPILL_AMOUNT_HOST = "bytes spilled to host"

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
@@ -103,6 +111,28 @@ object GpuMetric {
object DEBUG_LEVEL extends MetricsLevel(0)
object MODERATE_LEVEL extends MetricsLevel(1)
object ESSENTIAL_LEVEL extends MetricsLevel(2)

def makeSpillCallback(allMetrics: Map[String, GpuMetric]): RapidsBuffer.SpillCallback = {
val spillAmount = allMetrics(SPILL_AMOUNT)
val disk = allMetrics(SPILL_AMOUNT_DISK)
val host = allMetrics(SPILL_AMOUNT_HOST)
def updateMetrics(from: StorageTier, to: StorageTier, amount: Long): Unit = {
from match {
case DEVICE =>
spillAmount += amount
case _ => // ignored
}
to match {
case HOST =>
host += amount
case GDS | DISK =>
disk += amount
case _ =>
logWarning(s"Spill to $to is unsupported in metrics: $amount")
}
}
updateMetrics
}
}

sealed abstract class GpuMetric extends Serializable {
@@ -199,6 +229,12 @@ trait GpuExec extends SparkPlan with Arm {

lazy val additionalMetrics: Map[String, GpuMetric] = Map.empty

protected def spillMetrics: Map[String, GpuMetric] = Map(
SPILL_AMOUNT -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_SPILL_AMOUNT),
SPILL_AMOUNT_DISK -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_SPILL_AMOUNT_DISK),
SPILL_AMOUNT_HOST -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_SPILL_AMOUNT_HOST)
)
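
A sketch of how an exec might wire these pieces together; `spillMetrics` and `makeSpillCallback`
come from this change, while `allMetrics` is assumed to be the exec's combined metrics map:

```scala
// Inside a GpuExec implementation: expose the spill metrics for this node.
override lazy val additionalMetrics: Map[String, GpuMetric] = spillMetrics

// Later, at execution time, build the callback and hand it to spillable buffers so that
// any spill from GPU memory is reported through these metrics.
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
```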

/**
* Returns true if there is something in the exec that cannot work when batches between
* multiple file partitions are combined into a single batch (coalesce).