Improved GpuArrowEvalPythonExec #783

Merged · 2 commits · Sep 17, 2020
Changes from 1 commit
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -260,7 +260,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and Python process. It also supports running the Python UDFs code on the GPU when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
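With this change `spark.rapids.sql.exec.ArrowEvalPythonExec` defaults to `true`, so only Arrow itself needs to be turned on to get the accelerated transfer path. A minimal session-setup sketch reflecting the new default (the app name and the commented opt-out line are illustrative, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("arrow-eval-python-example") // illustrative name
  // Arrow-based transfer for Pandas UDFs still has to be requested explicitly.
  .config("spark.sql.execution.arrow.pyspark.enabled", "true")
  // ArrowEvalPythonExec is now on by default; set it only to opt back out.
  // .config("spark.rapids.sql.exec.ArrowEvalPythonExec", "false")
  .getOrCreate()
```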
1 change: 0 additions & 1 deletion integration_tests/src/main/python/udf_cudf_test.py
@@ -23,7 +23,6 @@


_conf = {
'spark.rapids.sql.exec.ArrowEvalPythonExec':'true',
'spark.rapids.sql.exec.MapInPandasExec':'true',
'spark.rapids.sql.exec.FlatMapGroupsInPandasExec': 'true',
'spark.rapids.sql.exec.AggregateInPandasExec': 'true',
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/udf_test.py
@@ -29,8 +29,7 @@
except Exception as e:
pytestmark = pytest.mark.skip(reason=str(e))

arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.ArrowEvalPythonExec': 'true'}
arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true'}

####################################################################
# NOTE: pytest does not play well with pyspark udfs, because pyspark
@@ -344,6 +344,12 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
return vectors;
}

public static final GpuColumnVector[] extractColumns(Table table) {
try (ColumnarBatch batch = from(table)) {
return extractColumns(batch);
}
}

private final ai.rapids.cudf.ColumnVector cudfCv;

/**
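The new overload builds a batch from the table with `from(table)` and reuses the existing `extractColumns(ColumnarBatch)`, closing the intermediate batch via try-with-resources. A hypothetical caller, shown only to illustrate the call site (the helper name is an assumption, not from this PR):

```scala
import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.GpuColumnVector

// Hypothetical helper: report the Spark data type backing each column of a cudf Table.
def sparkTypesOf(table: Table): Seq[org.apache.spark.sql.types.DataType] =
  GpuColumnVector.extractColumns(table).map(_.dataType()).toSeq
```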
@@ -20,7 +20,6 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BufferType, NvtxColor, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.SpillPriorities.COALESCE_BATCH_ON_DECK_PRIORITY

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
@@ -406,7 +405,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
}

override def addBatchToConcat(batch: ColumnarBatch): Unit =
batches.append(SpillableColumnarBatch(batch, SpillPriorities.COALESCE_BATCH_PRIORITY))
batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))

override def getColumnSizes(cb: ColumnarBatch): Array[Long] = {
if (!GpuCompressedColumnVector.isBatchCompressed(cb)) {
@@ -474,7 +473,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, COALESCE_BATCH_ON_DECK_PRIORITY))
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}

override protected def clearOnDeck(): Unit = {
@@ -1718,9 +1718,9 @@ object GpuOverrides {
GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded())
}),
exec[ArrowEvalPythonExec](
"The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU" +
" when calling cuDF APIs in the UDF, also accelerates the data transfer between the" +
" Java process and Python process",
"The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" +
" Java process and Python process. It also supports running the Python UDFs code on" +

Comment from the PR author (collaborator) on this description: This probably needs some work and a reference to how to enable/configure the cudf on GPU feature, but I am not sure what to say because we are not 100% ready to promote that yet.

" the GPU when enabled",
(e, conf, p, r) =>
new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) {
val udfs: Seq[BaseExprMeta[PythonUDF]] =
Expand All @@ -1738,7 +1738,7 @@ object GpuOverrides {
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
e.evalType)
}).disabledByDefault("Performance is not ideal for UDFs that take a long time"),
}),
exec[GlobalLimitExec](
"Limiting of results across partitions",
(globalLimitExec, conf, p, r) =>
@@ -49,14 +49,13 @@ object SpillPriorities {
val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue - 1000

/**
* Priority for buffers in coalesce batch that did not fit into the batch we are working on.
* Most of the time this is shuffle input data that we read early so it should be slightly higher
* priority to keep around than other input shuffle buffers.
* Priority for buffers that are waiting for next to be called. i.e. data held between
* calls to `hasNext` and `next` or between different calls to `next`.
*/
val COALESCE_BATCH_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1
val ACTIVE_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1

/**
* Priority for buffers in coalesce batch that are being held before the coalesce.
* Priority for multiple buffers being buffered within a call to next.
*/
val COALESCE_BATCH_PRIORITY: Long = COALESCE_BATCH_ON_DECK_PRIORITY + 100
val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100
}
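The rename generalizes the two coalesce-specific priorities so any iterator that holds GPU data across `hasNext`/`next` calls can use them; the relative ordering is unchanged. A small sketch restating that ordering with the values from the hunk above (reading a larger value as 'more important to keep around' follows the original scaladoc and is otherwise an assumption about the spill framework):

```scala
// Sketch only: mirrors the constants in SpillPriorities, not a replacement for them.
object SpillPrioritySketch {
  // Shuffle input that was read early.
  val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue - 1000

  // Data parked between hasNext/next (or between next calls) sits just above shuffle input.
  val ACTIVE_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1

  // Batches being accumulated within a single next() call rank higher still.
  val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100
}
```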
@@ -144,8 +144,11 @@ object SpillableColumnarBatch extends Arm {
(0 until numColumns)
.forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) {
val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer]
withResource(GpuColumnVector.from(batch)) { table =>
RapidsBufferCatalog.addTable(id, table, cv.getBuffer, initialSpillPriority)
withResource(batch) { batch =>
val table = GpuColumnVector.from(batch)
val buff = cv.getBuffer
buff.incRefCount()
RapidsBufferCatalog.addTable(id, table, buff, initialSpillPriority)
}
} else {
withResource(batch) { batch =>
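The reworked branch no longer closes the freshly built `Table` (presumably because `RapidsBufferCatalog.addTable` now owns it) and bumps the contiguous buffer's reference count before registering it, since the incoming batch still holds its own reference through the column and is closed by `withResource`. A generic sketch of that hand-off pattern, with illustrative function names and assuming the receiving side takes ownership of the reference it is given:

```scala
import ai.rapids.cudf.DeviceMemoryBuffer

// Give a second owner its own reference to a device buffer while the original owner
// (for example a ColumnarBatch about to be closed) keeps and later releases its own.
def handOff(buff: DeviceMemoryBuffer)(register: DeviceMemoryBuffer => Unit): Unit = {
  buff.incRefCount()   // the reference the new owner becomes responsible for
  try {
    register(buff)     // e.g. hand the buffer to RapidsBufferCatalog.addTable(...)
  } catch {
    case t: Throwable =>
      buff.close()     // registration failed: release the extra reference
      throw t
  }
}
```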