diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py index f13eb189b64..650561960e4 100644 --- a/integration_tests/src/main/python/udf_cudf_test.py +++ b/integration_tests/src/main/python/udf_cudf_test.py @@ -72,22 +72,19 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False) # ======= Test Scalar ======= -@pandas_udf('int') -def _plus_one_cpu_func(v: pd.Series) -> pd.Series: - return v + 1 - - -@pandas_udf('int') -def _plus_one_gpu_func(v: pd.Series) -> pd.Series: - import cudf - gpu_series = cudf.Series(v) - gpu_series = gpu_series + 1 - return gpu_series.to_pandas() - - @cudf_udf @pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data']) def test_with_column(enable_cudf_udf, data): + @pandas_udf('int') + def _plus_one_cpu_func(v: pd.Series) -> pd.Series: + return v + 1 + + @pandas_udf('int') + def _plus_one_gpu_func(v: pd.Series) -> pd.Series: + import cudf + gpu_series = cudf.Series(v) + gpu_series = gpu_series + 1 + return gpu_series.to_pandas() def cpu_run(spark): df = _create_df(spark, data) return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect() @@ -101,6 +98,17 @@ def gpu_run(spark): @cudf_udf def test_sql(enable_cudf_udf): + @pandas_udf('int') + def _plus_one_cpu_func(v: pd.Series) -> pd.Series: + return v + 1 + + @pandas_udf('int') + def _plus_one_gpu_func(v: pd.Series) -> pd.Series: + import cudf + gpu_series = cudf.Series(v) + gpu_series = gpu_series + 1 + return gpu_series.to_pandas() + def cpu_run(spark): _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func) _create_df(spark).createOrReplaceTempView("test_table_cpu") @@ -115,23 +123,21 @@ def gpu_run(spark): # ======= Test Scalar Iterator ======= -@pandas_udf("long") -def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: - for s in iterator: - yield s + 1 - - -@pandas_udf("long") -def _plus_one_gpu_iter_func(iterator: 
Iterator[pd.Series]) -> Iterator[pd.Series]: - import cudf - for s in iterator: - gpu_serises = cudf.Series(s) - gpu_serises = gpu_serises + 1 - yield gpu_serises.to_pandas() - - @cudf_udf def test_select(enable_cudf_udf): + @pandas_udf("long") + def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: + for s in iterator: + yield s + 1 + + @pandas_udf("long") + def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: + import cudf + for s in iterator: + gpu_series = cudf.Series(s) + gpu_series = gpu_series + 1 + yield gpu_series.to_pandas() + def cpu_run(spark): df = _create_df(spark) return df.select(_plus_one_cpu_iter_func(df.v)).collect() @@ -169,23 +175,21 @@ def _filter_gpu_func(iterator): # ======= Test Grouped Map In Pandas ======= # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP # need to add udf type -@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) -def _normalize_cpu_func(df): - v = df.v - return df.assign(v=(v - v.mean()) / v.std()) - - -@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) -def _normalize_gpu_func(df): - import cudf - gdf = cudf.from_pandas(df) - v = gdf.v - return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() - - @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF') @cudf_udf def test_group_apply(enable_cudf_udf): + @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) + def _normalize_cpu_func(df): + v = df.v + return df.assign(v=(v - v.mean()) / v.std()) + + @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) + def _normalize_gpu_func(df): + import cudf + gdf = cudf.from_pandas(df) + v = gdf.v + return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() + def cpu_run(spark): df = _create_df(spark) return df.groupby("id").apply(_normalize_cpu_func).collect() @@ -220,21 +224,19 @@ def _normalize_gpu_in_pandas_func(df): # ======= Test Aggregate In Pandas ======= -@pandas_udf("int") -def _sum_cpu_func(v: 
pd.Series) -> int: - return v.sum() - - -@pandas_udf("integer") -def _sum_gpu_func(v: pd.Series) -> int: - import cudf - gpu_series = cudf.Series(v) - return gpu_series.sum() - - @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_group_agg(enable_cudf_udf): + @pandas_udf("int") + def _sum_cpu_func(v: pd.Series) -> int: + return v.sum() + + @pandas_udf("integer") + def _sum_gpu_func(v: pd.Series) -> int: + import cudf + gpu_series = cudf.Series(v) + return gpu_series.sum() + def cpu_run(spark): df = _create_df(spark) return df.groupby("id").agg(_sum_cpu_func(df.v)).collect() @@ -249,6 +251,16 @@ def gpu_run(spark): @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_sql_group(enable_cudf_udf): + @pandas_udf("int") + def _sum_cpu_func(v: pd.Series) -> int: + return v.sum() + + @pandas_udf("integer") + def _sum_gpu_func(v: pd.Series) -> int: + import cudf + gpu_series = cudf.Series(v) + return gpu_series.sum() + def cpu_run(spark): _ = spark.udf.register("sum_cpu_udf", _sum_cpu_func) q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2" @@ -267,6 +279,16 @@ def gpu_run(spark): 'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$') @cudf_udf def test_window(enable_cudf_udf): + @pandas_udf("int") + def _sum_cpu_func(v: pd.Series) -> int: + return v.sum() + + @pandas_udf("integer") + def _sum_gpu_func(v: pd.Series) -> int: + import cudf + gpu_series = cudf.Series(v) + return gpu_series.sum() + def cpu_run(spark): df = _create_df(spark) w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)