From 007f8843468ca1cb29c51145e7699d90552d4909 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 21 Sep 2020 08:44:44 -0500 Subject: [PATCH] Revert "Enable tests in udf_cudf_test.py (#777)" (#813) This reverts commit f76ed9c31295fcfae15d6cb4ab47122462a5b321. Signed-off-by: Thomas Graves --- .../src/main/python/udf_cudf_test.py | 104 ++++++++---------- jenkins/spark-premerge-build.sh | 8 +- jenkins/spark-tests.sh | 14 +-- 3 files changed, 54 insertions(+), 72 deletions(-) diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py index de8a9d0a34e..7d710adb062 100644 --- a/integration_tests/src/main/python/udf_cudf_test.py +++ b/integration_tests/src/main/python/udf_cudf_test.py @@ -12,20 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pandas as pd import pytest +import pandas as pd import time -from distutils.version import LooseVersion from typing import Iterator from pyspark.sql import Window from pyspark.sql.functions import pandas_udf, PandasUDFType -from spark_init_internal import spark_version from spark_session import with_cpu_session, with_gpu_session from marks import allow_non_gpu, cudf_udf -pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'), - reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet") - _conf = { 'spark.rapids.sql.exec.MapInPandasExec':'true', @@ -36,11 +31,11 @@ 'spark.rapids.sql.python.gpu.enabled': 'true' } - def _create_df(spark): - elements = list(map(lambda i: (i, i/1.0), range(1, 5000))) - return spark.createDataFrame(elements * 2, ("id", "v")) - + return spark.createDataFrame( + [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], + ("id", "v") + ) # since this test requires to run different functions on CPU and GPU(need cudf), # create its own assert function @@ -59,22 +54,21 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False) assert cpu_ret.sort() == gpu_ret.sort() else: assert cpu_ret == gpu_ret + - -# ======= Test Scalar ======= @pandas_udf('int') def _plus_one_cpu_func(v: pd.Series) -> pd.Series: return v + 1 - @pandas_udf('int') def _plus_one_gpu_func(v: pd.Series) -> pd.Series: import cudf - gpu_series = cudf.Series(v) - gpu_series = gpu_series + 1 - return gpu_series.to_pandas() - + gpu_serises = cudf.Series(v) + gpu_serises = gpu_serises + 1 + return gpu_serises.to_pandas() +@allow_non_gpu(any=True) +@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") @cudf_udf def test_with_column(): def cpu_run(spark): @@ -84,32 +78,28 @@ def cpu_run(spark): def gpu_run(spark): df = _create_df(spark) return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect() - + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) - +@allow_non_gpu(any=True) +@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") @cudf_udf def test_sql(): def cpu_run(spark): _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func) - _create_df(spark).createOrReplaceTempView("test_table_cpu") - return spark.sql("SELECT add_one_cpu(id) FROM test_table_cpu").collect() - + return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect() def gpu_run(spark): _ = spark.udf.register("add_one_gpu", _plus_one_gpu_func) - _create_df(spark).createOrReplaceTempView("test_table_gpu") - return spark.sql("SELECT add_one_gpu(id) FROM test_table_gpu").collect() - + return 
spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect() + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) -# ======= Test Scalar Iterator ======= @pandas_udf("long") def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: for s in iterator: yield s + 1 - @pandas_udf("long") def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: import cudf @@ -117,8 +107,9 @@ def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series gpu_serises = cudf.Series(s) gpu_serises = gpu_serises + 1 yield gpu_serises.to_pandas() - - + +@allow_non_gpu(any=True) +@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now") @cudf_udf def test_select(): def cpu_run(spark): @@ -132,30 +123,29 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) -# ======= Test Flat Map In Pandas ======= +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuMapInPandasExec','PythonUDF') @cudf_udf def test_map_in_pandas(): def cpu_run(spark): + df = _create_df(spark) def _filter_cpu_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] - df = _create_df(spark) return df.mapInPandas(_filter_cpu_func, df.schema).collect() def gpu_run(spark): + df = _create_df(spark) def _filter_gpu_func(iterator): import cudf for pdf in iterator: gdf = cudf.from_pandas(pdf) yield gdf[gdf.id == 1].to_pandas() - df = _create_df(spark) return df.mapInPandas(_filter_gpu_func, df.schema).collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf) -# ======= Test Grouped Map In Pandas ======= # To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP # need to add udf type @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) @@ -163,7 +153,6 @@ def _normalize_cpu_func(df): v = df.v return df.assign(v=(v - v.mean()) / v.std()) - @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def _normalize_gpu_func(df): import cudf @@ -171,7 +160,7 @@ def _normalize_gpu_func(df): v = gdf.v return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() - +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF') @cudf_udf def test_group_apply(): @@ -182,45 +171,44 @@ def cpu_run(spark): def gpu_run(spark): df = _create_df(spark) return df.groupby("id").apply(_normalize_gpu_func).collect() - + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF') @cudf_udf def test_group_apply_in_pandas(): def cpu_run(spark): + df = _create_df(spark) def _normalize_cpu_in_pandas_func(df): v = df.v return df.assign(v=(v - v.mean()) / v.std()) - df = _create_df(spark) return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect() def gpu_run(spark): + df = _create_df(spark) def _normalize_gpu_in_pandas_func(df): import cudf gdf = cudf.from_pandas(df) v = gdf.v return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas() - df = _create_df(spark) return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -# ======= Test Aggregate In Pandas ======= @pandas_udf("int") def _sum_cpu_func(v: pd.Series) -> int: return v.sum() - @pandas_udf("integer") def _sum_gpu_func(v: pd.Series) -> int: import cudf - gpu_series = cudf.Series(v) - return 
gpu_series.sum() - + gpu_serises = cudf.Series(v) + return gpu_serises.sum() +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_group_agg(): @@ -235,6 +223,7 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias') @cudf_udf def test_sql_group(): @@ -251,9 +240,8 @@ def gpu_run(spark): _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -# ======= Test Window In Pandas ======= -@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition', - 'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$') +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") +@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition','SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$') @cudf_udf def test_window(): def cpu_run(spark): @@ -266,39 +254,37 @@ def gpu_run(spark): w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect() - _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) + _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) -# ======= Test CoGroup Map In Pandas ======= +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746") @allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF') @cudf_udf def test_cogroup(): def cpu_run(spark): - def _cpu_join_func(l, r): - return pd.merge(l, r, on="time") df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) - return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, - schema="time int, id_x int, id_y int, v1 double, v2 string").collect() + def _cpu_join_func(l, r): + return pd.merge(l, r, on="time") + return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect() def gpu_run(spark): - def _gpu_join_func(l, r): - import cudf - gl = cudf.from_pandas(l) - gr = cudf.from_pandas(r) - return gl.merge(gr, on="time").to_pandas() df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) - return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, - schema="time int, id_x int, id_y int, v1 double, v2 string").collect() + def _gpu_join_func(l, r): + import cudf + gl = cudf.from_pandas(l) + gr = cudf.from_pandas(r) + return gl.merge(gr, on="time").to_pandas() + return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect() _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 847e0ac43f7..e2129d3002f 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -37,12 +37,10 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH" tar zxf 
$SPARK_HOME.tgz -C $ARTF_ROOT && \ rm -f $SPARK_HOME.tgz -ARGS_SKIP_TESTS='not cudf_udf' - -mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" +mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS='' # Run the unit tests for other Spark versions but dont run full python integration tests -env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" -env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS" +env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf' +env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf' # The jacoco coverage should have been collected, but because of how the shade plugin # works and jacoco we need to clean some things up so jacoco will only report for the diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh index 79ee252b80a..677e5919c8b 100755 --- a/jenkins/spark-tests.sh +++ b/jenkins/spark-tests.sh @@ -70,11 +70,10 @@ MORTGAGE_SPARK_SUBMIT_ARGS=" --conf spark.plugins=com.nvidia.spark.SQLPlugin \ --class com.nvidia.spark.rapids.tests.mortgage.Main \ $RAPIDS_TEST_JAR" -RAPIDS_FILE_NAME=${RAPIDS_PLUGIN_JAR##*/} -CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \ - --conf spark.rapids.python.memory.gpu.allocFraction=0.1 \ - --conf spark.rapids.python.concurrentPythonWorkers=2 \ - --conf spark.executorEnv.PYTHONPATH=${RAPIDS_FILE_NAME} \ +# need to disable pooling for udf test to prevent cudaErrorMemoryAllocation +CUDF_UDF_TEST_ARGS="--conf spark.rapids.python.memory.gpu.pooling.enabled=false \ + --conf spark.rapids.memory.gpu.pooling.enabled=false \ + --conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar \ --py-files ${RAPIDS_PLUGIN_JAR}" TEST_PARAMS="$SPARK_VER $PARQUET_PERF $PARQUET_ACQ $OUTPUT" @@ -91,6 +90,5 @@ jps echo "----------------------------START TEST------------------------------------" rm -rf $OUTPUT spark-submit $BASE_SPARK_SUBMIT_ARGS $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS -cd $RAPIDS_INT_TESTS_HOME -spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/" -spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS ./runtests.py -m "cudf_udf" -v -rfExXs +cd $RAPIDS_INT_TESTS_HOME && spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/" +spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "cudf_udf" -v -rfExXs
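
Note for reviewers: the core pattern the restored udf_cudf_test.py exercises is a CPU/GPU pair of scalar pandas UDFs whose results are compared (as _assert_cpu_gpu does above). The following is only a minimal illustrative sketch of that pattern, not part of the patch; it assumes an existing SparkSession configured with the RAPIDS plugin and executors where cudf is importable, and the session setup and column names here are illustrative only.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

@pandas_udf('int')
def plus_one_cpu(v: pd.Series) -> pd.Series:
    # plain pandas arithmetic, evaluated on the CPU
    return v + 1

@pandas_udf('int')
def plus_one_gpu(v: pd.Series) -> pd.Series:
    # same arithmetic routed through cudf; requires cudf in the worker env
    import cudf
    gpu_series = cudf.Series(v)
    gpu_series = gpu_series + 1
    return gpu_series.to_pandas()

# assumes the RAPIDS plugin and python.gpu confs are set elsewhere (see _conf above)
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))
cpu_rows = df.withColumn("v1", plus_one_cpu(df.v)).collect()
gpu_rows = df.withColumn("v1", plus_one_gpu(df.v)).collect()
assert cpu_rows == gpu_rows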