Skip to content

Commit

Permalink
Improved GpuArrowEvalPythonExec (NVIDIA#783)
Browse files Browse the repository at this point in the history
* Improved GpuArrowEvalPythonExec

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Sep 17, 2020
1 parent 3c0a2cf commit 985589a
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 102 deletions.
2 changes: 1 addition & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and Python process. It also supports running the Python UDFs code on the GPU when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
Expand Down
1 change: 0 additions & 1 deletion integration_tests/src/main/python/udf_cudf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@


_conf = {
'spark.rapids.sql.exec.ArrowEvalPythonExec':'true',
'spark.rapids.sql.exec.MapInPandasExec':'true',
'spark.rapids.sql.exec.FlatMapGroupsInPandasExec': 'true',
'spark.rapids.sql.exec.AggregateInPandasExec': 'true',
Expand Down
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
except Exception as e:
pytestmark = pytest.mark.skip(reason=str(e))

arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.ArrowEvalPythonExec': 'true'}
arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true'}

####################################################################
# NOTE: pytest does not play well with pyspark udfs, because pyspark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
return vectors;
}

/**
 * Get the Spark compatible columns backing a cudf {@code Table}.
 *
 * The table is first wrapped in a temporary {@code ColumnarBatch} through {@code from(table)},
 * the columns are pulled out of that batch with {@code extractColumns(ColumnarBatch)}, and the
 * temporary batch is closed before this method returns.
 *
 * NOTE(review): because the temporary batch is closed here, the returned vectors presumably
 * stay valid only while {@code table} itself is still open - confirm against the reference
 * counting done by {@code from(Table)}.
 *
 * @param table the table whose columns should be extracted.
 * @return the column vectors taken from the temporary batch.
 */
public static final GpuColumnVector[] extractColumns(Table table) {
  try (ColumnarBatch wrapped = from(table)) {
    GpuColumnVector[] columns = extractColumns(wrapped);
    return columns;
  }
}

private final ai.rapids.cudf.ColumnVector cudfCv;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BufferType, NvtxColor, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.SpillPriorities.COALESCE_BATCH_ON_DECK_PRIORITY

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -406,7 +405,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
}

override def addBatchToConcat(batch: ColumnarBatch): Unit =
batches.append(SpillableColumnarBatch(batch, SpillPriorities.COALESCE_BATCH_PRIORITY))
batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))

override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
if (!GpuCompressedColumnVector.isBatchCompressed(cb)) {
Expand Down Expand Up @@ -474,7 +473,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, COALESCE_BATCH_ON_DECK_PRIORITY))
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}

override protected def clearOnDeck(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1718,9 +1718,9 @@ object GpuOverrides {
GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded())
}),
exec[ArrowEvalPythonExec](
"The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU" +
" when calling cuDF APIs in the UDF, also accelerates the data transfer between the" +
" Java process and Python process",
"The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" +
" Java process and Python process. It also supports running the Python UDFs code on" +
" the GPU when enabled",
(e, conf, p, r) =>
new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) {
val udfs: Seq[BaseExprMeta[PythonUDF]] =
Expand All @@ -1738,7 +1738,7 @@ object GpuOverrides {
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
e.evalType)
}).disabledByDefault("Performance is not ideal for UDFs that take a long time"),
}),
exec[GlobalLimitExec](
"Limiting of results across partitions",
(globalLimitExec, conf, p, r) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ object SpillPriorities {
val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue - 1000

/**
* Priority for buffers in coalesce batch that did not fit into the batch we are working on.
* Most of the time this is shuffle input data that we read early so it should be slightly higher
* priority to keep around than other input shuffle buffers.
* Priority for buffers that are waiting for `next` to be called, i.e. data held between
* calls to `hasNext` and `next` or between different calls to `next`.
*/
val COALESCE_BATCH_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1
val ACTIVE_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1

/**
* Priority for buffers in coalesce batch that are being held before the coalesce.
* Priority for multiple buffers being buffered within a call to next.
*/
val COALESCE_BATCH_PRIORITY: Long = COALESCE_BATCH_ON_DECK_PRIORITY + 100
val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,21 @@ object SpillableColumnarBatch extends Arm {
initialSpillPriority: Long): Unit = {
val numColumns = batch.numCols()
if (GpuCompressedColumnVector.isBatchCompressed(batch)) {
val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector]
RapidsBufferCatalog.addBuffer(id, cv.getBuffer, cv.getTableMeta, initialSpillPriority)
withResource(batch) { batch =>
val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector]
val buff = cv.getBuffer
buff.incRefCount()
RapidsBufferCatalog.addBuffer(id, buff, cv.getTableMeta, initialSpillPriority)
}
} else if (numColumns > 0 &&
(0 until numColumns)
.forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) {
val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer]
withResource(GpuColumnVector.from(batch)) { table =>
RapidsBufferCatalog.addTable(id, table, cv.getBuffer, initialSpillPriority)
withResource(batch) { batch =>
val table = GpuColumnVector.from(batch)
val buff = cv.getBuffer
buff.incRefCount()
RapidsBufferCatalog.addTable(id, table, buff, initialSpillPriority)
}
} else {
withResource(batch) { batch =>
Expand Down
Loading

0 comments on commit 985589a

Please sign in to comment.