From 231e4d7441713728b0f8d4e268b92c083c32c681 Mon Sep 17 00:00:00 2001 From: Peixin Li Date: Wed, 8 Dec 2021 15:42:03 +0800 Subject: [PATCH 1/2] Split expensive pytest files in cases level Signed-off-by: Peixin Li --- jenkins/spark-tests.sh | 53 +++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh index 1387be78d66..5113ff4be7a 100755 --- a/jenkins/spark-tests.sh +++ b/jenkins/spark-tests.sh @@ -115,6 +115,7 @@ export SPARK_TASK_MAXFAILURES=1 export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH" +export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=120 -Dspark.worker.cleanup.appDataTtl=60" #stop and restart SPARK ETL stop-slave.sh stop-master.sh @@ -137,18 +138,12 @@ export BASE_SPARK_SUBMIT_ARGS="$BASE_SPARK_SUBMIT_ARGS \ export SEQ_CONF="--executor-memory 16G \ --total-executor-cores 6" -# currently we hardcode the parallelism and configs based on our CI node's hardware specs, -# we can make it dynamically generated if this script is going to be used in other scenarios in the future -PARALLELISM=${PARALLELISM:-'4'} -MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.2))") export PARALLEL_CONF="--executor-memory 4G \ --total-executor-cores 1 \ --conf spark.executor.cores=1 \ --conf spark.task.cpus=1 \ --conf spark.rapids.sql.concurrentGpuTasks=1 \ ---conf spark.rapids.memory.gpu.minAllocFraction=0 \ ---conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \ ---conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}" +--conf spark.rapids.memory.gpu.minAllocFraction=0" export CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \ --conf spark.rapids.memory.gpu.minAllocFraction=0 \ @@ -190,7 +185,7 @@ run_test() { LOG_FILE="$TARGET_DIR/$TEST.log" # set dedicated RUN_DIRs here to avoid conflict between parallel tests RUN_DIR="$TARGET_DIR/run_dir_$TEST" \ - SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF" \ + SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF $MEMORY_FRACTION_CONF" \ ./run_pyspark_from_build.sh -k $TEST >"$LOG_FILE" 2>&1 CODE="$?" @@ -214,21 +209,51 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'} if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then # integration tests if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then - # put most time-consuming tests at the head of queue - time_consuming_tests="hash_aggregate_test.py join_test.py generate_expr_test.py parquet_write_test.py" - tests_list=$(find "$SCRIPT_PATH"/src/main/python/ -name "*_test.py" -printf "%f ") - tests=$(echo "$time_consuming_tests $tests_list" | tr ' ' '\n' | awk '!x[$0]++' | xargs) + # time-consuming tests, space-separated + time_consuming_tests="join_test hash_aggregate_test generate_expr_test parquet_write_test orc_test orc_write_test" + # memory-consuming cases in time-consuming tests, space-separated + mem_consuming_cases="test_hash_reduction_decimal_overflow_sum" + # hardcode parallelism as 2 for gpu-mem consuming cases + export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.45 \ + --conf spark.rapids.memory.gpu.maxAllocFraction=0.45" # --halt "now,fail=1": exit when the first job fail, and kill running jobs. # we can set it to "never" and print failed ones after finish running all tests if needed # --group: print stderr after test finished for better readability + parallel --group --halt "now,fail=1" -j2 run_test ::: ${mem_consuming_cases} + + time_consuming_tests_str=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g') + mem_consuming_cases_str=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g') + time_consuming_tests_cases=$(./run_pyspark_from_build.sh -k \ + "(${time_consuming_tests_str}) and not ${mem_consuming_cases_str}" \ + --collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs) + other_tests=$(./run_pyspark_from_build.sh --collect-only -qqq 2>/dev/null | grep -oP '(?<=python/).*?(?=.py)' \ + | grep -vP "$(echo ${time_consuming_tests} | xargs | tr ' ' '|')") + tests=$(echo "${time_consuming_tests_cases} ${other_tests}" | tr ' ' '\n' | awk '!x[$0]++' | xargs) + + if [[ "${PARALLELISM}" == "" ]]; then + PARALLELISM=$(nvidia-smi --query-gpu=memory.free --format=csv,noheader | \ + awk '{if (MAX < $1){ MAX = $1}} END {print int(MAX / (2 * 1024))}') + fi + MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.1))") + export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \ + --conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}" parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test ::: $tests else run_test all fi - # Temporarily only run on Spark 3.1.1 (https://github.com/NVIDIA/spark-rapids/issues/3311) if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then - run_test cache_serializer + if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then + cache_test_cases=$(./run_pyspark_from_build.sh -k "cache_test" \ + --collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs) + # hardcode parallelism as 4 + export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.18 \ + --conf spark.rapids.memory.gpu.maxAllocFraction=0.18 \ + --conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer" + parallel --group --halt "now,fail=1" -j5 run_test ::: ${cache_test_cases} + else + run_test cache_serializer + fi fi fi From ce16d1789d3907b76a89c431eba19eee30a399ec Mon Sep 17 00:00:00 2001 From: Peixin Li Date: Fri, 10 Dec 2021 09:09:06 +0800 Subject: [PATCH 2/2] add more doc and rename func --- jenkins/spark-tests.sh | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh index 5113ff4be7a..be76e0978ba 100755 --- a/jenkins/spark-tests.sh +++ b/jenkins/spark-tests.sh @@ -114,7 +114,8 @@ export SPARK_TASK_MAXFAILURES=1 [[ "$IS_SPARK_311_OR_LATER" -eq "0" ]] && SPARK_TASK_MAXFAILURES=4 export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH" - +# enable worker cleanup to avoid "out of space" issue +# if failed, we abort the test instantly, so the failed executor log should still be left there for debugging export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=120 -Dspark.worker.cleanup.appDataTtl=60" #stop and restart SPARK ETL stop-slave.sh @@ -160,7 +161,7 @@ export SCRIPT_PATH="$(pwd -P)" export TARGET_DIR="$SCRIPT_PATH/target" mkdir -p $TARGET_DIR -run_test() { +run_test_not_parallel() { local TEST=${1//\.py/} local LOG_FILE case $TEST in @@ -199,7 +200,7 @@ run_test() { ;; esac } -export -f run_test +export -f run_test_not_parallel # TEST_MODE # - IT_ONLY @@ -209,9 +210,15 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'} if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then # integration tests if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then + # We separate tests/cases into different categories for parallel run to try avoid long tail distribution + # time_consuming_tests: tests that would cost over 1 hour if run sequentially, we split them into cases (time_consuming_tests_cases) + # mem_consuming_cases: cases in time_consuming_tests that would consume much more GPU memory than normal cases + # other_tests: tests except time_consuming_tests_cases and mem_consuming_cases + + # TODO: Tag these tests/cases # time-consuming tests, space-separated time_consuming_tests="join_test hash_aggregate_test generate_expr_test parquet_write_test orc_test orc_write_test" - # memory-consuming cases in time-consuming tests, space-separated + # GPU memory-consuming cases in time_consuming_tests, space-separated mem_consuming_cases="test_hash_reduction_decimal_overflow_sum" # hardcode parallelism as 2 for gpu-mem consuming cases export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.45 \ @@ -219,7 +226,7 @@ if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then # --halt "now,fail=1": exit when the first job fail, and kill running jobs. # we can set it to "never" and print failed ones after finish running all tests if needed # --group: print stderr after test finished for better readability - parallel --group --halt "now,fail=1" -j2 run_test ::: ${mem_consuming_cases} + parallel --group --halt "now,fail=1" -j2 run_test_not_parallel ::: ${mem_consuming_cases} time_consuming_tests_str=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g') mem_consuming_cases_str=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g') @@ -237,29 +244,29 @@ if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.1))") export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \ --conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}" - parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test ::: $tests + parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test_not_parallel ::: $tests else - run_test all + run_test_not_parallel all fi if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then cache_test_cases=$(./run_pyspark_from_build.sh -k "cache_test" \ --collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs) - # hardcode parallelism as 4 + # hardcode parallelism as 5 export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.18 \ --conf spark.rapids.memory.gpu.maxAllocFraction=0.18 \ --conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer" - parallel --group --halt "now,fail=1" -j5 run_test ::: ${cache_test_cases} + parallel --group --halt "now,fail=1" -j5 run_test_not_parallel ::: ${cache_test_cases} else - run_test cache_serializer + run_test_not_parallel cache_serializer fi fi fi # cudf_udf_test if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "CUDF_UDF_ONLY" ]]; then - run_test cudf_udf_test + run_test_not_parallel cudf_udf_test fi popd