Enable tests in udf_cudf_test.py (NVIDIA#777)
* Enable tests in udf_cudf_test.py

1) Update the configs for the CUDF tests to enable pooling and turn down
   the pool size to avoid OOM as much as possible.
2) Increase the size of the test data frame to avoid IPC errors. When the
   data frame is too small, some tasks receive empty data, which triggers
   the IPC error (see the sketch below).
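
For reference, a minimal sketch of both changes (the config values and the
data-frame code mirror the diffs below; the dict name cudf_udf_conf and the
explanatory comments are only our reading of the fix):

    # Keep the RAPIDS pools small so the JVM plugin and the Python workers
    # can share the GPU without exhausting memory (values from spark-tests.sh).
    cudf_udf_conf = {
        'spark.rapids.memory.gpu.allocFraction': '0.1',
        'spark.rapids.python.memory.gpu.allocFraction': '0.1',
        'spark.rapids.python.concurrentPythonWorkers': '2',
    }

    # Enlarged test data frame (matches _create_df in udf_cudf_test.py below):
    # ~10000 rows instead of 5, so every task gets a non-empty slice and the
    # Arrow IPC stream never carries an empty batch.
    def _create_df(spark):
        elements = list(map(lambda i: (i, i / 1.0), range(1, 5000)))
        return spark.createDataFrame(elements * 2, ("id", "v"))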

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Skip cudf test for premerge

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Skip cudf tests for Spark 3.1.0+ temporarily

Since the shim layer for Spark 3.1.0 is not ready yet.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Use LooseVersion to compare the version instead of a plain string comparison (sketched below)
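
A short, self-contained illustration of why LooseVersion beats a plain string
comparison for this check (the version literals here are only examples; the
actual gate is the pytestmark skipif shown in udf_cudf_test.py below):

    from distutils.version import LooseVersion

    # Lexicographic string comparison mis-orders multi-digit components ...
    assert '3.10.0' < '3.2.0'
    # ... while LooseVersion compares the numeric parts, so a check such as
    # spark_version >= '3.1.0' behaves correctly for any 3.x release.
    assert LooseVersion('3.10.0') > LooseVersion('3.2.0')
    assert LooseVersion('3.0.1') < LooseVersion('3.1.0')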
firestarman authored Sep 18, 2020
1 parent d145efe commit 051bbaa
Showing 3 changed files with 72 additions and 54 deletions.
104 changes: 59 additions & 45 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -12,15 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import pandas as pd
import pytest
import time
from distutils.version import LooseVersion
from typing import Iterator
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_init_internal import spark_version
from spark_session import with_cpu_session, with_gpu_session
from marks import allow_non_gpu, cudf_udf

pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")


_conf = {
'spark.rapids.sql.exec.MapInPandasExec':'true',
@@ -31,11 +36,11 @@
'spark.rapids.sql.python.gpu.enabled': 'true'
}


def _create_df(spark):
return spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v")
)
elements = list(map(lambda i: (i, i/1.0), range(1, 5000)))
return spark.createDataFrame(elements * 2, ("id", "v"))


# since this test requires running different functions on CPU and GPU (cudf is needed),
# create its own assert function
@@ -54,21 +59,22 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
assert cpu_ret.sort() == gpu_ret.sort()
else:
assert cpu_ret == gpu_ret



# ======= Test Scalar =======
@pandas_udf('int')
def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
return v + 1


@pandas_udf('int')
def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
import cudf
gpu_serises = cudf.Series(v)
gpu_serises = gpu_serises + 1
return gpu_serises.to_pandas()
gpu_series = cudf.Series(v)
gpu_series = gpu_series + 1
return gpu_series.to_pandas()


@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")
@cudf_udf
def test_with_column():
def cpu_run(spark):
@@ -78,38 +84,41 @@ def cpu_run(spark):
def gpu_run(spark):
df = _create_df(spark)
return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)

@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")

@cudf_udf
def test_sql():
def cpu_run(spark):
_ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect()
_create_df(spark).createOrReplaceTempView("test_table_cpu")
return spark.sql("SELECT add_one_cpu(id) FROM test_table_cpu").collect()

def gpu_run(spark):
_ = spark.udf.register("add_one_gpu", _plus_one_gpu_func)
return spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect()

_create_df(spark).createOrReplaceTempView("test_table_gpu")
return spark.sql("SELECT add_one_gpu(id) FROM test_table_gpu").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Scalar Iterator =======
@pandas_udf("long")
def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1


@pandas_udf("long")
def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
import cudf
for s in iterator:
gpu_serises = cudf.Series(s)
gpu_serises = gpu_serises + 1
yield gpu_serises.to_pandas()

@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")


@cudf_udf
def test_select():
def cpu_run(spark):
@@ -123,44 +132,46 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
# ======= Test Flat Map In Pandas =======
@allow_non_gpu('GpuMapInPandasExec','PythonUDF')
@cudf_udf
def test_map_in_pandas():
def cpu_run(spark):
df = _create_df(spark)
def _filter_cpu_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df = _create_df(spark)
return df.mapInPandas(_filter_cpu_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _filter_gpu_func(iterator):
import cudf
for pdf in iterator:
gdf = cudf.from_pandas(pdf)
yield gdf[gdf.id == 1].to_pandas()
df = _create_df(spark)
return df.mapInPandas(_filter_gpu_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Grouped Map In Pandas =======
# To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP
# need to add udf type
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_cpu_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_gpu_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()

@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")

@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply():
@@ -171,44 +182,45 @@ def cpu_run(spark):
def gpu_run(spark):
df = _create_df(spark)
return df.groupby("id").apply(_normalize_gpu_func).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply_in_pandas():
def cpu_run(spark):
df = _create_df(spark)
def _normalize_cpu_in_pandas_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _normalize_gpu_in_pandas_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


# ======= Test Aggregate In Pandas =======
@pandas_udf("int")
def _sum_cpu_func(v: pd.Series) -> int:
return v.sum()


@pandas_udf("integer")
def _sum_gpu_func(v: pd.Series) -> int:
import cudf
gpu_serises = cudf.Series(v)
return gpu_serises.sum()
gpu_series = cudf.Series(v)
return gpu_series.sum()


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_group_agg():
@@ -223,7 +235,6 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_sql_group():
@@ -240,8 +251,9 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition','SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
# ======= Test Window In Pandas =======
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition',
'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
@cudf_udf
def test_window():
def cpu_run(spark):
@@ -254,37 +266,39 @@ def gpu_run(spark):
w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
# ======= Test CoGroup Map In Pandas =======
@allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_cogroup():
def cpu_run(spark):
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

def gpu_run(spark):
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)

8 changes: 5 additions & 3 deletions jenkins/spark-premerge-build.sh
@@ -37,10 +37,12 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"
tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
rm -f $SPARK_HOME.tgz

mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS=''
ARGS_SKIP_TESTS='not cudf_udf'

mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"
# Run the unit tests for other Spark versions but don't run the full Python integration tests
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"

# The jacoco coverage should have been collected, but because of how the shade plugin
# works and jacoco we need to clean some things up so jacoco will only report for the
14 changes: 8 additions & 6 deletions jenkins/spark-tests.sh
@@ -70,10 +70,11 @@ MORTGAGE_SPARK_SUBMIT_ARGS=" --conf spark.plugins=com.nvidia.spark.SQLPlugin \
--class com.nvidia.spark.rapids.tests.mortgage.Main \
$RAPIDS_TEST_JAR"

# need to disable pooling for udf test to prevent cudaErrorMemoryAllocation
CUDF_UDF_TEST_ARGS="--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.memory.gpu.pooling.enabled=false \
--conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar \
RAPIDS_FILE_NAME=${RAPIDS_PLUGIN_JAR##*/}
CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.concurrentPythonWorkers=2 \
--conf spark.executorEnv.PYTHONPATH=${RAPIDS_FILE_NAME} \
--py-files ${RAPIDS_PLUGIN_JAR}"

TEST_PARAMS="$SPARK_VER $PARQUET_PERF $PARQUET_ACQ $OUTPUT"
@@ -90,5 +91,6 @@ jps
echo "----------------------------START TEST------------------------------------"
rm -rf $OUTPUT
spark-submit $BASE_SPARK_SUBMIT_ARGS $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS
cd $RAPIDS_INT_TESTS_HOME && spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/"
spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "cudf_udf" -v -rfExXs
cd $RAPIDS_INT_TESTS_HOME
spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/"
spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS ./runtests.py -m "cudf_udf" -v -rfExXs
