Move pandas_udf functions into the test functions (NVIDIA#926)
* Move pandas_udf functions into the test functions so they don't try to compile when skipped

* Put back enable hive

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Add missing functions to test_window

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
tgravescs authored Oct 13, 2020
1 parent 1e8c34a commit 32a9d4b
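
Why this helps: a Python decorator runs as soon as its module is imported, and pytest imports every test module during collection, so a module-level @pandas_udf is evaluated (and may try to compile) even when every test in the file is skipped. Defining the UDF inside the test body defers that work until the test actually runs. A minimal sketch of the difference, using a hypothetical stand-in decorator rather than the real pandas_udf:

    import pytest

    def fake_pandas_udf(fn):
        # Stand-in for pandas_udf: like the real decorator, it does its work
        # (here just a print; in Spark, return-type parsing) the moment it is
        # applied to the function.
        print("building udf for", fn.__name__)
        return fn

    # At module scope this would run during pytest collection, before any
    # skip marker is consulted:
    #
    # @fake_pandas_udf
    # def _plus_one(v):
    #     return v + 1

    @pytest.mark.skip(reason="cudf not available")
    def test_plus_one():
        # Inside the test body, the decorator never runs if the test is skipped.
        @fake_pandas_udf
        def _plus_one(v):
            return v + 1
        assert _plus_one(1) == 2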
Showing 1 changed file with 76 additions and 54 deletions.

integration_tests/src/main/python/udf_cudf_test.py
@@ -72,22 +72,19 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
 
 
 # ======= Test Scalar =======
-@pandas_udf('int')
-def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
-    return v + 1
-
-
-@pandas_udf('int')
-def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
-    import cudf
-    gpu_series = cudf.Series(v)
-    gpu_series = gpu_series + 1
-    return gpu_series.to_pandas()
-
-
 @cudf_udf
 @pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data'])
 def test_with_column(enable_cudf_udf, data):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
     def cpu_run(spark):
         df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()
@@ -101,6 +98,17 @@ def gpu_run(spark):
 
 @cudf_udf
 def test_sql(enable_cudf_udf):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
+
     def cpu_run(spark):
         _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
         _create_df(spark).createOrReplaceTempView("test_table_cpu")
@@ -115,23 +123,21 @@ def gpu_run(spark):
 
 
 # ======= Test Scalar Iterator =======
-@pandas_udf("long")
-def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    for s in iterator:
-        yield s + 1
-
-
-@pandas_udf("long")
-def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    import cudf
-    for s in iterator:
-        gpu_serises = cudf.Series(s)
-        gpu_serises = gpu_serises + 1
-        yield gpu_serises.to_pandas()
-
-
 @cudf_udf
 def test_select(enable_cudf_udf):
+    @pandas_udf("long")
+    def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        for s in iterator:
+            yield s + 1
+
+    @pandas_udf("long")
+    def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        import cudf
+        for s in iterator:
+            gpu_serises = cudf.Series(s)
+            gpu_serises = gpu_serises + 1
+            yield gpu_serises.to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.select(_plus_one_cpu_iter_func(df.v)).collect()
@@ -169,23 +175,21 @@ def _filter_gpu_func(iterator):
 # ======= Test Grouped Map In Pandas =======
 # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP
 # need to add udf type
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_cpu_func(df):
-    v = df.v
-    return df.assign(v=(v - v.mean()) / v.std())
-
-
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_gpu_func(df):
-    import cudf
-    gdf = cudf.from_pandas(df)
-    v = gdf.v
-    return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
-
-
 @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
 @cudf_udf
 def test_group_apply(enable_cudf_udf):
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_cpu_func(df):
+        v = df.v
+        return df.assign(v=(v - v.mean()) / v.std())
+
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_gpu_func(df):
+        import cudf
+        gdf = cudf.from_pandas(df)
+        v = gdf.v
+        return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").apply(_normalize_cpu_func).collect()
@@ -220,21 +224,19 @@ def _normalize_gpu_in_pandas_func(df):
 
 
 # ======= Test Aggregate In Pandas =======
-@pandas_udf("int")
-def _sum_cpu_func(v: pd.Series) -> int:
-    return v.sum()
-
-
-@pandas_udf("integer")
-def _sum_gpu_func(v: pd.Series) -> int:
-    import cudf
-    gpu_series = cudf.Series(v)
-    return gpu_series.sum()
-
-
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_group_agg(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").agg(_sum_cpu_func(df.v)).collect()
@@ -249,6 +251,16 @@ def gpu_run(spark):
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_sql_group(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         _ = spark.udf.register("sum_cpu_udf", _sum_cpu_func)
         q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
@@ -267,6 +279,16 @@ def gpu_run(spark):
         'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
 @cudf_udf
 def test_window(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         df = _create_df(spark)
         w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
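
The moved definitions are now repeated across several tests; the duplication is the price of keeping each test self-contained. An alternative that would also keep pandas_udf from running at import time (a sketch only, not what this commit does) is a small factory that each test calls on demand:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    def make_plus_one_cpu():
        # pandas_udf is applied only when the factory is invoked, so merely
        # importing this module stays safe for skipped tests.
        @pandas_udf('int')
        def _plus_one(v: pd.Series) -> pd.Series:
            return v + 1
        return _plus_one

Either way, nothing UDF-related executes at collection time, which is the property the commit message is after.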
