Accelerate data transfer for map Pandas UDF plan #2035

Merged: 5 commits, Apr 8, 2021
Changes from 1 commit
32 changes: 16 additions & 16 deletions docs/supported_ops.md
@@ -10616,11 +10616,11 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
+<td> </td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
+<td> </td>
 </tr>
 <tr>
 <td rowSpan="2">reduction</td>
@@ -10659,11 +10659,11 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
+<td> </td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
+<td> </td>
 </tr>
 <tr>
 <td rowSpan="2">window</td>
@@ -10702,11 +10702,11 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
+<td> </td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
+<td> </td>
 </tr>
 <tr>
 <td rowSpan="2">project</td>
@@ -10745,11 +10745,11 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
+<td> </td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, NULL, BINARY, MAP)</em></td>
+<td> </td>
 </tr>
 <tr>
 <td rowSpan="4">Quarter</td>
35 changes: 28 additions & 7 deletions integration_tests/src/main/python/udf_test.py
@@ -215,17 +215,38 @@ def pandas_filter(iterator):

 @pytest.mark.parametrize('data_gen', data_gens_nested_for_udf, ids=idfn)
 def test_pandas_map_udf_nested_type(data_gen):
-    # Spark supports limited types as the return type of pandas UDF.
-    # For details please go to
-    # https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/types.py#L28
-    # So we return the data frames with one column of integral type for all types of the input.
-    def size_udf(pdf_itr):
+    # UDF output types supported by the plugin: (commonCudfTypes + ARRAY).nested() + STRUCT.
+    # STRUCT represents the whole data frame in a Map Pandas UDF, so there is no struct
+    # column in the UDF output. More details are at
+    # https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L119
+    udf_out_schema = 'c_integral long,' \
+                     'c_string string,' \
+                     'c_fp double,' \
+                     'c_bool boolean,' \
+                     'c_date date,' \
+                     'c_time timestamp,' \
+                     'c_array_array array<array<long>>,' \
+                     'c_array_string array<string>'
+
+    def col_types_udf(pdf_itr):
         for pdf in pdf_itr:
-            yield pd.DataFrame({"ret": [i for i in range(len(pdf))]})
+            # Return a data frame with columns of supported types and only one row.
+            # The values cannot be generated randomly, because the UDF must return
+            # the same data for both the CPU and the GPU runs.
+            yield pd.DataFrame({
+                "c_integral": [len(pdf)],
+                "c_string": ["size" + str(len(pdf))],
+                "c_fp": [float(len(pdf))],
+                "c_bool": [False],
+                "c_date": [date(2021, 4, 2)],
+                "c_time": [datetime(2021, 4, 2, tzinfo=timezone.utc)],
+                "c_array_array": [[[len(pdf)]]],
+                "c_array_string": [["size" + str(len(pdf))]]
+            })
 
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: unary_op_df(spark, data_gen)\
-            .mapInPandas(size_udf, schema="ret long"),
+            .mapInPandas(col_types_udf, schema=udf_out_schema),
         conf=arrow_udf_conf)


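For context, the Map Pandas UDF plan being accelerated wraps a `mapInPandas` call like the one in the test above. Below is a minimal standalone sketch of that contract, not code from this PR (illustrative names only; assumes Spark 3.0+ with pyarrow installed):

```python
# Minimal sketch of the mapInPandas contract exercised by the test above.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def summarize(pdf_iter):
    # mapInPandas passes an iterator of pandas DataFrames, one per input batch.
    # Every yielded DataFrame must match the schema declared below; that schema
    # is a struct describing the whole output frame, which is why the plugin's
    # STRUCT entry stands for the frame itself rather than a nested struct column.
    for pdf in pdf_iter:
        yield pd.DataFrame({"n_rows": [len(pdf)]})

spark.range(100).mapInPandas(summarize, schema="n_rows long").show()
```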
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2008,8 +2008,15 @@ object GpuOverrides {
"UDF run in an external python process. Does not actually run on the GPU, but " +
"the transfer of data to/from it can be accelerated.",
ExprChecks.fullAggAndProject(
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all,
// Different types of Pandas UDF support different sets of output type. Please refer to
// https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L98
// for more details.
// It is impossible to specify the exact type signature for each Pandas UDF type in a single
// expression 'PythonUDF'.
// So use the 'unionOfPandasUdfOut' to cover all types for Spark. The type signature of
// plugin is also an union of all the types of Pandas UDF.
(TypeSig.commonCudfTypes + TypeSig.ARRAY).nested() + TypeSig.STRUCT,
TypeSig.unionOfPandasUdfOut,
repeatingParamCheck = Some(RepeatingParamCheck(
"param",
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
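The constraint the comment describes is visible from the PySpark API itself: each Pandas UDF flavor declares its output type differently, yet Catalyst models them all as the same `PythonUDF` expression. A short sketch with two flavors (standard PySpark, Spark 3.0+ assumed; `plus_one` and `subtract_mean` are illustrative names):

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# Scalar Pandas UDF: the declared output type is a single column type.
@pandas_udf("double")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1

# Grouped-map Pandas UDF: the declared output type is a struct for the whole frame.
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.select(plus_one("v")).show()
df.groupBy("id").applyInPandas(subtract_mean, schema="id long, v double").show()
```

Since one column type and one whole-frame struct cannot be captured by a single exact signature, the check falls back to a union over all flavors on both the Spark side (`unionOfPandasUdfOut`) and the plugin side.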
13 changes: 13 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -454,6 +454,19 @@ object TypeSig {
   val orderable: TypeSig = (BOOLEAN + BYTE + SHORT + INT + LONG + FLOAT + DOUBLE + DATE +
     TIMESTAMP + STRING + DECIMAL + NULL + BINARY + CALENDAR + ARRAY + STRUCT + UDT).nested()
 
+  /**
+   * Different types of Pandas UDFs support different sets of output types. Please refer to
+   * https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L98
+   * for more details.
+   *
+   * It is impossible to specify the exact type signature for each Pandas UDF type in
+   * the single expression 'PythonUDF'.
+   *
+   * So this is the union of all the supported output type sets, covering all the cases.
+   */
+  val unionOfPandasUdfOut: TypeSig =
+    (commonCudfTypes + BINARY + DECIMAL + NULL + ARRAY + MAP).nested() + STRUCT
+
   def getDataType(expr: Expression): Option[DataType] = {
     try {
       Some(expr.dataType)