Speed up test performance using pytest-xdist #1086

Merged · 4 commits · Nov 9, 2020
33 changes: 25 additions & 8 deletions docs/testing.md
@@ -51,18 +51,35 @@ default version runs against Spark 3.0.0, to run against other versions use one of

Integration tests are stored in the [integration_tests](../integration_tests/README.md) directory.
There are two frameworks used for testing. One is based off of pytest and pyspark in the
`src/main/python` directory. These tests will run as a part of the build if you have the environment
variable `SPARK_HOME` set. If you also have `SPARK_CONF_DIR` set, the tests will try to use
whatever cluster you have configured.

To run the tests separate from the build go to the `integration_tests` directory and submit
`run_tests.py` through `spark-submit`. Be sure to include the necessary jars for the RAPIDS
plugin either with `spark-submit` or with the cluster when it is
`src/main/python` directory. These tests will run as a part of the maven build if you have the environment
variable `SPARK_HOME` set.

By default the tests try to use the python packages `pytest-xdist` and `findspark` to oversubscribe
your GPU and run the tests in Spark local mode. This can speed up the tests significantly, as all
of the tests that run by default process relatively small amounts of data. Be careful: if you also
have `SPARK_CONF_DIR` set, the tests will try to use whatever cluster you have configured.
If you do want to run the tests in parallel on an existing cluster, it is recommended that you set
`-Dpytest.TEST_PARALLEL` to one less than the number of worker applications that will be
running on the cluster. This is because `pytest-xdist` launches one extra control application that
is not included in that number. All it does is farm out work to the other applications, but because
it needs to know about the Spark cluster to determine which tests to run and how, it still shows up
as a Spark application.
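
For example, if your cluster has room for four worker applications, a hypothetical maven invocation
(substitute whatever maven command you normally use for the build) would look like:

```
mvn package -Dpytest.TEST_PARALLEL=3
```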

To run the tests separately from the build, go to the `integration_tests` directory. You can submit
`runtests.py` through `spark-submit`, but if you want to run the tests in parallel with
`pytest-xdist` you will need to submit it as a regular python application and have `findspark`
installed. Be sure to include the necessary jars for the RAPIDS plugin either with
`spark-submit` or with the cluster when it is
[setup](get-started/getting-started-on-prem.md).
The command line arguments to `run_tests.py` are the same as for
The command line arguments to `runtests.py` are the same as for
[pytest](https://docs.pytest.org/en/latest/usage.html). The only reason we have a separate script
is that `spark-submit` uses python if the file name ends with `.py`.

If you want to configure the Spark cluster, you may also set environment variables for the tests.
The name of the env var should be in the form `"PYSP_TEST_" + conf_key.replace('.', '_')`. Linux
does not allow `.` in the name of an environment variable, so we replace it with an underscore. Since
Spark config keys avoid underscores, no other special processing is needed.
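
For example, mirroring what `run_pyspark_from_build.sh` exports, `spark.sql.session.timeZone` maps to:

```
export PYSP_TEST_spark_sql_session_timeZone=UTC
```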

We also have a large number of integration tests that currently run as a part of the unit tests
using scala test. Those are in the `src/test/scala` sub-directory and depend on the testing
framework from the `rapids-4-spark-tests_2.12` test jar.
52 changes: 43 additions & 9 deletions integration_tests/README.md
@@ -27,22 +27,36 @@ Should be enough to get the basics started.
### pandas
`pip install pandas`

`pandas` is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool.
`pandas` is a fast, powerful, flexible, and easy to use open source data analysis and manipulation tool, and is
only needed when testing integration with pandas.

### pyarrow
`pip install pyarrow`

`pyarrow` provides a Python API for functionality provided by the Arrow C++ libraries, along with tools for Arrow integration and interoperability with pandas, NumPy, and other software in the Python ecosystem.
`pyarrow` provides a Python API for functionality provided by the Arrow C++ libraries, along with tools for Arrow
integration and interoperability with pandas, NumPy, and other software in the Python ecosystem. This is used
to test improved transfer performance to pandas-based user-defined functions.

## pytest-xdist and findspark

`pytest-xdist` and `findspark` can be used to speed up running the tests by running them in parallel.
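If needed, both can be installed with `pip install pytest-xdist findspark`.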

## Running

Running the tests follows the pytest conventions, the main difference is using
`spark-submit` to launch the tests instead of pytest.
Running the tests follows the pytest conventions. If you want to run the tests as a python process, you need to
have `findspark` installed. You may also submit them with `spark-submit`, but that will prevent
you from running the tests in parallel with `pytest-xdist`.

```
$SPARK_HOME/bin/spark-submit ./runtests.py
```

or

```
python ./runtests.py
```

See `pytest -h` or `$SPARK_HOME/bin/spark-submit ./runtests.py -h` for more options.

Most clusters probably will not have the RAPIDS plugin installed yet.
@@ -54,14 +68,13 @@ $SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf

You don't have to enable the plugin for this to work; the test framework will do that for you.

All of the tests will run in a single application. They just enable and disable the plugin as needed.

You do need to have access to a compatible GPU with the needed CUDA drivers. The exact details of how to set this up are beyond the scope of this document, but the Spark feature for scheduling GPUs does make this very simple if you have it configured.
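
As a rough sketch only (the exact configs and discovery script depend on your cluster manager and Spark
deployment), the Spark 3.0 GPU scheduling feature is driven by settings along these lines added to your
`spark-submit` command:

```
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=1 \
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh
```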

### Runtime Environment

`--runtime_env` is used to specify the environment you are running the tests in. Valid values are `databricks` and `emr`. This is generally used
when certain environments have different behavior.
when certain environments have different behavior, and the tests don't have a good way to auto-detect the environment yet.
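
For example, assuming the flag takes its value in the same style as the other options shown in this document:

```
$SPARK_HOME/bin/spark-submit ./runtests.py --runtime_env="databricks"
```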

### timezone

@@ -74,12 +87,29 @@ Please be sure that the following configs are set when running the tests (an example command is shown after this list).
* `spark.executor.extraJavaOptions` should include `-Duser.timezone=GMT`
* `spark.sql.session.timeZone`=`UTC`
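
A minimal sketch, following the settings used by `run_pyspark_from_build.sh` (the RAPIDS jars shown earlier
are omitted for brevity):

```
$SPARK_HOME/bin/spark-submit \
  --conf 'spark.driver.extraJavaOptions=-Duser.timezone=GMT' \
  --conf 'spark.executor.extraJavaOptions=-Duser.timezone=GMT' \
  --conf 'spark.sql.session.timeZone=UTC' \
  ./runtests.py
```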

### Enabling TPCxBB/TPCH/Mortgage Tests
### Running in parallel

The TPCxBB, TPCH, and Mortgage tests in this framework can be enabled by providing a couple of options:
You may use `pytest-xdist` to run the tests in parallel. This is done by running the tests through `python`, not `spark-submit`,
and setting the parallelism with the `-n` command line parameter. Be aware that `pytest-xdist` will launch one control application
plus the given number of worker applications, so your cluster needs to be large enough to handle one more application than the parallelism
you set. Most tests are small and don't even need a full GPU to run, so setting your applications to use a single executor and a single
GPU per executor is typically enough. When running from maven we assume that we are running in local mode and will try to
oversubscribe a single GPU. Typically the tests don't need more than 2GB of GPU memory, so doing this speeds up the tests significantly.
It is neither easy nor recommended to configure an actual cluster to oversubscribe GPUs, so please don't try it.
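
As a minimal sketch, assuming your cluster can host four worker applications plus the `pytest-xdist` control application:

```
python ./runtests.py -n 4
```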

Under YARN and Kubernetes you can set `spark.executor.instances` to the number of executors you want running in your application
(typically 1). Spark will automatically launch a driver for each application too, but if you configured it correctly that will not take
any GPU resources on the cluster. For standalone, Mesos, and Kubernetes you can set `spark.cores.max` to one more than the number
of executors you want to use per application; the extra core is for the driver. Dynamic allocation can mess with these settings
under YARN, and even though it is off by default you probably want to be sure it is disabled (`spark.dynamicAllocation.enabled=false`).
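
One hedged way to apply these settings when launching through `python` is the `PYSP_TEST_` environment variable
mechanism described in `docs/testing.md`; the values below are only illustrative and depend on your cluster:

```
export PYSP_TEST_spark_executor_instances=1
export PYSP_TEST_spark_cores_max=2
export PYSP_TEST_spark_dynamicAllocation_enabled=false
python ./runtests.py -n 4
```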

### Enabling TPCxBB/TPCH/TPCDS/Mortgage Tests

The TPCxBB, TPCH, TPCDS, and Mortgage tests in this framework can be enabled by providing a couple of options:

* TPCxBB `tpcxbb-format` (optional, defaults to "parquet"), and `tpcxbb-path` (required, path to the TPCxBB data).
* TPCH `tpch-format` (optional, defaults to "parquet"), and `tpch-path` (required, path to the TPCH data).
* TPCDS `tpcds-format` (optional, defaults to "parquet"), and `tpcds-path` (required, path to the TPCDS data).
* Mortgage `mortgage-format` (optional, defaults to "parquet"), and `mortgage-path` (required, path to the Mortgage data).

As an example, here is the `spark-submit` command with the TPCxBB parameters:
Expand All @@ -88,6 +118,11 @@ As an example, here is the `spark-submit` command with the TPCxBB parameters:
$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf-0.17-SNAPSHOT.jar,rapids-4-spark-tests_2.12-0.3.0-SNAPSHOT.jar" ./runtests.py --tpcxbb_format="csv" --tpcxbb_path="/path/to/tpcxbb/csv"
```
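
A hypothetical TPCDS variant, assuming the TPCDS flags follow the same underscore naming as the TPCxBB example above:

```
$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf-0.17-SNAPSHOT.jar,rapids-4-spark-tests_2.12-0.3.0-SNAPSHOT.jar" ./runtests.py --tpcds_format="parquet" --tpcds_path="/path/to/tpcds/parquet"
```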

Be aware that running these tests with real data requires at least an entire GPU, and preferably several GPUs/executors
in your cluster, so please be careful when enabling these tests. Also some of these tests actually produce non-deterministic
results when run on a real cluster. If you do see failures when running these tests please contact us so we can investigate
them and possibly tag the tests appropriately for running on an actual cluster.

### Enabling cudf_udf Tests

The cudf_udf tests in this framework test Pandas UDFs (user-defined functions) with cuDF. They are disabled by default not only because of the complicated environment setup, but also because GPU resource scheduling for Pandas UDFs is currently an experimental feature and the performance may not always be better.
@@ -112,7 +147,6 @@ As an example, here is the `spark-submit` command with the cudf_udf parameter:
$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf-0.17-SNAPSHOT.jar,rapids-4-spark-tests_2.12-0.3.0-SNAPSHOT.jar" --conf spark.rapids.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.concurrentPythonWorkers=2 --py-files "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar" --conf spark.executorEnv.PYTHONPATH="rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar" ./runtests.py --cudf_udf
```


## Writing tests

There are a number of libraries provided to help someone write new tests.
11 changes: 6 additions & 5 deletions integration_tests/pom.xml
@@ -90,7 +90,7 @@
<groupId>ai.rapids</groupId>
<artifactId>cudf</artifactId>
<classifier>${cuda.version}</classifier>
<scope>provided</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
@@ -148,9 +148,9 @@
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<configuration>
<configuration>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -202,8 +202,9 @@
<environmentVariables>
<SKIP_TESTS>${skipTests}</SKIP_TESTS>
<TEST>${test}</TEST>
<COVERAGE_SUBMIT_FLAGS>${argLine}</COVERAGE_SUBMIT_FLAGS>
<TEST_TAGS>${pytest.TEST_TAGS}</TEST_TAGS>
<COVERAGE_SUBMIT_FLAGS>${argLine}</COVERAGE_SUBMIT_FLAGS>
<TEST_TAGS>${pytest.TEST_TAGS}</TEST_TAGS>
<TEST_PARALLEL>${pytest.TEST_PARALLEL}</TEST_PARALLEL>
</environmentVariables>
</configuration>
</execution>
75 changes: 68 additions & 7 deletions integration_tests/run_pyspark_from_build.sh
@@ -38,19 +38,80 @@ else
then
TEST_TAGS="-m $TEST_TAGS"
fi
if [[ "${TEST_PARALLEL}" == "" ]];
then
# For now just assume that we are going to use the GPU on the
# system with the most free memory and then divide it up into chunks.
# We use free memory to try and avoid issues if the GPU is also working
# on graphics, which happens sometimes.
# We subtract one for the main controlling process that will still
# launch an application. It will not run anything on the GPU but it
# still needs to launch a spark application.
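# Assume each worker application needs roughly 2.3 GiB of GPU memory
# (nvidia-smi reports memory.free in MiB, so divide by 2.3 * 1024).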
TEST_PARALLEL=`nvidia-smi --query-gpu=memory.free --format=csv,noheader | awk '{if (MAX < $1){ MAX = $1}} END {print int(MAX / (2.3 * 1024)) - 1}'`
echo "AUTO DETECTED PARALLELISM OF $TEST_PARALLEL"
fi
if python -c 'import findspark';
then
echo "FOUND findspark"
FIND_SPARK=1
else
TEST_PARALLEL=0
FIND_SPARK=0
echo "findspark not installed cannot run tests in parallel"
fi
if python -c 'import xdist.plugin';
then
echo "FOUND xdist"
else
TEST_PARALLEL=0
echo "xdist not installed cannot run tests in parallel"
fi

if [[ ${TEST_PARALLEL} -lt 2 ]];
then
# With xdist 0 and 1 are the same parallelism but
# 0 is more efficient
TEST_PARALLEL=""
MEMORY_FRACTION='1'
else
MEMORY_FRACTION=`python -c "print(1/($TEST_PARALLEL + 1))"`
TEST_PARALLEL="-n $TEST_PARALLEL"
fi
RUN_DIR="$SCRIPTPATH"/target/run_dir
mkdir -p "$RUN_DIR"
cd "$RUN_DIR"
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" \
--conf "spark.driver.extraJavaOptions=-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" \
--conf 'spark.executor.extraJavaOptions=-ea -Duser.timezone=GMT' \
--conf 'spark.sql.session.timeZone=UTC' \
--conf 'spark.sql.shuffle.partitions=12' \
$SPARK_SUBMIT_FLAGS \
"$SCRIPTPATH"/runtests.py --rootdir "$SCRIPTPATH" "$SCRIPTPATH"/src/main/python \
if [[ "${FIND_SPARK}" == "1" ]];
then
export PYSP_TEST_spark_driver_extraClassPath="${ALL_JARS// /:}"
export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS"
export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=GMT'
export PYSP_TEST_spark_ui_showConsoleProgress='false'
export PYSP_TEST_spark_sql_session_timeZone='UTC'
export PYSP_TEST_spark_sql_shuffle_partitions='12'
export PYSP_TEST_spark_rapids_memory_gpu_allocFraction=$MEMORY_FRACTION
export PYSP_TEST_spark_rapids_memory_gpu_maxAllocFraction=$MEMORY_FRACTION

python \
"$SCRIPTPATH"/runtests.py --rootdir "$SCRIPTPATH" "$SCRIPTPATH"/src/main/python \
$TEST_PARALLEL \
-v -rfExXs "$TEST_TAGS" \
--std_input_path="$SCRIPTPATH"/src/test/resources/ \
"$TEST_ARGS" \
$RUN_TEST_PARAMS \
"$@"
else
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" \
--conf "spark.driver.extraJavaOptions=-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" \
--conf 'spark.executor.extraJavaOptions=-ea -Duser.timezone=GMT' \
--conf 'spark.sql.session.timeZone=UTC' \
--conf 'spark.sql.shuffle.partitions=12' \
$SPARK_SUBMIT_FLAGS \
"$SCRIPTPATH"/runtests.py --rootdir "$SCRIPTPATH" "$SCRIPTPATH"/src/main/python \
-v -rfExXs "$TEST_TAGS" \
--std_input_path="$SCRIPTPATH"/src/test/resources/ \
"$TEST_ARGS" \
$RUN_TEST_PARAMS \
"$@"

fi
fi
21 changes: 21 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -187,6 +187,27 @@ def spark_tmp_path(request):
if not debug:
fs.delete(path)

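# Hands out unique temporary table names (a base id plus a running counter) so
# tests that create tables do not collide when several test applications run in parallel.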
class TmpTableFactory:
def __init__(self, base_id):
self.base_id = base_id
self.running_id = 0

def get(self):
ret = '{}_{}'.format(self.base_id, self.running_id)
self.running_id = self.running_id + 1
return ret

@pytest.fixture
def spark_tmp_table_factory(request):
base_id = 'tmp_table_{}'.format(random.randint(0, 1000000))
yield TmpTableFactory(base_id)
sp = get_spark_i_know_what_i_am_doing()
    tables = sp.sql("SHOW TABLES").collect()
for row in tables:
t_name = row['tableName']
if (t_name.startswith(base_id)):
sp.sql("DROP TABLE IF EXISTS {}".format(t_name))

def _get_jvm_session(spark):
return spark._jsparkSession

7 changes: 4 additions & 3 deletions integration_tests/src/main/python/join_test.py
@@ -168,16 +168,17 @@ def do_join(spark):
@pytest.mark.xfail(condition=is_emr_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/821')
@pytest.mark.parametrize('repartition', ["true", "false"], ids=idfn)
def test_join_bucketed_table(repartition):
def test_join_bucketed_table(repartition, spark_tmp_table_factory):
def do_join(spark):
table_name = spark_tmp_table_factory.get()
data = [("http://fooblog.com/blog-entry-116.html", "https://fooblog.com/blog-entry-116.html"),
("http://fooblog.com/blog-entry-116.html", "http://fooblog.com/blog-entry-116.html")]
resolved = spark.sparkContext.parallelize(data).toDF(['Url','ResolvedUrl'])
feature_data = [("http://fooblog.com/blog-entry-116.html", "21")]
feature = spark.sparkContext.parallelize(feature_data).toDF(['Url','Count'])
feature.write.bucketBy(400, 'Url').sortBy('Url').format('parquet').mode('overwrite')\
.saveAsTable('featuretable')
testurls = spark.sql("SELECT Url, Count FROM featuretable")
.saveAsTable(table_name)
testurls = spark.sql("SELECT Url, Count FROM {}".format(table_name))
if (repartition == "true"):
return testurls.repartition(20).join(resolved, "Url", "inner")
else:
18 changes: 10 additions & 8 deletions integration_tests/src/main/python/parquet_test.py
@@ -400,24 +400,26 @@ def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):
'input_file_block_length()'),
conf=all_confs)

def createBucketedTableAndJoin(spark):
spark.range(10e4).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e4")
spark.range(10e6).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e6")
bucketed_4_10e4 = spark.table("bucketed_4_10e4")
bucketed_4_10e6 = spark.table("bucketed_4_10e6")
def createBucketedTableAndJoin(spark, tbl_1, tbl_2):
spark.range(10e4).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable(tbl_1)
spark.range(10e6).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable(tbl_2)
bucketed_4_10e4 = spark.table(tbl_1)
bucketed_4_10e6 = spark.table(tbl_2)
return bucketed_4_10e4.join(bucketed_4_10e6, "id")

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
# this test would be better if we could ensure exchanges didn't exist - i.e. that buckets were used
def test_buckets(spark_tmp_path, v1_enabled_list, reader_confs):
def test_buckets(spark_tmp_path, v1_enabled_list, reader_confs, spark_tmp_table_factory):
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
"spark.sql.autoBroadcastJoinThreshold": '-1'})
assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin,
conf=all_confs)
def do_it(spark):
return createBucketedTableAndJoin(spark, spark_tmp_table_factory.get(),
spark_tmp_table_factory.get())
assert_gpu_and_cpu_are_equal_collect(do_it, conf=all_confs)

@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_small_file_memory(spark_tmp_path, v1_enabled_list):