Split expensive pytest files in cases level [skip ci] #4336

Merged: 2 commits, Dec 13, 2021
Changes from 1 commit
53 changes: 39 additions & 14 deletions jenkins/spark-tests.sh
@@ -115,6 +115,7 @@ export SPARK_TASK_MAXFAILURES=1

export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"

export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=120 -Dspark.worker.cleanup.appDataTtl=60"
# stop and restart SPARK ETL
stop-slave.sh
stop-master.sh
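For context, the three -D options added above are stock Spark standalone worker properties, not something introduced by this PR: spark.worker.cleanup.enabled turns on periodic sweeping of finished applications' work directories, spark.worker.cleanup.interval is the sweep period in seconds (default 1800), and spark.worker.cleanup.appDataTtl is how long per-application data is kept, also in seconds (default 7 days). The restart here is what makes the new options take effect; a quick, hedged way to confirm they landed on the worker JVM:

# Show the cleanup-related -D flags on the running standalone Worker process.
# The [W] trick keeps grep from matching its own command line.
ps aux | grep '[W]orker' | tr ' ' '\n' | grep 'cleanup'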
@@ -137,18 +138,12 @@ export BASE_SPARK_SUBMIT_ARGS="$BASE_SPARK_SUBMIT_ARGS \
export SEQ_CONF="--executor-memory 16G \
--total-executor-cores 6"

# currently we hardcode the parallelism and configs based on our CI node's hardware specs,
# we can make it dynamically generated if this script is going to be used in other scenarios in the future
PARALLELISM=${PARALLELISM:-'4'}
MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.2))")
export PARALLEL_CONF="--executor-memory 4G \
--total-executor-cores 1 \
--conf spark.executor.cores=1 \
--conf spark.task.cpus=1 \
--conf spark.rapids.sql.concurrentGpuTasks=1 \
--conf spark.rapids.memory.gpu.minAllocFraction=0 \
--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \
--conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}"
--conf spark.rapids.memory.gpu.minAllocFraction=0"

export CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.memory.gpu.minAllocFraction=0 \
@@ -190,7 +185,7 @@ run_test() {
LOG_FILE="$TARGET_DIR/$TEST.log"
# set dedicated RUN_DIRs here to avoid conflict between parallel tests
RUN_DIR="$TARGET_DIR/run_dir_$TEST" \
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF" \
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF $MEMORY_FRACTION_CONF" \
./run_pyspark_from_build.sh -k $TEST >"$LOG_FILE" 2>&1

CODE="$?"
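One detail worth calling out in run_test: the RUN_DIR and SPARK_SUBMIT_FLAGS assignments are prefixed to a single command, so each override lives only in that one child process's environment and concurrent run_test invocations cannot clobber each other. A minimal, self-contained illustration of the idiom:

# A VAR=value prefix scopes the variable to one command's environment:
GREETING=hello bash -c 'echo "$GREETING"'   # prints: hello
echo "${GREETING:-unset}"                   # prints: unset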
@@ -214,21 +209,51 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'}
if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then
# integration tests
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
# put most time-consuming tests at the head of queue
time_consuming_tests="hash_aggregate_test.py join_test.py generate_expr_test.py parquet_write_test.py"
tests_list=$(find "$SCRIPT_PATH"/src/main/python/ -name "*_test.py" -printf "%f ")
tests=$(echo "$time_consuming_tests $tests_list" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
# time-consuming tests, space-separated
time_consuming_tests="join_test hash_aggregate_test generate_expr_test parquet_write_test orc_test orc_write_test"
# memory-consuming cases in time-consuming tests, space-separated
mem_consuming_cases="test_hash_reduction_decimal_overflow_sum"
# hardcode parallelism as 2 for gpu-mem consuming cases
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.45 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.45"
# --halt "now,fail=1": exit as soon as the first job fails, and kill the jobs still running.
# we could set it to "never" and print the failed ones after all tests finish, if needed
# --group: print each test's stderr only after it finishes, for better readability
parallel --group --halt "now,fail=1" -j2 run_test ::: ${mem_consuming_cases}
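The arithmetic behind the hardcoded pair: two concurrent jobs at allocFraction 0.45 claim about 0.90 of GPU memory, leaving roughly 10% headroom. For readers new to the GNU parallel idiom, a self-contained sketch of how ::: fans list items out to a command; demo stands in for run_test, which must be visible to parallel (e.g. exported with export -f elsewhere in this script):

demo() { echo "running $1"; }
export -f demo
# Same flags as the real invocation: fail fast, keep each job's output grouped.
parallel --group --halt "now,fail=1" -j2 demo ::: case_a case_b case_c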

time_consuming_tests_str=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g')
Collaborator: Would it be better to have a tag for these tests?

Collaborator: It would also be good if we documented these things better somewhere.

Author (@pxLi, Dec 10, 2021), replying to "Would it be better to have a tag for these tests?": Yes, a tag would be better here. Let me add a TODO, and I will add tags here if we see more cases in this category.
mem_consuming_cases_str=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g')
time_consuming_tests_cases=$(./run_pyspark_from_build.sh -k \
"(${time_consuming_tests_str}) and not ${mem_consuming_cases_str}" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
other_tests=$(./run_pyspark_from_build.sh --collect-only -qqq 2>/dev/null | grep -oP '(?<=python/).*?(?=.py)' \
| grep -vP "$(echo ${time_consuming_tests} | xargs | tr ' ' '|')")
tests=$(echo "${time_consuming_tests_cases} ${other_tests}" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
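A hedged sketch of what the string munging above produces (the test names are from this script; the combined expression is illustrative): xargs normalizes whitespace, sed splices in pytest -k operators, grep -oP cuts the case name out of each collected node ID (the text between :: and [, whose exact shape depends on the pytest version and the wrapper's flags), and awk '!x[$0]++' de-duplicates while preserving order:

time_consuming_tests="join_test hash_aggregate_test"
mem_consuming_cases="test_hash_reduction_decimal_overflow_sum"
echo ${time_consuming_tests} | xargs | sed 's/ / or /g'
# -> join_test or hash_aggregate_test
echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g'
# -> test_hash_reduction_decimal_overflow_sum
# The resulting -k expression selects every case in the expensive files
# except the memory hog that already ran in its own -j2 batch:
#   (join_test or hash_aggregate_test) and not test_hash_reduction_decimal_overflow_sum
printf 'a\nb\na\nc\n' | awk '!x[$0]++'   # order-preserving dedup: a, b, c (one per line)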

if [[ "${PARALLELISM}" == "" ]]; then
PARALLELISM=$(nvidia-smi --query-gpu=memory.free --format=csv,noheader | \
awk '{if (MAX < $1){ MAX = $1}} END {print int(MAX / (2 * 1024))}')
fi
MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.1))")
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \
--conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}"
parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test ::: $tests
else
run_test all
fi
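A worked example of the sizing logic above, with illustrative numbers: nvidia-smi reports each GPU's free memory in MiB, the awk program keeps the maximum across GPUs, and dividing by 2 * 1024 yields one pytest job per ~2 GiB free. Each job then claims just under an even 1/N share of GPU memory (the extra 0.1 leaves headroom, and the minAllocFraction=0 set in PARALLEL_CONF earlier lifts the plugin's default lower bound so small fractions are accepted):

# Suppose the largest free-memory reading is 16384 MiB:
python -c "print(int(16384 / (2 * 1024)))"   # -> 8 parallel jobs
python -c "print(1/(8 + 0.1))"               # -> ~0.1235 allocFraction per job
# 8 jobs x 0.1235 ~= 0.99 of GPU memory used, with a small safety margin.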

# Temporarily only run on Spark 3.1.1 (https://github.com/NVIDIA/spark-rapids/issues/3311)
if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then
run_test cache_serializer
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
cache_test_cases=$(./run_pyspark_from_build.sh -k "cache_test" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
# hardcode parallelism as 4
Collaborator: The comment looks wrong; it says 4 but -j5 is used below.

Author: Nice catch, fixed.
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.18 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.18 \
--conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer"
parallel --group --halt "now,fail=1" -j5 run_test ::: ${cache_test_cases}
else
run_test cache_serializer
fi
fi
fi
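The same sanity check applied to the cache-serializer batch: five concurrent jobs at allocFraction 0.18 claim about 0.90 of GPU memory, which is also why the review thread above flags the stale "parallelism as 4" comment against the actual -j5:

python -c "print(5 * 0.18)"   # ~0.9, i.e. ~10% GPU memory headroom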
