From 051bbaac22d26afad58d2786531e0a43b492d001 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Fri, 18 Sep 2020 09:15:49 +0800 Subject: [PATCH] Enable tests in udf_cudf_test.py (#777) * Enable tests in udf_cudf_test.py 1) Update the configs for CUDF tests to enable pool and turn down the pool size to avoid OOM as much as possible. 2) Increase the size of the test data frame to avoid IPC errors. Some tasks will get empty data when the data frame is small enough, then the IPC error happens. Signed-off-by: Firestarman * Skip cudf test for premerge Signed-off-by: Firestarman * Skip cudf tests for Spark 3.1.0+ temporarily Since shim layer for Spark 3.1.0 is not ready. Signed-off-by: Firestarman * Use LooseVersion to compare the version instead --- .../src/main/python/udf_cudf_test.py | 104 ++++++++++-------- jenkins/spark-premerge-build.sh | 8 +- jenkins/spark-tests.sh | 14 ++- 3 files changed, 72 insertions(+), 54 deletions(-) diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py index 7d710adb062..de8a9d0a34e 100644 --- a/integration_tests/src/main/python/udf_cudf_test.py +++ b/integration_tests/src/main/python/udf_cudf_test.py @@ -12,15 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pytest import pandas as pd +import pytest import time +from distutils.version import LooseVersion from typing import Iterator from pyspark.sql import Window from pyspark.sql.functions import pandas_udf, PandasUDFType +from spark_init_internal import spark_version from spark_session import with_cpu_session, with_gpu_session from marks import allow_non_gpu, cudf_udf +pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'), + reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet") + _conf = { 'spark.rapids.sql.exec.MapInPandasExec':'true', @@ -31,11 +36,11 @@ 'spark.rapids.sql.python.gpu.enabled': 'true' } + def _create_df(spark): - return spark.createDataFrame( - [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], - ("id", "v") - ) + elements = list(map(lambda i: (i, i/1.0), range(1, 5000))) + return spark.createDataFrame(elements * 2, ("id", "v")) + # since this test requires to run different functions on CPU and GPU(need cudf), # create its own assert function @@ -54,21 +59,22 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False) assert cpu_ret.sort() == gpu_ret.sort() else: assert cpu_ret == gpu_ret - + +# ======= Test Scalar ======= @pandas_udf('int') def _plus_one_cpu_func(v: pd.Series) -> pd.Series: return v + 1 + @pandas_udf('int') def _plus_one_gpu_func(v: pd.Series) -> pd.Series: import cudf - gpu_serises = cudf.Series(v) - gpu_serises = gpu_serises + 1 - return gpu_serises.to_pandas() + gpu_series = cudf.Series(v) + gpu_series = gpu_series + 1 + return gpu_series.to_pandas() + -@allow_non_gpu(any=True) -@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") @cudf_udf def test_with_column(): def cpu_run(spark): @@ -78,28 +84,32 @@ def cpu_run(spark): def gpu_run(spark): df = _create_df(spark) return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect() - + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) -@allow_non_gpu(any=True) -@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") + @cudf_udf def test_sql(): def cpu_run(spark): _ = spark.udf.register("add_one_cpu", 
_plus_one_cpu_func) - return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect() + _create_df(spark).createOrReplaceTempView("test_table_cpu") + return spark.sql("SELECT add_one_cpu(id) FROM test_table_cpu").collect() + def gpu_run(spark): _ = spark.udf.register("add_one_gpu", _plus_one_gpu_func) - return spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect() - + _create_df(spark).createOrReplaceTempView("test_table_gpu") + return spark.sql("SELECT add_one_gpu(id) FROM test_table_gpu").collect() + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) +# ======= Test Scalar Iterator ======= @pandas_udf("long") def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: for s in iterator: yield s + 1 + @pandas_udf("long") def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: import cudf @@ -107,9 +117,8 @@ def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series gpu_serises = cudf.Series(s) gpu_serises = gpu_serises + 1 yield gpu_serises.to_pandas() - -@allow_non_gpu(any=True) -@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") + + @cudf_udf def test_select(): def cpu_run(spark): @@ -123,29 +132,30 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") +# ======= Test Flat Map In Pandas ======= @allow_non_gpu('GpuMapInPandasExec','PythonUDF') @cudf_udf def test_map_in_pandas(): def cpu_run(spark): - df = _create_df(spark) def _filter_cpu_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] + df = _create_df(spark) return df.mapInPandas(_filter_cpu_func, df.schema).collect() def gpu_run(spark): - df = _create_df(spark) def _filter_gpu_func(iterator): import cudf for pdf in iterator: gdf = cudf.from_pandas(pdf) yield gdf[gdf.id == 1].to_pandas() + df = _create_df(spark) return df.mapInPandas(_filter_gpu_func, df.schema).collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) +# ======= Test Grouped Map In Pandas ======= # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP # need to add udf type @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) @@ -153,6 +163,7 @@ def _normalize_cpu_func(df): v = df.v return df.assign(v=(v - v.mean()) / v.std()) + @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def _normalize_gpu_func(df): import cudf @@ -160,7 +171,7 @@ def _normalize_gpu_func(df): v = gdf.v return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") + @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF') @cudf_udf def test_group_apply(): @@ -171,44 +182,45 @@ def cpu_run(spark): def gpu_run(spark): df = _create_df(spark) return df.groupby("id").apply(_normalize_gpu_func).collect() - + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF') @cudf_udf def test_group_apply_in_pandas(): def cpu_run(spark): - df = _create_df(spark) def _normalize_cpu_in_pandas_func(df): v = df.v return df.assign(v=(v - v.mean()) / v.std()) + df = _create_df(spark) return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect() def gpu_run(spark): - df = _create_df(spark) def _normalize_gpu_in_pandas_func(df): import cudf gdf = cudf.from_pandas(df) v = gdf.v return 
gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() + df = _create_df(spark) return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) +# ======= Test Aggregate In Pandas ======= @pandas_udf("int") def _sum_cpu_func(v: pd.Series) -> int: return v.sum() + @pandas_udf("integer") def _sum_gpu_func(v: pd.Series) -> int: import cudf - gpu_serises = cudf.Series(v) - return gpu_serises.sum() + gpu_series = cudf.Series(v) + return gpu_series.sum() + -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_group_agg(): @@ -223,7 +235,6 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_sql_group(): @@ -240,8 +251,9 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") -@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition','SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$') +# ======= Test Window In Pandas ======= +@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition', + 'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$') @cudf_udf def test_window(): def cpu_run(spark): @@ -254,37 +266,39 @@ def gpu_run(spark): w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect() - _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") +# ======= Test CoGroup Map In Pandas ======= @allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF') @cudf_udf def test_cogroup(): def cpu_run(spark): + def _cpu_join_func(l, r): + return pd.merge(l, r, on="time") df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) - def _cpu_join_func(l, r): - return pd.merge(l, r, on="time") - return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect() + return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, + schema="time int, id_x int, id_y int, v1 double, v2 string").collect() def gpu_run(spark): + def _gpu_join_func(l, r): + import cudf + gl = cudf.from_pandas(l) + gr = cudf.from_pandas(r) + return gl.merge(gr, on="time").to_pandas() df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) - def _gpu_join_func(l, r): - import cudf - gl = cudf.from_pandas(l) - gr = cudf.from_pandas(r) - return gl.merge(gr, on="time").to_pandas() - return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect() + return 
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, + schema="time int, id_x int, id_y int, v1 double, v2 string").collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index e2129d3002f..847e0ac43f7 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -37,10 +37,12 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH" tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \ rm -f $SPARK_HOME.tgz -mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS='' +ARGS_SKIP_TESTS='not cudf_udf' + +mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" # Run the unit tests for other Spark versions but dont run full python integration tests -env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf' -env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf' +env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" +env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" # The jacoco coverage should have been collected, but because of how the shade plugin # works and jacoco we need to clean some things up so jacoco will only report for the diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh index 677e5919c8b..79ee252b80a 100755 --- a/jenkins/spark-tests.sh +++ b/jenkins/spark-tests.sh @@ -70,10 +70,11 @@ MORTGAGE_SPARK_SUBMIT_ARGS=" --conf spark.plugins=com.nvidia.spark.SQLPlugin \ --class com.nvidia.spark.rapids.tests.mortgage.Main \ $RAPIDS_TEST_JAR" -# need to disable pooling for udf test to prevent cudaErrorMemoryAllocation -CUDF_UDF_TEST_ARGS="--conf spark.rapids.python.memory.gpu.pooling.enabled=false \ - --conf spark.rapids.memory.gpu.pooling.enabled=false \ - --conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar \ +RAPIDS_FILE_NAME=${RAPIDS_PLUGIN_JAR##*/} +CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \ + --conf spark.rapids.python.memory.gpu.allocFraction=0.1 \ + --conf spark.rapids.python.concurrentPythonWorkers=2 \ + --conf spark.executorEnv.PYTHONPATH=${RAPIDS_FILE_NAME} \ --py-files ${RAPIDS_PLUGIN_JAR}" TEST_PARAMS="$SPARK_VER $PARQUET_PERF $PARQUET_ACQ $OUTPUT" @@ -90,5 +91,6 @@ jps echo "----------------------------START TEST------------------------------------" rm -rf $OUTPUT spark-submit $BASE_SPARK_SUBMIT_ARGS $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS -cd $RAPIDS_INT_TESTS_HOME && spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/" -spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "cudf_udf" -v -rfExXs +cd $RAPIDS_INT_TESTS_HOME +spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/" +spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS ./runtests.py -m "cudf_udf" -v -rfExXs
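Note on the enlarged test DataFrame: the commit message ties the previously skipped tests' "OSError: Invalid IPC stream: negative continuation token" failures to tasks that receive no rows when the input frame is tiny. Below is a minimal sketch of that effect, assuming a local SparkSession; the local[8] master and the printed partition counts are illustrative and not part of the patch:

# Minimal sketch, not part of the patch: shows why the old 5-row frame can hand
# some tasks zero rows while the enlarged frame used by _create_df() does not.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[8]").getOrCreate()  # local[8] is illustrative

small = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
large = spark.createDataFrame(
    [(i, i / 1.0) for i in range(1, 5000)] * 2, ("id", "v"))

# glom() exposes the rows held by each partition; with 5 rows spread over the
# default parallelism, several partitions come back empty, which is the
# situation the IPC error was attributed to.
print(small.rdd.glom().map(len).collect())  # e.g. [0, 1, 0, 1, 1, 0, 1, 1]
print(large.rdd.glom().map(len).collect())  # every partition receives rows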
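Similarly, the cudf_udf configs that spark-tests.sh now passes via --conf (a small GPU memory pool on both the JVM and Python sides, plus a cap on concurrent Python workers) can be sketched as a SparkSession setup for reproducing the tests outside Jenkins. The config keys and values are copied from the script; the builder-based session, and the need to supply the RAPIDS plugin jar separately, are assumptions for illustration only:

# Hedged sketch only: mirrors the --conf values added to CUDF_UDF_TEST_ARGS above.
# The RAPIDS plugin jar still has to be provided (e.g. via --jars / --py-files);
# it is omitted here because the exact path is environment specific.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")           # same plugin as the other submit args
         .config("spark.rapids.memory.gpu.allocFraction", "0.1")          # small GPU pool to avoid OOM
         .config("spark.rapids.python.memory.gpu.allocFraction", "0.1")   # small pool for the Python workers
         .config("spark.rapids.python.concurrentPythonWorkers", "2")      # limit concurrent cudf UDF workers
         .getOrCreate())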