Add pytest tags for nightly test parallel run [skip ci] #4373

Merged: 3 commits, Dec 17, 2021
3 changes: 3 additions & 0 deletions integration_tests/pytest.ini
@@ -26,5 +26,8 @@ markers =
validate_execs_in_gpu_plan([execs]): Exec class names to validate they exist in the GPU plan.
shuffle_test: Mark to include test in the RAPIDS Shuffle Manager
premerge_ci_1: Mark test that will run in first k8s pod in case of parallel build premerge job
nightly_resource_consuming_test: tests that are either time-consuming or memory-consuming; they are split into individual cases for the nightly parallel run
nightly_gpu_mem_consuming_case: a case in nightly_resource_consuming_test that consumes much more GPU memory than normal cases
nightly_host_mem_consuming_case: a case in nightly_resource_consuming_test that consumes much more host memory than normal cases
filterwarnings =
ignore:.*pytest.mark.order.*:_pytest.warning_types.PytestUnknownMarkWarning
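These markers drive test selection in the nightly job. As an editorial sketch (not part of the diff), and assuming run_pyspark_from_build.sh forwards the TEST_TAGS environment variable to pytest's -m marker expression, the equivalent plain-pytest invocations would look like:

    # resource-consuming tests, minus the cases that need extra GPU/host memory
    pytest -m "nightly_resource_consuming_test and not (nightly_gpu_mem_consuming_case or nightly_host_mem_consuming_case)" integration_tests/src/main/python

    # everything that is not resource-consuming
    pytest -m "not nightly_resource_consuming_test" integration_tests/src/main/python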
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -20,6 +20,8 @@
from pyspark.sql.types import *
import pyspark.sql.functions as f

pytestmark = pytest.mark.nightly_resource_consuming_test

def four_op_df(spark, gen, length=2048, seed=0):
return gen_df(spark, StructGen([
('a', gen),
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -26,6 +26,8 @@
import pyspark.sql.functions as f
from spark_session import is_before_spark_311, with_cpu_session

pytestmark = pytest.mark.nightly_resource_consuming_test

_approx_percentile_conf = { 'spark.rapids.sql.expression.ApproximatePercentile': 'true' }

_no_nans_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
@@ -303,6 +305,7 @@ def get_params(init_list, marked_params=[]):
_grpkey_short_full_neg_scale_decimals]

#Any smaller precision takes way too long to process on the CPU
@nightly_gpu_mem_consuming_case
@pytest.mark.parametrize('precision', [38, 37, 36, 35, 34, 33, 32, 31, 30], ids=idfn)
def test_hash_reduction_decimal_overflow_sum(precision):
constant = '9' * precision
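Because this file sets a module-level pytestmark and the test above adds a per-case decorator, the case carries both marks. A hedged check (editorial, not in the diff; paths and environment assumed):

    # should collect only the decorated case, e.g.
    # hash_aggregate_test.py::test_hash_reduction_decimal_overflow_sum[38]
    pytest integration_tests/src/main/python/hash_aggregate_test.py \
      -m "nightly_resource_consuming_test and nightly_gpu_mem_consuming_case" \
      --collect-only -q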
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/join_test.py
@@ -22,7 +22,7 @@
from spark_session import with_cpu_session, with_spark_session

# Mark all tests in current file as premerge_ci_1 in order to be run in first k8s pod for parallel build premerge job
pytestmark = pytest.mark.premerge_ci_1
pytestmark = [pytest.mark.premerge_ci_1, pytest.mark.nightly_resource_consuming_test]

all_join_types = ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter']

2 changes: 2 additions & 0 deletions integration_tests/src/main/python/marks.py
@@ -25,3 +25,5 @@
cudf_udf = pytest.mark.cudf_udf
rapids_udf_example_native = pytest.mark.rapids_udf_example_native
shuffle_test = pytest.mark.shuffle_test
nightly_gpu_mem_consuming_case = pytest.mark.nightly_gpu_mem_consuming_case
nightly_host_mem_consuming_case = pytest.mark.nightly_host_mem_consuming_case
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/orc_test.py
@@ -21,6 +21,8 @@
from spark_session import with_cpu_session
from parquet_test import _nested_pruning_schemas

pytestmark = pytest.mark.nightly_resource_consuming_test

def read_orc_df(data_path):
return lambda spark : spark.read.orc(data_path)

2 changes: 2 additions & 0 deletions integration_tests/src/main/python/orc_write_test.py
@@ -20,6 +20,8 @@
from marks import *
from pyspark.sql.types import *

pytestmark = pytest.mark.nightly_resource_consuming_test

orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc)) ] + \
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -25,6 +25,8 @@
import random
from spark_session import is_before_spark_311

pytestmark = pytest.mark.nightly_resource_consuming_test

# test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for
# non-cloud
original_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
55 changes: 32 additions & 23 deletions jenkins/spark-tests.sh
@@ -203,6 +203,26 @@ run_test_not_parallel() {
}
export -f run_test_not_parallel

get_cases_by_tags() {
local cases
local args=${2}
cases=$(TEST_TAGS="${1}" \
./run_pyspark_from_build.sh "${args}" --collect-only -p no:warnings -qq 2>/dev/null \
| grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
echo "$cases"
}
export -f get_cases_by_tags

get_tests_by_tags() {
local tests
local args=${2}
tests=$(TEST_TAGS="${1}" \
./run_pyspark_from_build.sh "${args}" --collect-only -qqq -p no:warnings 2>/dev/null \
| grep -oP '(?<=python/).*?(?=.py)' | xargs)
echo "$tests"
}
export -f get_tests_by_tags
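
To make the parsing concrete (an editorial aside with a hypothetical node ID, not part of the diff): pytest --collect-only emits entries shaped like path::name[param], and the two helpers slice them differently. get_cases_by_tags keeps the text between '::' and the first '[' (bare case names, deduplicated and shuffled to spread heavy cases across workers), while get_tests_by_tags keeps the text between 'python/' and '.py' (bare module names; the unescaped dot in (?=.py) matches any character, which is harmless for these paths). The grep patterns can be checked in isolation:

    echo "src/main/python/hash_aggregate_test.py::test_hash_reduction_decimal_overflow_sum[38]" \
      | grep -oP '(?<=::).*?(?=\[)'
    # -> test_hash_reduction_decimal_overflow_sum

    echo "src/main/python/hash_aggregate_test.py" | grep -oP '(?<=python/).*?(?=.py)'
    # -> hash_aggregate_test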

# TEST_MODE
# - IT_ONLY
# - CUDF_UDF_ONLY
@@ -211,32 +231,22 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'}
if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then
# integration tests
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
# We separate tests/cases into different categories for the parallel run, to try to avoid a long-tail distribution
# time_consuming_tests: tests that would cost over 1 hour if run sequentially; we split them into cases (time_consuming_tests_cases)
# mem_consuming_cases: cases in time_consuming_tests that would consume much more GPU memory than normal cases
# other_tests: tests except time_consuming_tests_cases and mem_consuming_cases

# TODO: Tag these tests/cases
# time-consuming tests, space-separated
time_consuming_tests="join_test hash_aggregate_test generate_expr_test parquet_write_test orc_test orc_write_test"
# GPU memory-consuming cases in time_consuming_tests, space-separated
mem_consuming_cases="test_hash_reduction_decimal_overflow_sum"
# hardcode parallelism as 2 for gpu-mem consuming cases
# separate run for special cases that require smaller parallelism
special_cases=$(get_cases_by_tags "nightly_resource_consuming_test \
and (nightly_gpu_mem_consuming_case or nightly_host_mem_consuming_case)")
# hardcode parallelism as 2 for special cases
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.45 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.45"
# --halt "now,fail=1": exit when the first job fails, and kill running jobs.
# we can set it to "never" and print the failed ones after all tests finish, if needed
# --group: print stderr after test finished for better readability
parallel --group --halt "now,fail=1" -j2 run_test_not_parallel ::: ${mem_consuming_cases}
parallel --group --halt "now,fail=1" -j2 run_test_not_parallel ::: ${special_cases}
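
A quick sanity check on the hardcoded numbers (editorial, not in the diff): two concurrent workers at allocFraction 0.45 reserve at most

    2 x 0.45 = 0.90

of GPU memory, leaving roughly 10% headroom while still giving each heavy case more memory than it would typically get under the PARALLELISM-derived fraction computed below.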

time_consuming_tests_str=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g')
mem_consuming_cases_str=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g')
time_consuming_tests_cases=$(./run_pyspark_from_build.sh -k \
"(${time_consuming_tests_str}) and not ${mem_consuming_cases_str}" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
other_tests=$(./run_pyspark_from_build.sh --collect-only -qqq 2>/dev/null | grep -oP '(?<=python/).*?(?=.py)' \
| grep -vP "$(echo ${time_consuming_tests} | xargs | tr ' ' '|')")
tests=$(echo "${time_consuming_tests_cases} ${other_tests}" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
resource_consuming_cases=$(get_cases_by_tags "nightly_resource_consuming_test \
and not nightly_gpu_mem_consuming_case \
and not nightly_host_mem_consuming_case")
other_tests=$(get_tests_by_tags "not nightly_resource_consuming_test")
tests=$(echo "${resource_consuming_cases} ${other_tests}" | tr ' ' '\n' | awk '!x[$0]++' | xargs)

if [[ "${PARALLELISM}" == "" ]]; then
PARALLELISM=$(nvidia-smi --query-gpu=memory.free --format=csv,noheader | \
@@ -245,15 +255,14 @@ if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then
MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.1))")
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \
--conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}"
parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test_not_parallel ::: $tests
parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test_not_parallel ::: ${tests}
else
run_test_not_parallel all
fi
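
For a concrete feel of the fraction formula (editorial, not in the diff): if the free-memory probe yields PARALLELISM=4, then

    MEMORY_FRACTION = 1 / (4 + 0.1) ≈ 0.244

so four workers together reserve about 4 x 0.244 ≈ 0.976 of GPU memory, with the extra 0.1 in the denominator serving as shared headroom.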

if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
cache_test_cases=$(./run_pyspark_from_build.sh -k "cache_test" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
cache_test_cases=$(get_cases_by_tags "" "-k cache_test")
# hardcode parallelism as 5
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.18 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.18 \