From 29cab6cd9d6e669c74a1dd6960aba8250f539c2f Mon Sep 17 00:00:00 2001 From: mbeierl Date: Fri, 20 Jan 2017 16:05:24 -0500 Subject: [PATCH] Fix multiple workload runs Change reporting so that multiple workloads in one job execution can be reported instead of overwriting the previous value. Change the daily job to use a single, multiple workload run. Change-Id: I8e350350ae13d2272b584af7a60ad269de160587 JIRA: STORPERF-98 Signed-off-by: mbeierl --- ci/daily.sh | 36 +++++-------- storperf/test_executor.py | 6 +-- storperf/utilities/data_handler.py | 19 +++++-- storperf/utilities/thread_gate.py | 60 ++++++++++++---------- tests/utilities_tests/data_handler_test.py | 11 ++++ 5 files changed, 73 insertions(+), 59 deletions(-) diff --git a/ci/daily.sh b/ci/daily.sh index e3b64cc..1e77d67 100755 --- a/ci/daily.sh +++ b/ci/daily.sh @@ -96,32 +96,22 @@ do | awk '/Status/ {print $2}' | sed 's/"//g'` done -for WORKLOAD in ws wr rs rr rw +export QUEUE_DEPTH=1,2,8 +export BLOCK_SIZE=2048,8192,16384 +export WORKLOAD=ws,wr,rs,rr,rw +export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}" + +JOB=`$WORKSPACE/ci/start_job.sh \ + | awk '/job_id/ {print $2}' | sed 's/"//g'` +JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \ + | awk '/Status/ {print $2}' | sed 's/"//g'` +while [ "$JOB_STATUS" != "Completed" ] do - for BLOCK_SIZE in 2048 8192 16384 - do - for QUEUE_DEPTH in 1 2 8 - do - export QUEUE_DEPTH - export BLOCK_SIZE - export WORKLOAD - export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}" - - JOB=`$WORKSPACE/ci/start_job.sh \ - | awk '/job_id/ {print $2}' | sed 's/"//g'` - JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \ - | awk '/Status/ {print $2}' | sed 's/"//g'` - while [ "$JOB_STATUS" != "Completed" ] - do - sleep 60 - JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \ - | awk '/Status/ {print $2}' | sed 's/"//g'` - done - done - done + sleep 60 + JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \ + | awk '/Status/ {print $2}' | sed 's/"//g'` done - echo "Deleting stack for cleanup" curl -X DELETE --header 'Accept: application/json' 'http://127.0.0.1:5000/api/v1.0/configurations' diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 9c9393f..cda6c78 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -177,9 +177,9 @@ def execute(self, metadata): def terminate(self): self._terminated = True self.end_time = time.time() - return self._terminate_current_run() + return self.terminate_current_run() - def _terminate_current_run(self): + def terminate_current_run(self): self.logger.info("Terminating current run") terminated_hosts = [] for workload in self._workload_executors: @@ -243,7 +243,7 @@ def execute_workloads(self): if self.deadline is not None \ and not workload_name.startswith("_"): event = scheduler.enter(self.deadline * 60, 1, - self._terminate_current_run, + self.terminate_current_run, ()) t = Thread(target=scheduler.run, args=()) t.start() diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index ebc1bfd..0aae3b1 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -17,6 +17,7 @@ from storperf.utilities import steady_state as SteadyState from time import sleep import time +import json class DataHandler(object): @@ -61,13 +62,21 @@ def data_event(self, executor): if not steady: steady_state = False - executor.metadata['report_data'] = metrics - executor.metadata['steady_state'] = steady_state + workload = '.'.join(executor.current_workload.split('.')[1:6]) + + if 'report_data' not in executor.metadata: + executor.metadata['report_data'] = {} + + if 'steady_state' not in executor.metadata: + executor.metadata['steady_state'] = {} + + executor.metadata['report_data'][workload] = metrics + executor.metadata['steady_state'][workload] = steady_state workload_name = executor.current_workload.split('.')[1] if steady_state and not workload_name.startswith('_'): - executor.terminate() + executor.terminate_current_run() def _lookup_prior_data(self, executor, metric, io_type): workload = executor.current_workload @@ -112,7 +121,7 @@ def _evaluate_prior_data(self, data_series): duration = latest_timestamp - earliest_timestamp if (duration < 60 * self.samples): self.logger.debug("Only %s minutes of samples, ignoring" % - (duration / 60,)) + ((duration / 60 + 1),)) return False return SteadyState.steady_state(data_series) @@ -160,6 +169,6 @@ def _push_to_db(self, executor): scenario, criteria, build_tag, - payload) + json.dumps(payload)) except: self.logger.exception("Error pushing results into Database") diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py index 295b8be..38acbb1 100644 --- a/storperf/utilities/thread_gate.py +++ b/storperf/utilities/thread_gate.py @@ -12,6 +12,7 @@ """ import logging import time +from threading import Lock class FailureToReportException(Exception): @@ -26,6 +27,7 @@ def __init__(self, size, timeout=60): self._timeout = timeout self._registrants = {} self._creation_time = time.time() + self._lock = Lock() """ Calling this method returns a true or false, indicating that enough @@ -33,31 +35,33 @@ def __init__(self, size, timeout=60): """ def report(self, gate_id): - now = time.time() - self._registrants[gate_id] = now - ready = True - self.logger.debug("Gate report for %s", gate_id) - - total_missing = self._gate_size - len(self._registrants) - if total_missing > 0: - self.logger.debug("Not all registrants have reported in") - time_since_creation = now - self._creation_time - if (time_since_creation > (self._timeout * 2)): - self.logger.error("%s registrant(s) have never reported in", - total_missing) - raise FailureToReportException - return False - - for k, v in self._registrants.items(): - time_since_last_report = now - v - if time_since_last_report > self._timeout: - self.logger.debug("Registrant %s last reported %s ago", - k, time_since_last_report) - ready = False - - self.logger.debug("Gate pass? %s", ready) - - if ready: - self._registrants.clear() - - return ready + with self._lock: + now = time.time() + self._registrants[gate_id] = now + ready = True + self.logger.debug("Gate report for %s", gate_id) + + total_missing = self._gate_size - len(self._registrants) + if total_missing > 0: + self.logger.debug("Not all registrants have reported in") + time_since_creation = now - self._creation_time + if (time_since_creation > (self._timeout * 2)): + self.logger.error( + "%s registrant(s) have never reported in", + total_missing) + raise FailureToReportException + return False + + for k, v in self._registrants.items(): + time_since_last_report = now - v + if time_since_last_report > self._timeout: + self.logger.debug("Registrant %s last reported %s ago", + k, time_since_last_report) + ready = False + + self.logger.debug("Gate pass? %s", ready) + + if ready: + self._registrants.clear() + + return ready diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py index b175c87..8115c6d 100644 --- a/tests/utilities_tests/data_handler_test.py +++ b/tests/utilities_tests/data_handler_test.py @@ -57,6 +57,9 @@ def push_results_to_db(self, *args): def terminate(self): self._terminated = True + def terminate_current_run(self): + self._terminated = True + @mock.patch("time.time") @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") @@ -163,18 +166,22 @@ def test_non_terminated_report(self, mock_graphite_db, mock_results_db, self.assertEqual(False, self._terminated) self.assertEqual(expected_slope, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['slope']) self.assertEqual(expected_range, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['range']) self.assertEqual(expected_average, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['average']) self.assertEqual(series, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['series']) @@ -211,18 +218,22 @@ def test_report_that_causes_termination(self, self.data_handler.data_event(self) self.assertEqual(expected_slope, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['slope']) self.assertEqual(expected_range, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['range']) self.assertEqual(expected_average, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['average']) self.assertEqual(series, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] ['lat.mean'] ['read'] ['series'])