Accelerate the data transfer between JVM and Python for the plan 'GpuWindowInPandasExec' (NVIDIA#1069)

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored Nov 12, 2020
1 parent 7b8181d commit 98fb783
Showing 10 changed files with 705 additions and 902 deletions.
12 changes: 6 additions & 6 deletions docs/configs.md
@@ -268,12 +268,12 @@ Name | Description | Default Value | Notes
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|false|This is disabled by default because large joins can cause out of memory errors|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and Python process. It also supports running the Python UDFs code on the GPU when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Pandas UDF with window functions, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.|false|This is disabled by default because it only supports row based frame for now|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

### Scans
76 changes: 66 additions & 10 deletions integration_tests/src/main/python/udf_test.py
@@ -34,7 +34,10 @@
import pandas as pd
from typing import Iterator, Tuple

arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true'}
arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true'
}

####################################################################
# NOTE: pytest does not play well with pyspark udfs, because pyspark
@@ -44,6 +47,7 @@
# itself.
####################################################################


@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
def test_pandas_math_udf(data_gen):
def add(a, b):
@@ -54,6 +58,7 @@ def add(a, b):
my_udf(f.col('a') - 3, f.col('b'))),
conf=arrow_udf_conf)


@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
def test_iterator_math_udf(data_gen):
def iterator_add(to_process: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
@@ -66,6 +71,7 @@ def iterator_add(to_process: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[
my_udf(f.col('a'), f.col('b'))),
conf=arrow_udf_conf)


@approximate_float
@allow_non_gpu('AggregateInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
@@ -79,6 +85,7 @@ def pandas_sum(to_process: pd.Series) -> float:
pandas_sum(f.col('a'))),
conf=arrow_udf_conf)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/757")
@ignore_order
@allow_non_gpu('AggregateInPandasExec', 'PythonUDF', 'Alias')
@@ -94,23 +101,70 @@ def pandas_sum(to_process: pd.Series) -> int:
.agg(pandas_sum(f.col('b'))),
conf=arrow_udf_conf)


# ======= Test window in Pandas =======
# Range frames are not supported yet; see the note after the window definitions below.
no_part_win = Window\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

unbounded_win = Window\
.partitionBy('a')\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

cur_follow_win = Window\
.partitionBy('a')\
.orderBy('b')\
.rowsBetween(Window.currentRow, Window.unboundedFollowing)

pre_cur_win = Window\
.partitionBy('a')\
.orderBy('b')\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

low_upper_win = Window.partitionBy('a').orderBy('b').rowsBetween(-3, 3)

udf_windows = [no_part_win, unbounded_win, cur_follow_win, pre_cur_win, low_upper_win]
window_ids = ['No_Partition', 'Unbounded', 'Unbounded_Following', 'Unbounded_Preceding',
'Lower_Upper']
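
# Illustrative sketch, not part of this commit: a range-based frame such as
#   range_win = Window.partitionBy('a').orderBy('b').rangeBetween(-3, 3)
# is deliberately absent from udf_windows above, because only row-based
# frames are supported by the GPU window backend for now.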


# The test fails intermittently only when using LongGen, so it is split into
# "test_window_aggregate_udf_long" and "test_window_aggregate_udf" to unblock
# the basic window functionality tests.
@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/740")
@ignore_order
@allow_non_gpu('WindowInPandasExec', 'PythonUDF', 'WindowExpression', 'Alias', 'WindowSpecDefinition', 'SpecifiedWindowFrame', 'UnboundedPreceding$', 'UnboundedFollowing$')
@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
def test_window_aggregate_udf(data_gen):
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('window', udf_windows, ids=window_ids)
def test_window_aggregate_udf_long(data_gen, window):

@f.pandas_udf('long')
def pandas_sum(to_process: pd.Series) -> int:
return to_process.sum()

w = Window\
.partitionBy('a') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
pandas_sum(f.col('b')).over(w)),
conf=arrow_udf_conf)
lambda spark: binary_op_df(spark, data_gen).select(
pandas_sum(f.col('b')).over(window)),
conf=arrow_udf_conf)


@ignore_order
@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen], ids=idfn)
@pytest.mark.parametrize('window', udf_windows, ids=window_ids)
def test_window_aggregate_udf(data_gen, window):

@f.pandas_udf('long')
def pandas_sum(to_process: pd.Series) -> int:
return to_process.sum()

assert_gpu_and_cpu_are_equal_collect(
lambda spark: binary_op_df(spark, data_gen).select(
pandas_sum(f.col('b')).over(window)),
conf=arrow_udf_conf)
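
# Sketch under assumptions beyond this diff: with the RAPIDS plugin active,
# the accelerated path can be spot-checked by looking for
# GpuWindowInPandasExec in the physical plan, assuming a grouped-agg UDF
# like pandas_sum above:
#   binary_op_df(spark, int_gen)\
#       .select(pandas_sum(f.col('b')).over(low_upper_win)).explain()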


# ======= Test flat map group in Pandas =======
@ignore_order
@allow_non_gpu('FlatMapGroupsInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', [LongGen()], ids=idfn)
@@ -138,11 +192,13 @@ def pandas_filter(iterator):
.mapInPandas(pandas_filter, schema="a long, b long"),
conf=arrow_udf_conf)


def create_df(spark, data_gen, left_length, right_length):
left = binary_op_df(spark, data_gen, length=left_length)
right = binary_op_df(spark, data_gen, length=right_length)
return left, right


@ignore_order
@allow_non_gpu('FlatMapCoGroupsInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
Expand Up @@ -41,7 +41,6 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.rapids.shims.spark300._
@@ -175,12 +174,7 @@ class Spark300Shims extends SparkShims {
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[WindowInPandasExec](
"The backend for Pandas UDF with window functions, it runs on CPU itself now but" +
" supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
.disabledByDefault("Performance is not ideal now")
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}

