Accelerate data transfer for map Pandas UDF plan (#2035)
* Accelerate data transfer for map Pandas UDF node.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored Apr 8, 2021
1 parent 7bdcc15 commit 0f9b30e
Showing 7 changed files with 230 additions and 85 deletions.
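
For context, the mapInPandas pattern whose JVM-to-Python data transfer this commit accelerates looks roughly like the following minimal PySpark sketch (the session, column name, and doubling logic are illustrative, not taken from this commit):

```python
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("map-in-pandas-demo").getOrCreate()
df = spark.range(0, 1000).withColumnRenamed("id", "a")

def double_a(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Each element is a pandas DataFrame holding one Arrow batch; moving these
    # batches between the JVM and the Python worker is the transfer that the
    # plugin's map Pandas UDF backend accelerates.
    for pdf in batches:
        yield pdf.assign(a=pdf.a * 2)

df.mapInPandas(double_a, schema="a long").show()
```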
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -306,7 +306,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.|false|This is disabled by default because it only supports row based frame for now|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

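The row above changes spark.rapids.sql.exec.MapInPandasExec from disabled-by-default to enabled-by-default. A hedged sketch of setting it explicitly when building a session (the app name is illustrative; the two config keys are real RAPIDS Accelerator settings):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("rapids-udf-demo")
         # The RAPIDS Accelerator plugin must be on the classpath for this to matter.
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         # As of this commit the exec is on by default; set it to "false" to opt out.
         .config("spark.rapids.sql.exec.MapInPandasExec", "true")
         .getOrCreate())
```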
55 changes: 39 additions & 16 deletions docs/supported_ops.md
@@ -705,6 +705,29 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td>MapInPandasExec</td>
<td>The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.</td>
<td>None</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>WindowInPandasExec</td>
<td>The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.</td>
<td>This is disabled by default because it only supports row based frame for now</td>
@@ -10661,11 +10684,11 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">reduction</td>
@@ -10704,11 +10727,11 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">window</td>
@@ -10747,11 +10770,11 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">project</td>
@@ -10790,11 +10813,11 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
<td> </td>
</tr>
<tr>
<td rowSpan="4">Quarter</td>
2 changes: 0 additions & 2 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -41,7 +41,6 @@


_conf = {
    'spark.rapids.sql.exec.MapInPandasExec':'true',
    'spark.rapids.sql.exec.FlatMapGroupsInPandasExec': 'true',
    'spark.rapids.sql.exec.AggregateInPandasExec': 'true',
    'spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec': 'true',
@@ -156,7 +155,6 @@ def gpu_run(spark):


# ======= Test Flat Map In Pandas =======
@allow_non_gpu('GpuMapInPandasExec','PythonUDF')
@cudf_udf
def test_map_in_pandas(enable_cudf_udf):
    def cpu_run(spark):
38 changes: 37 additions & 1 deletion integration_tests/src/main/python/udf_test.py
@@ -201,7 +201,6 @@ def pandas_add(data):
            conf=arrow_udf_conf)


@allow_non_gpu('MapInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', [LongGen()], ids=idfn)
def test_map_apply_udf(data_gen):
    def pandas_filter(iterator):
@@ -214,6 +213,43 @@ def pandas_filter(iterator):
            conf=arrow_udf_conf)


@pytest.mark.parametrize('data_gen', data_gens_nested_for_udf, ids=idfn)
def test_pandas_map_udf_nested_type(data_gen):
    # The UDF output types supported by the plugin: (commonCudfTypes + ARRAY).nested() + STRUCT.
    # STRUCT represents the whole dataframe in a Map Pandas UDF, so struct columns are not
    # allowed in the UDF output. More details are here:
    # https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L119
    udf_out_schema = 'c_integral long,' \
                     'c_string string,' \
                     'c_fp double,' \
                     'c_bool boolean,' \
                     'c_date date,' \
                     'c_time timestamp,' \
                     'c_array_array array<array<long>>,' \
                     'c_array_string array<string>'

    def col_types_udf(pdf_itr):
        for pdf in pdf_itr:
            # Return a one-row data frame with columns of the supported types.
            # The values cannot be generated randomly, because the UDF must return
            # the same data for both the CPU and GPU runs.
            yield pd.DataFrame({
                "c_integral": [len(pdf)],
                "c_string": ["size" + str(len(pdf))],
                "c_fp": [float(len(pdf))],
                "c_bool": [False],
                "c_date": [date(2021, 4, 2)],
                "c_time": [datetime(2021, 4, 2, tzinfo=timezone.utc)],
                "c_array_array": [[[len(pdf)]]],
                "c_array_string": [["size" + str(len(pdf))]]
            })

    assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, data_gen)\
                    .mapInPandas(col_types_udf, schema=udf_out_schema),
            conf=arrow_udf_conf)


def create_df(spark, data_gen, left_length, right_length):
    left = binary_op_df(spark, data_gen, length=left_length)
    right = binary_op_df(spark, data_gen, length=right_length)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2008,8 +2008,15 @@ object GpuOverrides {
"UDF run in an external python process. Does not actually run on the GPU, but " +
"the transfer of data to/from it can be accelerated.",
ExprChecks.fullAggAndProject(
TypeSig.commonCudfTypes + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes),
TypeSig.all,
// Different types of Pandas UDF support different sets of output type. Please refer to
// https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L98
// for more details.
// It is impossible to specify the exact type signature for each Pandas UDF type in a single
// expression 'PythonUDF'.
// So use the 'unionOfPandasUdfOut' to cover all types for Spark. The type signature of
// plugin is also an union of all the types of Pandas UDF.
(TypeSig.commonCudfTypes + TypeSig.ARRAY).nested() + TypeSig.STRUCT,
TypeSig.unionOfPandasUdfOut,
repeatingParamCheck = Some(RepeatingParamCheck(
"param",
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
@@ -2834,11 +2841,12 @@ object GpuOverrides {
        }
      }),
    exec[MapInPandasExec](
      "The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports " +
        " scheduling GPU resources for the Python process when enabled",
      ExecChecks.hiddenHack(),
      (mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r))
      .disabledByDefault("Performance is not ideal now"),
      "The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the" +
        " Java process and the Python process. It also supports scheduling GPU resources" +
        " for the Python process when enabled.",
      ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
        TypeSig.all),
      (mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r)),
    exec[FlatMapGroupsInPandasExec](
      "The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports " +
        " scheduling GPU resources for the Python process when enabled",
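As the new comment explains, the single PythonUDF expression must cover every Pandas UDF flavor, each with its own allowed output types. A hedged Python illustration of two flavors with differently shaped outputs (the names and logic are illustrative, not from this commit):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Scalar Pandas UDF: the output is one column, here array<long>, which the
# expression-level check must accept.
@pandas_udf("array<long>")
def to_singleton_array(s: pd.Series) -> pd.Series:
    return s.apply(lambda v: [v])

# Map Pandas UDF: the output is a whole DataFrame described by a struct-like
# schema string, so STRUCT appears only as the top-level row type.
def tag_rows(batches):
    for pdf in batches:
        yield pdf.assign(tag="gpu")

# df.select(to_singleton_array("a"))              # checked as an expression
# df.mapInPandas(tag_rows, "a long, tag string")  # checked as a plan node
```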
13 changes: 13 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -454,6 +454,19 @@ object TypeSig {
  val orderable: TypeSig = (BOOLEAN + BYTE + SHORT + INT + LONG + FLOAT + DOUBLE + DATE +
    TIMESTAMP + STRING + DECIMAL + NULL + BINARY + CALENDAR + ARRAY + STRUCT + UDT).nested()

  /**
   * Different types of Pandas UDF support different sets of output types. Please refer to
   * https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L98
   * for more details.
   *
   * It is impossible to specify the exact type signature for each Pandas UDF type in the
   * single expression 'PythonUDF'.
   *
   * So here is the union of all the sets of supported types, to cover all the cases.
   */
  val unionOfPandasUdfOut = (commonCudfTypes + BINARY + DECIMAL + NULL + ARRAY + MAP).nested() +
    STRUCT

  def getDataType(expr: Expression): Option[DataType] = {
    try {
      Some(expr.dataType)
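To make the union idea concrete, here is a hedged model of unionOfPandasUdfOut using plain Python sets; the per-flavor sets below are illustrative stand-ins, not the plugin's actual TypeSig values:

```python
# Model each Pandas UDF flavor's allowed output types as a set, then union
# them, mirroring what TypeSig.unionOfPandasUdfOut does on the plugin side.
common_cudf = {"BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT",
               "DOUBLE", "DATE", "TIMESTAMP", "STRING"}
scalar_out = common_cudf | {"BINARY", "DECIMAL", "NULL", "ARRAY"}  # illustrative
grouped_map_out = common_cudf | {"STRUCT", "MAP"}                  # illustrative
union_of_pandas_udf_out = scalar_out | grouped_map_out
print(sorted(union_of_pandas_udf_out))
```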