Skip to content

Commit

Permalink
[FLINK-12513][e2e] Delegate Flink lib folder clean up to the test runner
Browse files Browse the repository at this point in the history
  • Loading branch information
1u0 authored and pnowojski committed Jun 7, 2019
1 parent a0cf205 commit e492d90
Show file tree
Hide file tree
Showing 18 changed files with 29 additions and 128 deletions.
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Please note that a previously supported pattern where you could assign a value t
The test runner performs a cleanup after each test case, which includes:
- Stopping the cluster
- Killing all task and job managers
- Reverting config to default (if changed before)
- Reverting `conf` and `lib` dirs to default
- Cleaning up log and temp directories

In some cases your test is required to do some *additional* cleanup, for example shutting down external systems like Kafka or Elasticsearch. In this case it is a common pattern to trap a `test_cleanup` function to `EXIT` like this:
Expand Down
36 changes: 20 additions & 16 deletions flink-end-to-end-tests/test-scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,27 @@ function print_mem_use {
fi
}

function backup_config() {
# back up the masters and flink-conf.yaml
cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
function backup_flink_dir() {
  # Save the parts of the Flink distribution a test may modify (conf/ and
  # lib/) under ${TEST_DATA_DIR}/tmp/backup so the runner can restore them.
  local backup_dir="${TEST_DATA_DIR}/tmp/backup"
  mkdir -p "${backup_dir}"
  # Deliberately NOT copying the whole distribution tree: a full recursive
  # copy can be slow on some file systems.
  cp -r "${FLINK_DIR}/conf" "${backup_dir}/"
  cp -r "${FLINK_DIR}/lib" "${backup_dir}/"
}

function revert_default_config() {
# Restores the Flink distribution's conf/ and lib/ dirs from the backup taken
# by backup_flink_dir, then deletes the backup and resets REST defaults.
# NOTE(review): this span is a unified-diff rendering with the +/- markers
# stripped — the pre-change *.bak restore lines are interleaved with the new
# backup-dir restore lines, and the two old `if` statements have lost their
# closing `fi`, so the text below is NOT valid shell as shown. Only the
# ${TEST_DATA_DIR}/tmp/backup branches belong to the new version.
function revert_flink_dir() {

# revert our modifications to the masters file
if [ -f $FLINK_DIR/conf/masters.bak ]; then
mv -f $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters
# (new) restore conf/ saved by backup_flink_dir
if [ -d "${TEST_DATA_DIR}/tmp/backup/conf" ]; then
rm -rf "${FLINK_DIR}/conf"
mv "${TEST_DATA_DIR}/tmp/backup/conf" "${FLINK_DIR}/"
fi

# revert our modifications to the Flink conf yaml
if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then
mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
# (new) restore lib/ saved by backup_flink_dir
if [ -d "${TEST_DATA_DIR}/tmp/backup/lib" ]; then
rm -rf "${FLINK_DIR}/lib"
mv "${TEST_DATA_DIR}/tmp/backup/lib" "${FLINK_DIR}/"
fi

# drop the backup once both dirs are restored
rm -r "${TEST_DATA_DIR}/tmp/backup"

REST_PROTOCOL="http"
CURL_SSL_ARGS=""
}
Expand All @@ -104,6 +107,11 @@ function set_conf() {
echo "$CONF_NAME: $VAL" >> $FLINK_DIR/conf/flink-conf.yaml
}

function add_optional_lib() {
  # Put an optional dependency on the cluster classpath by copying its jar
  # from the distribution's opt/ folder into lib/.
  # $1 - library name suffix, e.g. "metrics-slf4j" matches flink-metrics-slf4j-*.jar
  local name
  name="$1"
  cp "${FLINK_DIR}/opt/flink-${name}"*.jar "${FLINK_DIR}/lib"
}

function change_conf() {
CONF_NAME=$1
OLD_VAL=$2
Expand Down Expand Up @@ -524,15 +532,11 @@ function kill_random_taskmanager {

# Copies the slf4j metrics reporter jar into lib/ and enables it in
# flink-conf.yaml. Optional $1 sets the report interval (default "1 SECONDS").
# NOTE(review): diff rendering — the bare `cp` line below is the REMOVED
# variant and `add_optional_lib "metrics-slf4j"` is its replacement; only one
# of the two belongs in the final file.
function setup_flink_slf4j_metric_reporter() {
INTERVAL="${1:-1 SECONDS}"
cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
add_optional_lib "metrics-slf4j"
set_conf "metrics.reporter.slf4j.class" "org.apache.flink.metrics.slf4j.Slf4jReporter"
set_conf "metrics.reporter.slf4j.interval" "${INTERVAL}"
}

function rollback_flink_slf4j_metric_reporter() {
  # Undo setup_flink_slf4j_metric_reporter by deleting the reporter jar from
  # lib/. Quote ${FLINK_DIR} (it may contain spaces — SC2086); the trailing
  # glob is left outside the quotes so it still expands.
  rm "${FLINK_DIR}/lib/flink-metrics-slf4j-"*.jar
}

function get_job_metric {
local job_id=$1
local metric_name=$2
Expand Down
12 changes: 1 addition & 11 deletions flink-end-to-end-tests/test-scripts/common_s3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,7 @@ s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.ja
# None
###################################
# Installs the flink-s3-fs-<$1> filesystem jar (e.g. "hadoop" or "presto")
# and appends the S3 credentials from $IT_CASE_S3_* to flink-conf.yaml.
# NOTE(review): diff rendering with +/- markers stripped — the nested
# s3_cleanup function, its EXIT trap, and the bare `cp` line are the REMOVED
# variant (per the commit title, lib/ cleanup is delegated to the test
# runner); the new version keeps only add_optional_lib and the two appends.
function s3_setup {
# make sure we delete the file at the end
function s3_cleanup {
rm $FLINK_DIR/lib/flink-s3-fs*.jar

# remove any leftover settings
sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
}
trap s3_cleanup EXIT

cp $FLINK_DIR/opt/flink-s3-fs-$1-*.jar $FLINK_DIR/lib/
add_optional_lib "s3-fs-$1"
echo "s3.access-key: $IT_CASE_S3_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "s3.secret-key: $IT_CASE_S3_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
}
Expand Down
17 changes: 2 additions & 15 deletions flink-end-to-end-tests/test-scripts/queryable_state_base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,11 @@
################################################################################

# Puts the queryable-state runtime jar on the classpath and enables the
# feature in flink-conf.yaml.
# NOTE(review): diff rendering — the `mv` from opt/ plus its `$?` check is
# the REMOVED variant; the new version uses add_optional_lib (a cp, so opt/
# keeps its copy and the runner's lib/ restore reverts the change).
function link_queryable_state_lib {
echo "Moving flink-queryable-state-runtime from opt/ to lib/"
mv ${FLINK_DIR}/opt/flink-queryable-state-runtime* ${FLINK_DIR}/lib/
if [ $? != 0 ]; then
echo "Failed to move flink-queryable-state-runtime from opt/ to lib/. Exiting"
exit 1
fi
echo "Adding flink-queryable-state-runtime to lib/"
add_optional_lib "queryable-state-runtime"
set_conf "queryable-state.enable" "true"
}

function unlink_queryable_state_lib {
  # Moves the queryable-state runtime jar back from lib/ to opt/ so later
  # tests run without the queryable-state server on the classpath.
  echo "Moving flink-queryable-state-runtime from lib/ to opt/"
  # Check the mv directly instead of testing $? afterwards, and quote
  # ${FLINK_DIR} (may contain spaces); the trailing glob still expands.
  if ! mv "${FLINK_DIR}/lib/flink-queryable-state-runtime"* "${FLINK_DIR}/opt/"; then
    echo "Failed to move flink-queryable-state-runtime from lib/ to opt/. Exiting"
    exit 1
  fi
}

# Returns the ip address of the queryable state server
function get_queryable_state_server_ip {
local ip=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
Expand Down
4 changes: 2 additions & 2 deletions flink-end-to-end-tests/test-scripts/test-runner-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function run_test {
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"

backup_config
backup_flink_dir
start_timer

function test_error() {
Expand Down Expand Up @@ -96,7 +96,7 @@ function post_test_validation {
# Shuts down cluster and reverts changes to cluster configs
function cleanup_proc {
shutdown_all
revert_default_config
revert_flink_dir
}

# Cleans up all temporary folders and files
Expand Down
10 changes: 1 addition & 9 deletions flink-end-to-end-tests/test-scripts/test_azure_fs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,9 @@ AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOU
# None
###################################
function azure_setup {
# make sure we delete the file at the end
function azure_cleanup {
rm $FLINK_DIR/lib/flink-azure-fs*.jar

# remove any leftover settings
sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml"
}
trap azure_cleanup EXIT

echo "Copying flink azure jars and writing out configs"
cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/
add_optional_lib "azure-fs-hadoop"
echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ function ha_cleanup() {

stop_watchdogs
kill_all 'StandaloneJobClusterEntryPoint'
rm ${FLINK_LIB_DIR}/${TEST_PROGRAM_JAR_NAME}
}

trap ha_cleanup INT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ function check_logs {
fi
}

# This function does a cleanup after the test. The configuration is restored, the watchdog is terminated and temporary
# This function does a cleanup after the test. The watchdog is terminated and temporary
# files and folders are deleted.
# Terminates the log watchdog after the test.
# NOTE(review): diff rendering — the "Reset the configurations" comment and
# the log4j sed line are the REMOVED variant (config restore is now handled
# by the runner); the new body only kills and reaps the watchdog.
function cleanup_after_test {
# Reset the configurations
sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
#
kill ${watchdog_pid} 2> /dev/null
wait ${watchdog_pid} 2> /dev/null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ function run_test {
}

# NOTE(review): diff rendering — `unlink_queryable_state_lib` is the REMOVED
# line (lib/ restore is now done by the runner); the new cleanup only clears
# stdout files.
function test_cleanup {
unlink_queryable_state_lib
clean_stdout_files
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,4 @@ function get_completed_number_of_checkpoints {
sed 's/,.*//' # 24
}

# NOTE(review): removed in this commit — the EXIT-trapped unlink is redundant
# now that the test runner restores lib/ itself.
function test_cleanup {
unlink_queryable_state_lib
}

trap test_cleanup EXIT
run_test
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@ set_conf "metrics.fetcher.update-interval" "2000"
setup_flink_slf4j_metric_reporter
start_cluster

# NOTE(review): removed in this commit — rollback of the slf4j reporter jar
# is now covered by the runner's lib/ restore.
function test_cleanup {
# don't call ourselves again for another signal interruption
trap "exit -1" INT
# don't call ourselves again for normal exit
trap "" EXIT

rollback_flink_slf4j_metric_reporter
}
trap test_cleanup INT
trap test_cleanup EXIT

CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"

Expand Down
13 changes: 0 additions & 13 deletions flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,6 @@ setup_flink_slf4j_metric_reporter

start_cluster

# make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our modifications
# NOTE(review): removed in this commit — deleting the slf4j reporter jar by
# hand is now covered by the runner's lib/ restore.
function test_cleanup {
# don't call ourselves again for another signal interruption
trap "exit -1" INT
# don't call ourselves again for normal exit
trap "" EXIT

# revert our modifications to the Flink distribution
rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
}
trap test_cleanup INT
trap test_cleanup EXIT

CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"

# run the DataStream allroundjob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,6 @@ set_conf "metrics.fetcher.update-interval" "2000"

start_cluster

# make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our modifications
# NOTE(review): removed in this commit — deleting the slf4j reporter jar by
# hand is now covered by the runner's lib/ restore.
function test_cleanup {
# don't call ourselves again for another signal interruption
trap "exit -1" INT
# don't call ourselves again for normal exit
trap "" EXIT

# revert our modifications to the Flink distribution
rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
}
trap test_cleanup INT
trap test_cleanup EXIT

CHECKPOINT_DIR="file://${TEST_DATA_DIR}/savepoint-e2e-test-chckpt-dir"

TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-stream-stateful-job-upgrade-test/target/StatefulStreamJobUpgradeTestProgram.jar"
Expand Down
11 changes: 0 additions & 11 deletions flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@ TEST_PROGRAM_NAME=DataStreamStateTTLTestProgram
TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar

setup_flink_slf4j_metric_reporter
# NOTE(review): removed in this commit — deleting the slf4j reporter jar by
# hand is now covered by the runner's lib/ restore.
function test_cleanup {
# don't call ourselves again for another signal interruption
trap "exit -1" INT
# don't call ourselves again for normal exit
trap "" EXIT

# revert our modifications to the Flink distribution
rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
}
trap test_cleanup INT
trap test_cleanup EXIT

set_conf "metrics.fetcher.update-interval" "2000"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ function bucketing_cleanup() {

stop_cluster
$FLINK_DIR/bin/taskmanager.sh stop-all

# restore default logging level
sed -i -e 's/log4j.logger.org.apache.flink=DEBUG/#log4j.logger.org.apache.flink=INFO/g' $FLINK_DIR/conf/log4j.properties
}
trap bucketing_cleanup INT
trap bucketing_cleanup EXIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ function classloader_cleanup() {

stop_cluster
$FLINK_DIR/bin/taskmanager.sh stop-all

# remove LibPackage.jar again
rm ${FLINK_DIR}/lib/LibPackage.jar
}
trap classloader_cleanup INT
trap classloader_cleanup EXIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ setup_kafka_dist
start_kafka_cluster

# modify configuration to have enough slots
cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" $FLINK_DIR/conf/flink-conf.yaml

start_cluster
Expand All @@ -35,9 +34,6 @@ function test_cleanup {
trap "" EXIT

stop_kafka_cluster

# revert our modifications to the Flink distribution
mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
}
trap test_cleanup INT
trap test_cleanup EXIT
Expand Down
6 changes: 1 addition & 5 deletions flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ source "$(dirname "$0")"/common.sh
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-stream-sql-test/target/StreamSQLTestProgram.jar

# copy flink-table jar into lib folder
cp $FLINK_DIR/opt/flink-table*jar $FLINK_DIR/lib
add_optional_lib "table"

start_cluster
$FLINK_DIR/bin/taskmanager.sh start
Expand All @@ -39,10 +39,6 @@ function sql_cleanup() {

stop_cluster
$FLINK_DIR/bin/taskmanager.sh stop-all

# remove flink-table from lib folder
rm $FLINK_DIR/lib/flink-table*jar

}
trap sql_cleanup INT
trap sql_cleanup EXIT
Expand Down

0 comments on commit e492d90

Please sign in to comment.