Move pandas_udf functions into the test functions so they don't try to
compile when skipped
tgravescs committed Oct 9, 2020
1 parent f64068a commit e20fbe4
Showing 2 changed files with 66 additions and 55 deletions.
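
Why the move matters: pytest imports a test module during collection, so a @pandas_udf decorator at module scope is evaluated (and the UDF's return type validated against the pandas/PyArrow machinery) even when every test in the file is skipped. Defining the UDFs inside the test bodies defers that work until a test actually runs. A minimal sketch of the pattern, assuming only that pyspark and pytest are importable — the names below are illustrative, not taken from this repository:

import pandas as pd
import pytest
from pyspark.sql.functions import pandas_udf

# Module scope: this decorator would run as soon as pytest imports the
# file, even if every test in it is skipped.
#
# @pandas_udf('int')
# def _plus_one(v: pd.Series) -> pd.Series:
#     return v + 1

@pytest.mark.skip(reason="illustration of the deferred-definition pattern")
def test_plus_one():
    # Function scope: the decorator only runs when the test body runs,
    # so skipping the test also skips UDF creation.
    @pandas_udf('int')
    def _plus_one(v: pd.Series) -> pd.Series:
        return v + 1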
1 change: 0 additions & 1 deletion integration_tests/src/main/python/spark_init_internal.py
@@ -23,7 +23,6 @@ def _spark__init():
     _s = SparkSession.builder \
             .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
             .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\
-            .enableHiveSupport() \
             .appName('rapids spark plugin integration tests (python)').getOrCreate()
     #TODO catch the ClassNotFound error that happens if the classpath is not set up properly and
     # make it a better error message
120 changes: 66 additions & 54 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -72,22 +72,19 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
 
 
 # ======= Test Scalar =======
-@pandas_udf('int')
-def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
-    return v + 1
-
-
-@pandas_udf('int')
-def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
-    import cudf
-    gpu_series = cudf.Series(v)
-    gpu_series = gpu_series + 1
-    return gpu_series.to_pandas()
-
-
 @cudf_udf
 @pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data'])
 def test_with_column(enable_cudf_udf, data):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
     def cpu_run(spark):
         df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()
@@ -101,6 +98,17 @@ def gpu_run(spark):
 
 @cudf_udf
 def test_sql(enable_cudf_udf):
+    @pandas_udf('int')
+    def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
+        return v + 1
+
+    @pandas_udf('int')
+    def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
+        import cudf
+        gpu_series = cudf.Series(v)
+        gpu_series = gpu_series + 1
+        return gpu_series.to_pandas()
+
     def cpu_run(spark):
         _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
         _create_df(spark).createOrReplaceTempView("test_table_cpu")
@@ -115,23 +123,21 @@ def gpu_run(spark):
 
 
 # ======= Test Scalar Iterator =======
-@pandas_udf("long")
-def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    for s in iterator:
-        yield s + 1
-
-
-@pandas_udf("long")
-def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
-    import cudf
-    for s in iterator:
-        gpu_series = cudf.Series(s)
-        gpu_series = gpu_series + 1
-        yield gpu_series.to_pandas()
-
-
 @cudf_udf
 def test_select(enable_cudf_udf):
+    @pandas_udf("long")
+    def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        for s in iterator:
+            yield s + 1
+
+    @pandas_udf("long")
+    def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        import cudf
+        for s in iterator:
+            gpu_series = cudf.Series(s)
+            gpu_series = gpu_series + 1
+            yield gpu_series.to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.select(_plus_one_cpu_iter_func(df.v)).collect()
@@ -169,23 +175,21 @@ def _filter_gpu_func(iterator):
 # ======= Test Grouped Map In Pandas =======
 # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP
 # need to add udf type
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_cpu_func(df):
-    v = df.v
-    return df.assign(v=(v - v.mean()) / v.std())
-
-
-@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-def _normalize_gpu_func(df):
-    import cudf
-    gdf = cudf.from_pandas(df)
-    v = gdf.v
-    return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
-
-
 @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
 @cudf_udf
 def test_group_apply(enable_cudf_udf):
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_cpu_func(df):
+        v = df.v
+        return df.assign(v=(v - v.mean()) / v.std())
+
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
+    def _normalize_gpu_func(df):
+        import cudf
+        gdf = cudf.from_pandas(df)
+        v = gdf.v
+        return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").apply(_normalize_cpu_func).collect()
@@ -220,21 +224,19 @@ def _normalize_gpu_in_pandas_func(df):
 
 
 # ======= Test Aggregate In Pandas =======
-@pandas_udf("int")
-def _sum_cpu_func(v: pd.Series) -> int:
-    return v.sum()
-
-
-@pandas_udf("integer")
-def _sum_gpu_func(v: pd.Series) -> int:
-    import cudf
-    gpu_series = cudf.Series(v)
-    return gpu_series.sum()
-
-
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_group_agg(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         df = _create_df(spark)
         return df.groupby("id").agg(_sum_cpu_func(df.v)).collect()
@@ -249,6 +251,16 @@ def gpu_run(spark):
 @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
 @cudf_udf
 def test_sql_group(enable_cudf_udf):
+    @pandas_udf("int")
+    def _sum_cpu_func(v: pd.Series) -> int:
+        return v.sum()
+
+    @pandas_udf("integer")
+    def _sum_gpu_func(v: pd.Series) -> int:
+        import cudf
+        gpu_series = cudf.Series(v)
+        return gpu_series.sum()
+
     def cpu_run(spark):
         _ = spark.udf.register("sum_cpu_udf", _sum_cpu_func)
         q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
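
The cpu_run/gpu_run pairs above are handed to _assert_cpu_gpu, whose body lies outside this diff; only its signature is visible in the first hunk header. A plausible sketch of such a helper, assuming with_cpu_session/with_gpu_session harness helpers that run a callable against a CPU- or GPU-configured SparkSession (an assumption — the actual implementation is not shown here):

def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False):
    # Run the same workload once on CPU and once on the GPU plugin,
    # then compare the collected rows; sort first when the query itself
    # does not guarantee an ordering.
    cpu_ret = with_cpu_session(cpu_func, conf=cpu_conf)  # assumed harness helper
    gpu_ret = with_gpu_session(gpu_func, conf=gpu_conf)  # assumed harness helper
    if is_sort:
        cpu_ret, gpu_ret = sorted(cpu_ret), sorted(gpu_ret)
    assert cpu_ret == gpu_ret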
