Revert "Enable tests in udf_cudf_test.py (#777)" (#813)
This reverts commit f76ed9c.

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
tgravescs authored Sep 21, 2020
1 parent 86455b5 commit 007f884
Showing 3 changed files with 54 additions and 72 deletions.
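At a glance, this revert trades the module-level Spark-version gate that #777 added for the per-test skip markers the file had before. A minimal sketch of the two styles, with names taken from the diff below (the decorated test is only an example):

import pytest
from distutils.version import LooseVersion

# Removed again by the revert: a single module-level gate keyed on the Spark version.
# pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
#                                 reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")

# Restored by the revert: each test carries its own skip marker.
@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
def test_group_agg():
    ...
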
104 changes: 45 additions & 59 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -12,20 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import pytest
import pandas as pd
import time
from distutils.version import LooseVersion
from typing import Iterator
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_init_internal import spark_version
from spark_session import with_cpu_session, with_gpu_session
from marks import allow_non_gpu, cudf_udf

pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")


_conf = {
'spark.rapids.sql.exec.MapInPandasExec':'true',
@@ -36,11 +31,11 @@
'spark.rapids.sql.python.gpu.enabled': 'true'
}


def _create_df(spark):
elements = list(map(lambda i: (i, i/1.0), range(1, 5000)))
return spark.createDataFrame(elements * 2, ("id", "v"))

return spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v")
)

# Since this test needs to run different functions on the CPU and on the GPU (cudf required),
# it creates its own assert function
@@ -59,22 +54,21 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
assert cpu_ret.sort() == gpu_ret.sort()
else:
assert cpu_ret == gpu_ret
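
For reference, a minimal sketch of the full helper that the hunk above truncates, assuming with_cpu_session and with_gpu_session (imported above) take the callable plus a conf dict and return its result:

def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False):
    # Run the same workload once on the CPU and once on the GPU, then compare the results.
    cpu_ret = with_cpu_session(cpu_func, conf=cpu_conf)
    gpu_ret = with_gpu_session(gpu_func, conf=gpu_conf)
    if is_sort:
        assert cpu_ret.sort() == gpu_ret.sort()
    else:
        assert cpu_ret == gpu_ret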



# ======= Test Scalar =======
@pandas_udf('int')
def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
return v + 1


@pandas_udf('int')
def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
import cudf
gpu_series = cudf.Series(v)
gpu_series = gpu_series + 1
return gpu_series.to_pandas()

gpu_serises = cudf.Series(v)
gpu_serises = gpu_serises + 1
return gpu_serises.to_pandas()

@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")
@cudf_udf
def test_with_column():
def cpu_run(spark):
@@ -84,41 +78,38 @@ def cpu_run(spark):
def gpu_run(spark):
df = _create_df(spark)
return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")
@cudf_udf
def test_sql():
def cpu_run(spark):
_ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
_create_df(spark).createOrReplaceTempView("test_table_cpu")
return spark.sql("SELECT add_one_cpu(id) FROM test_table_cpu").collect()

return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect()
def gpu_run(spark):
_ = spark.udf.register("add_one_gpu", _plus_one_gpu_func)
_create_df(spark).createOrReplaceTempView("test_table_gpu")
return spark.sql("SELECT add_one_gpu(id) FROM test_table_gpu").collect()

return spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Scalar Iterator =======
@pandas_udf("long")
def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1


@pandas_udf("long")
def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
import cudf
for s in iterator:
gpu_serises = cudf.Series(s)
gpu_serises = gpu_serises + 1
yield gpu_serises.to_pandas()
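
The iterator UDFs above are applied just like the scalar ones; a sketch of the kind of select the truncated test below performs (assumed usage, not the verbatim hidden hunk):

def cpu_run(spark):
    df = _create_df(spark)
    return df.select(_plus_one_cpu_iter_func(df.v)).collect()

def gpu_run(spark):
    df = _create_df(spark)
    return df.select(_plus_one_gpu_iter_func(df.v)).collect()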



@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")
@cudf_udf
def test_select():
def cpu_run(spark):
@@ -132,46 +123,44 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Flat Map In Pandas =======
@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuMapInPandasExec','PythonUDF')
@cudf_udf
def test_map_in_pandas():
def cpu_run(spark):
df = _create_df(spark)
def _filter_cpu_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df = _create_df(spark)
return df.mapInPandas(_filter_cpu_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _filter_gpu_func(iterator):
import cudf
for pdf in iterator:
gdf = cudf.from_pandas(pdf)
yield gdf[gdf.id == 1].to_pandas()
df = _create_df(spark)
return df.mapInPandas(_filter_gpu_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Grouped Map In Pandas =======
# To avoid "Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP",
# the UDF type has to be specified explicitly
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_cpu_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_gpu_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply():
@@ -182,45 +171,44 @@ def cpu_run(spark):
def gpu_run(spark):
df = _create_df(spark)
return df.groupby("id").apply(_normalize_gpu_func).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply_in_pandas():
def cpu_run(spark):
df = _create_df(spark)
def _normalize_cpu_in_pandas_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _normalize_gpu_in_pandas_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


# ======= Test Aggregate In Pandas =======
@pandas_udf("int")
def _sum_cpu_func(v: pd.Series) -> int:
return v.sum()


@pandas_udf("integer")
def _sum_gpu_func(v: pd.Series) -> int:
import cudf
gpu_series = cudf.Series(v)
return gpu_series.sum()

gpu_serises = cudf.Series(v)
return gpu_serises.sum()

@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_group_agg():
@@ -235,6 +223,7 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_sql_group():
@@ -251,9 +240,8 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


# ======= Test Window In Pandas =======
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition',
'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition','SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
@cudf_udf
def test_window():
def cpu_run(spark):
@@ -266,39 +254,37 @@ def gpu_run(spark):
w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


# ======= Test CoGroup Map In Pandas =======
@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_cogroup():
def cpu_run(spark):
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

def gpu_run(spark):
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)

8 changes: 3 additions & 5 deletions jenkins/spark-premerge-build.sh
@@ -37,12 +37,10 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"
tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
rm -f $SPARK_HOME.tgz

ARGS_SKIP_TESTS='not cudf_udf'

mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"
mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS=''
# Run the unit tests for other Spark versions but don't run the full Python integration tests
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS="$ARGS_SKIP_TESTS"
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'

# The jacoco coverage should have been collected, but because of how the shade plugin
# works and jacoco we need to clean some things up so jacoco will only report for the
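On the TEST_TAGS values above: the value feeds pytest's -m marker expression, so 'not cudf_udf' deselects every test carrying the cudf_udf mark, while an empty value presumably applies no marker filter at all. A short sketch of how such a mark gates a test (illustrative test name; assumes the cudf_udf mark is registered with pytest):

import pytest

@pytest.mark.cudf_udf              # selected by -m "cudf_udf", deselected by -m "not cudf_udf"
def test_needs_cudf():
    import cudf                    # only meaningful on hosts where cudf is installed
    assert cudf.Series([1, 2]).sum() == 3
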
14 changes: 6 additions & 8 deletions jenkins/spark-tests.sh
@@ -70,11 +70,10 @@ MORTGAGE_SPARK_SUBMIT_ARGS=" --conf spark.plugins=com.nvidia.spark.SQLPlugin \
--class com.nvidia.spark.rapids.tests.mortgage.Main \
$RAPIDS_TEST_JAR"

RAPIDS_FILE_NAME=${RAPIDS_PLUGIN_JAR##*/}
CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.concurrentPythonWorkers=2 \
--conf spark.executorEnv.PYTHONPATH=${RAPIDS_FILE_NAME} \
# need to disable pooling for udf test to prevent cudaErrorMemoryAllocation
CUDF_UDF_TEST_ARGS="--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.memory.gpu.pooling.enabled=false \
--conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar \
--py-files ${RAPIDS_PLUGIN_JAR}"

TEST_PARAMS="$SPARK_VER $PARQUET_PERF $PARQUET_ACQ $OUTPUT"
@@ -91,6 +90,5 @@ jps
echo "----------------------------START TEST------------------------------------"
rm -rf $OUTPUT
spark-submit $BASE_SPARK_SUBMIT_ARGS $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS
cd $RAPIDS_INT_TESTS_HOME
spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/"
spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS ./runtests.py -m "cudf_udf" -v -rfExXs
cd $RAPIDS_INT_TESTS_HOME && spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "not cudf_udf" -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/"
spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "cudf_udf" -v -rfExXs
