
Commit

Fix multiple workload runs
Change reporting so that results from multiple workloads in one job
execution are all reported, instead of each workload overwriting the
previously stored value.

Change the daily job to use a single run that covers multiple workloads.

Change-Id: I8e350350ae13d2272b584af7a60ad269de160587
JIRA: STORPERF-98
Signed-off-by: mbeierl <mark.beierl@dell.com>
mbeierl committed Jan 20, 2017
1 parent 3777338 commit 29cab6c
Showing 5 changed files with 73 additions and 59 deletions.
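
In short, per-workload results are now stored under a key derived from the workload name instead of a single report_data value that each workload overwrote. A minimal sketch of the resulting metadata shape, assuming an illustrative "fio."-style prefix on the workload string and made-up metric values (the real metrics dict is assembled elsewhere in data_handler.py):

# Sketch only: illustrates the new per-workload keying of report_data.
# The workload strings and metric values below are illustrative.
metadata = {}

for current_workload in ("fio.rw.queue-depth.8.block-size.8192",
                         "fio.ws.queue-depth.2.block-size.2048"):
    # Same key derivation as DataHandler.data_event()
    workload = '.'.join(current_workload.split('.')[1:6])

    if 'report_data' not in metadata:
        metadata['report_data'] = {}
    if 'steady_state' not in metadata:
        metadata['steady_state'] = {}

    # Previously: metadata['report_data'] = metrics  (last workload wins)
    metadata['report_data'][workload] = {'lat.mean': {'read': {'average': 1.5}}}
    metadata['steady_state'][workload] = True

print(sorted(metadata['report_data']))
# ['rw.queue-depth.8.block-size.8192', 'ws.queue-depth.2.block-size.2048']
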
36 changes: 13 additions & 23 deletions ci/daily.sh
@@ -96,32 +96,22 @@ do
| awk '/Status/ {print $2}' | sed 's/"//g'`
done

for WORKLOAD in ws wr rs rr rw
export QUEUE_DEPTH=1,2,8
export BLOCK_SIZE=2048,8192,16384
export WORKLOAD=ws,wr,rs,rr,rw
export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"

JOB=`$WORKSPACE/ci/start_job.sh \
| awk '/job_id/ {print $2}' | sed 's/"//g'`
JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
| awk '/Status/ {print $2}' | sed 's/"//g'`
while [ "$JOB_STATUS" != "Completed" ]
do
for BLOCK_SIZE in 2048 8192 16384
do
for QUEUE_DEPTH in 1 2 8
do
export QUEUE_DEPTH
export BLOCK_SIZE
export WORKLOAD
export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"

JOB=`$WORKSPACE/ci/start_job.sh \
| awk '/job_id/ {print $2}' | sed 's/"//g'`
JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
| awk '/Status/ {print $2}' | sed 's/"//g'`
while [ "$JOB_STATUS" != "Completed" ]
do
sleep 60
JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
| awk '/Status/ {print $2}' | sed 's/"//g'`
done
done
done
sleep 60
JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
| awk '/Status/ {print $2}' | sed 's/"//g'`
done


echo "Deleting stack for cleanup"
curl -X DELETE --header 'Accept: application/json' 'http://127.0.0.1:5000/api/v1.0/configurations'

6 changes: 3 additions & 3 deletions storperf/test_executor.py
@@ -177,9 +177,9 @@ def execute(self, metadata):
def terminate(self):
self._terminated = True
self.end_time = time.time()
return self._terminate_current_run()
return self.terminate_current_run()

def _terminate_current_run(self):
def terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
@@ -243,7 +243,7 @@ def execute_workloads(self):
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
self._terminate_current_run,
self.terminate_current_run,
())
t = Thread(target=scheduler.run, args=())
t.start()
19 changes: 14 additions & 5 deletions storperf/utilities/data_handler.py
@@ -17,6 +17,7 @@
from storperf.utilities import steady_state as SteadyState
from time import sleep
import time
import json


class DataHandler(object):
@@ -61,13 +62,21 @@ def data_event(self, executor):
if not steady:
steady_state = False

executor.metadata['report_data'] = metrics
executor.metadata['steady_state'] = steady_state
workload = '.'.join(executor.current_workload.split('.')[1:6])

if 'report_data' not in executor.metadata:
executor.metadata['report_data'] = {}

if 'steady_state' not in executor.metadata:
executor.metadata['steady_state'] = {}

executor.metadata['report_data'][workload] = metrics
executor.metadata['steady_state'][workload] = steady_state

workload_name = executor.current_workload.split('.')[1]

if steady_state and not workload_name.startswith('_'):
executor.terminate()
executor.terminate_current_run()

def _lookup_prior_data(self, executor, metric, io_type):
workload = executor.current_workload
@@ -112,7 +121,7 @@ def _evaluate_prior_data(self, data_series):
duration = latest_timestamp - earliest_timestamp
if (duration < 60 * self.samples):
self.logger.debug("Only %s minutes of samples, ignoring" %
(duration / 60,))
((duration / 60 + 1),))
return False

return SteadyState.steady_state(data_series)
@@ -160,6 +169,6 @@ def _push_to_db(self, executor):
scenario,
criteria,
build_tag,
payload)
json.dumps(payload))
except:
self.logger.exception("Error pushing results into Database")
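
Because one job now spans several workloads, reaching steady state should stop only the workload that is currently running, not the whole job; hence data_event calls the newly public executor.terminate_current_run() rather than executor.terminate(), and the payload is serialized with json.dumps before being pushed to the results database. A rough sketch of the terminate distinction, using a stand-in for the executor class in test_executor.py rather than the real thing:

# Sketch only: a stand-in for the executor in test_executor.py, showing why
# the data handler now calls terminate_current_run() instead of terminate().
import time


class SketchExecutor(object):

    def __init__(self):
        self._terminated = False
        self.end_time = None

    def terminate(self):
        # Ends the whole job: no further workloads will run.
        self._terminated = True
        self.end_time = time.time()
        return self.terminate_current_run()

    def terminate_current_run(self):
        # Stops only the in-flight fio run; the job continues with the
        # next workload. Returns the hosts that were stopped (illustrative).
        return ['host-1']


executor = SketchExecutor()
executor.terminate_current_run()  # steady state reached: end this run only
assert not executor._terminated   # the job itself keeps going
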
60 changes: 32 additions & 28 deletions storperf/utilities/thread_gate.py
@@ -12,6 +12,7 @@
"""
import logging
import time
from threading import Lock


class FailureToReportException(Exception):
@@ -26,38 +27,41 @@ def __init__(self, size, timeout=60):
self._timeout = timeout
self._registrants = {}
self._creation_time = time.time()
self._lock = Lock()

"""
Calling this method returns a true or false, indicating that enough
of the other registrants have reported in
"""

def report(self, gate_id):
now = time.time()
self._registrants[gate_id] = now
ready = True
self.logger.debug("Gate report for %s", gate_id)

total_missing = self._gate_size - len(self._registrants)
if total_missing > 0:
self.logger.debug("Not all registrants have reported in")
time_since_creation = now - self._creation_time
if (time_since_creation > (self._timeout * 2)):
self.logger.error("%s registrant(s) have never reported in",
total_missing)
raise FailureToReportException
return False

for k, v in self._registrants.items():
time_since_last_report = now - v
if time_since_last_report > self._timeout:
self.logger.debug("Registrant %s last reported %s ago",
k, time_since_last_report)
ready = False

self.logger.debug("Gate pass? %s", ready)

if ready:
self._registrants.clear()

return ready
with self._lock:
now = time.time()
self._registrants[gate_id] = now
ready = True
self.logger.debug("Gate report for %s", gate_id)

total_missing = self._gate_size - len(self._registrants)
if total_missing > 0:
self.logger.debug("Not all registrants have reported in")
time_since_creation = now - self._creation_time
if (time_since_creation > (self._timeout * 2)):
self.logger.error(
"%s registrant(s) have never reported in",
total_missing)
raise FailureToReportException
return False

for k, v in self._registrants.items():
time_since_last_report = now - v
if time_since_last_report > self._timeout:
self.logger.debug("Registrant %s last reported %s ago",
k, time_since_last_report)
ready = False

self.logger.debug("Gate pass? %s", ready)

if ready:
self._registrants.clear()

return ready
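
The gate's report() path is now wrapped in a threading Lock so that concurrent workload threads cannot interleave their updates to _registrants. A small usage sketch, assuming the class defined in thread_gate.py is named ThreadGate and is importable from storperf.utilities.thread_gate; only the __init__(size, timeout=60) and report(gate_id) signatures are taken from the diff above:

# Sketch only: two reporters hitting a lock-protected gate concurrently.
# Assumes storperf/utilities/thread_gate.py exposes a ThreadGate class.
from threading import Thread

from storperf.utilities.thread_gate import ThreadGate

gate = ThreadGate(2, timeout=60)  # expect two registrants
results = {}


def reporter(gate_id):
    # With the Lock, these calls serialize; report() returns True only once
    # every registrant has checked in within the timeout window.
    results[gate_id] = gate.report(gate_id)


threads = [Thread(target=reporter, args=(name,)) for name in ('fio-1', 'fio-2')]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # the last reporter to arrive sees the gate open (True)
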
11 changes: 11 additions & 0 deletions tests/utilities_tests/data_handler_test.py
@@ -57,6 +57,9 @@ def push_results_to_db(self, *args):
def terminate(self):
self._terminated = True

def terminate_current_run(self):
self._terminated = True

@mock.patch("time.time")
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
@@ -163,18 +166,22 @@ def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
self.assertEqual(False, self._terminated)

self.assertEqual(expected_slope, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['slope'])
self.assertEqual(expected_range, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['range'])
self.assertEqual(expected_average, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['average'])
self.assertEqual(series, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['series'])
@@ -211,18 +218,22 @@ def test_report_that_causes_termination(self,
self.data_handler.data_event(self)

self.assertEqual(expected_slope, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['slope'])
self.assertEqual(expected_range, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['range'])
self.assertEqual(expected_average, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['average'])
self.assertEqual(series, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']
['series'])
