From e20fbe401e2fec1883d326e40cca34d229569afc Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 9 Oct 2020 13:47:38 -0500
Subject: [PATCH 1/3] Move pandas_udf functions into the test functions so
 they don't try to compile when skipped

---
 .../src/main/python/spark_init_internal.py |   1 -
 .../src/main/python/udf_cudf_test.py       | 120 ++++++++++--------
 2 files changed, 66 insertions(+), 55 deletions(-)

diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py
index 65e1e0335eb..9722b289294 100644
--- a/integration_tests/src/main/python/spark_init_internal.py
+++ b/integration_tests/src/main/python/spark_init_internal.py
@@ -23,7 +23,6 @@ def _spark__init():
     _s = SparkSession.builder \
             .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
             .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\
-            .enableHiveSupport() \
             .appName('rapids spark plugin integration tests (python)').getOrCreate()
     #TODO catch the ClassNotFound error that happens if the classpath is not set up properly and
     # make it a better error message

diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py
index f13eb189b64..52762161986 100644
--- a/integration_tests/src/main/python/udf_cudf_test.py
+++ b/integration_tests/src/main/python/udf_cudf_test.py
@@ -72,22 +72,19 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
 
 
 # ======= Test Scalar =======
-@pandas_udf('int')
-def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
-    return v + 1
-
-
-@pandas_udf('int')
-def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
-    import cudf
-    gpu_series = cudf.Series(v)
-    gpu_series = gpu_series + 1
-    return gpu_series.to_pandas()
-
-
 @cudf_udf
 @pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data'])
 def test_with_column(enable_cudf_udf, data):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
     def cpu_run(spark):
         df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()
@@ -101,6 +98,17 @@ def gpu_run(spark):
 
 @cudf_udf
 def test_sql(enable_cudf_udf):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
+
     def cpu_run(spark):
         _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
         _create_df(spark).createOrReplaceTempView("test_table_cpu")
@@ -115,23 +123,21 @@ def gpu_run(spark):
 
 
 # ======= Test Scalar Iterator =======
-@pandas_udf("long")
-def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    for s in iterator:
-        yield s + 1
-
-
-@pandas_udf("long")
-def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    import cudf
-    for s in iterator:
-        gpu_serises = cudf.Series(s)
-        gpu_serises = gpu_serises + 1
-        yield gpu_serises.to_pandas()
-
-
 @cudf_udf
 def test_select(enable_cudf_udf):
+    @pandas_udf("long")
+    def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        for s in iterator:
+            yield s + 1
+
+    @pandas_udf("long")
+    def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        import cudf
+        for s in iterator:
+            gpu_serises = cudf.Series(s)
+            gpu_serises = gpu_serises + 1
+            yield gpu_serises.to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.select(_plus_one_cpu_iter_func(df.v)).collect()
@@ -169,23 +175,21 @@ def _filter_gpu_func(iterator):
 # ======= Test Grouped Map In Pandas =======
 # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP
 # need to add udf type
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_cpu_func(df):
-    v = df.v
-    return df.assign(v=(v - v.mean()) / v.std())
-
-
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_gpu_func(df):
-    import cudf
-    gdf = cudf.from_pandas(df)
-    v = gdf.v
-    return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
-
-
 @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
 @cudf_udf
 def test_group_apply(enable_cudf_udf):
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_cpu_func(df):
+        v = df.v
+        return df.assign(v=(v - v.mean()) / v.std())
+
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_gpu_func(df):
+        import cudf
+        gdf = cudf.from_pandas(df)
+        v = gdf.v
+        return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").apply(_normalize_cpu_func).collect()
@@ -220,21 +224,19 @@ def _normalize_gpu_in_pandas_func(df):
 
 
 # ======= Test Aggregate In Pandas =======
-@pandas_udf("int")
-def _sum_cpu_func(v: pd.Series) -> int:
-    return v.sum()
-
-
-@pandas_udf("integer")
-def _sum_gpu_func(v: pd.Series) -> int:
-    import cudf
-    gpu_series = cudf.Series(v)
-    return gpu_series.sum()
-
-
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_group_agg(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").agg(_sum_cpu_func(df.v)).collect()
@@ -249,6 +251,16 @@ def gpu_run(spark):
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_sql_group(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         _ = spark.udf.register("sum_cpu_udf", _sum_cpu_func)
         q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
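Note on PATCH 1/3: a decorator such as @pandas_udf executes when the module is imported, so module-level UDF definitions run during pytest collection even if every test in the file is later skipped; on a machine without cuDF or a working Arrow setup, collection itself can fail. Defining the UDFs inside the test bodies defers the decorator until the test actually runs. A minimal sketch of the pattern — the test name, the _plus_one UDF, and the spark fixture are illustrative, not part of the patch:

import pandas as pd
import pytest
from pyspark.sql.functions import pandas_udf


@pytest.mark.skipif(True, reason='illustration: stands in for the cudf_udf skip marker')
def test_deferred_udf(spark):
    # The decorator runs only when the test body runs; a skipped test never
    # evaluates pandas_udf, so a missing Arrow/cuDF stack cannot break
    # pytest collection.
    @pandas_udf('int')
    def _plus_one(v: pd.Series) -> pd.Series:
        return v + 1

    df = spark.range(3).selectExpr('id AS v')
    assert df.select(_plus_one(df.v)).count() == 3
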
From 10bd6d6031965e29b13b79bafd2e87ca91d51746 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 9 Oct 2020 13:51:03 -0500
Subject: [PATCH 2/3] put back enable hive

Signed-off-by: Thomas Graves
---
 integration_tests/src/main/python/spark_init_internal.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py
index 9722b289294..65e1e0335eb 100644
--- a/integration_tests/src/main/python/spark_init_internal.py
+++ b/integration_tests/src/main/python/spark_init_internal.py
@@ -23,6 +23,7 @@ def _spark__init():
     _s = SparkSession.builder \
             .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
             .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\
+            .enableHiveSupport() \
             .appName('rapids spark plugin integration tests (python)').getOrCreate()
     #TODO catch the ClassNotFound error that happens if the classpath is not set up properly and
     # make it a better error message
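Note on PATCH 2/3: it only restores the .enableHiveSupport() call that PATCH 1/3 dropped by accident, so the net change to spark_init_internal.py across the first two patches is zero. For reference, a sketch of the builder chain _spark__init() ends up with, assuming nothing else in that helper changes; enableHiveSupport() must be chained on the builder before getOrCreate(), since it has no effect on an already-created session:

from pyspark.sql import SparkSession

# Same settings as the patched helper: load the RAPIDS plugin, capture
# executed plans for the tests, and build a Hive-enabled session.
_s = SparkSession.builder \
        .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
        .config('spark.sql.queryExecutionListeners',
                'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback') \
        .enableHiveSupport() \
        .appName('rapids spark plugin integration tests (python)') \
        .getOrCreate()
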
From 5d4e2391f6374a55933a2a1ca2f7fb7c760fdecb Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Mon, 12 Oct 2020 08:28:02 -0500
Subject: [PATCH 3/3] Add missing functions to test_window

Signed-off-by: Thomas Graves
---
 integration_tests/src/main/python/udf_cudf_test.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py
index 52762161986..650561960e4 100644
--- a/integration_tests/src/main/python/udf_cudf_test.py
+++ b/integration_tests/src/main/python/udf_cudf_test.py
@@ -279,6 +279,16 @@ def gpu_run(spark):
                'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
 @cudf_udf
 def test_window(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         df = _create_df(spark)
         w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
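Note on PATCH 3/3: PATCH 1/3 removed the module-level _sum_cpu_func/_sum_gpu_func without copying them into test_window, so that test lost its UDFs; this patch adds them back inside the test body. The UDFs are Series-to-scalar aggregations, which Spark also accepts over a window specification. A self-contained sketch of what the CPU path of test_window exercises — the inline DataFrame stands in for the real test's _create_df(spark) helper:

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName('window-udf-sketch').getOrCreate()


@pandas_udf('int')
def _sum_cpu_func(v: pd.Series) -> int:
    return v.sum()


df = spark.createDataFrame([(0, 1), (0, 2), (1, 3)], ['id', 'v'])
# An unbounded frame per id: every row in a partition gets the whole
# partition's sum.
w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding,
                                         Window.unboundedFollowing)
print(df.withColumn('v_sum', _sum_cpu_func(df.v).over(w)).collect())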