Skip to content

Commit

Permalink
Refresh Kive login and mark failed runs, as part of #438.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Apr 11, 2018
1 parent a28a827 commit b6360b6
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 29 deletions.
66 changes: 46 additions & 20 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,13 @@
from io import StringIO, BytesIO
from time import sleep

from kiveapi import KiveAPI, KiveClientException

from micall.drivers.run_info import parse_read_sizes
from micall.monitor import error_metrics_parser
from micall.monitor.sample_watcher import FolderWatcher, ALLOWED_GROUPS, SampleWatcher, PipelineType
from micall.resistance.resistance import find_groups

try:
from kiveapi import KiveAPI
from kiveapi.runstatus import RunStatus
except ImportError:
# Ignore import errors during testing.
KiveAPI = RunStatus = None

try:
from requests.adapters import HTTPAdapter
except ImportError:
Expand Down Expand Up @@ -130,7 +125,6 @@ def send_event(sample_queue, folder_event, next_scan):
try:
sample_queue.put(folder_event,
timeout=SLEEP_SECONDS)
logger.debug('Put %s in queue.', folder_event)
is_sent = True
except Full:
pass
Expand Down Expand Up @@ -176,7 +170,8 @@ def get_kive_pipeline(self, pipeline_id):
self.check_session()
kive_pipeline = self.pipelines.get(pipeline_id)
if kive_pipeline is None:
kive_pipeline = self.session.get_pipeline(pipeline_id)
kive_pipeline = self.kive_retry(
lambda: self.session.get_pipeline(pipeline_id))
self.pipelines[pipeline_id] = kive_pipeline
return kive_pipeline

Expand All @@ -195,10 +190,11 @@ def create_batch(self, folder_watcher):
description = 'MiCall batch for folder {}, pipeline version {}.'.format(
folder_watcher.run_name,
self.config.pipeline_version)
batch = self.session.create_run_batch(batch_name,
description=description,
users=[],
groups=ALLOWED_GROUPS)
batch = self.kive_retry(
lambda: self.session.create_run_batch(batch_name,
description=description,
users=[],
groups=ALLOWED_GROUPS))
folder_watcher.batch = batch

def find_kive_dataset(self, source_file, dataset_name, cdt):
Expand All @@ -214,10 +210,11 @@ def find_kive_dataset(self, source_file, dataset_name, cdt):
for chunk in iter(lambda: source_file.read(chunk_size), b""):
digest.update(chunk)
checksum = digest.hexdigest()
datasets = self.session.find_datasets(name=dataset_name,
md5=checksum,
cdt=cdt,
uploaded=True)
datasets = self.kive_retry(
lambda: self.session.find_datasets(name=dataset_name,
md5=checksum,
cdt=cdt,
uploaded=True))
needed_groups = set(ALLOWED_GROUPS)
for dataset in datasets:
missing_groups = needed_groups - set(dataset.groups_allowed)
Expand Down Expand Up @@ -323,14 +320,31 @@ def poll_runs(self):
for folder in completed_folders:
folder_watcher = self.folder_watchers.pop(folder)
results_path = self.collate_folder(folder_watcher)
if results_path is None:
continue
if (results_path / "coverage_scores.csv").exists():
self.result_handler(results_path)
(results_path / "doneprocessing").touch()
if not self.folder_watchers:
logger.info('No more folders to process.')

def collate_folder(self, folder_watcher):
""" Collate scratch files for a run folder.
:param FolderWatcher folder_watcher: holds details about the run folder
"""
results_path = self.get_results_path(folder_watcher)
failed_sample_names = [
sample_watcher.sample_group.enum
for sample_watcher in folder_watcher.sample_watchers
if sample_watcher.is_failed]
if failed_sample_names:
run_path = (results_path / "../..").resolve()
error_message = 'Samples failed in Kive: {}.'.format(
', '.join(failed_sample_names))
(run_path / 'errorprocessing').write_text(error_message + '\n')
logger.error('Error in folder %s: %s', run_path, error_message)
return
logger.info('Collating results in %s', results_path)
scratch_path = results_path / "scratch"
for output_name in DOWNLOADED_RESULTS:
Expand Down Expand Up @@ -456,8 +470,20 @@ def run_pipeline(self, folder_watcher, pipeline_type, sample_watcher):
runbatch=folder_watcher.batch,
groups=ALLOWED_GROUPS)

def kive_retry(self, target):
""" Add a single retry to a Kive API call.
Tries to call the target function, then refreshes the session login if
the call fails and tries a second time.
"""
try:
return target()
except KiveClientException:
self.session.login(self.config.kive_user, self.config.kive_password)
return target()

def fetch_run_status(self, run, folder_watcher, pipeline_type, sample_watcher):
is_complete = run.is_complete()
is_complete = self.kive_retry(run.is_complete)
if is_complete and pipeline_type != PipelineType.FILTER_QUALITY:
sample_name = (sample_watcher.sample_group.names[1]
if pipeline_type in (PipelineType.MIDI,
Expand All @@ -466,14 +492,14 @@ def fetch_run_status(self, run, folder_watcher, pipeline_type, sample_watcher):
results_path = self.get_results_path(folder_watcher)
scratch_path = results_path / "scratch" / trim_name(sample_name)
scratch_path.mkdir(parents=True, exist_ok=True)
results = run.get_results()
results = self.kive_retry(run.get_results)
for output_name in DOWNLOADED_RESULTS:
dataset = results.get(output_name)
if dataset is None:
continue
filename = get_output_filename(output_name)
with (scratch_path / filename).open('wb') as f:
dataset.download(f)
self.kive_retry(lambda: dataset.download(f))

return is_complete

Expand Down
36 changes: 27 additions & 9 deletions micall/monitor/sample_watcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from enum import Enum
from pathlib import Path

from kiveapi import KiveRunFailedException

ALLOWED_GROUPS = ['Everyone']
# noinspection PyArgumentList
PipelineType = Enum(
Expand Down Expand Up @@ -78,6 +80,14 @@ def poll_runs(self):
if name is not None)

def poll_sample_runs(self, sample_watcher):
""" Poll all active runs for a sample.
:param sample_watcher: details about the sample to poll
:return: True if the sample is complete (success or failure), otherwise
False
"""
if sample_watcher.is_failed:
return True
is_mixed_hcv_complete = False
mixed_hcv_run = sample_watcher.runs.get(PipelineType.MIXED_HCV_MAIN)
if mixed_hcv_run is None:
Expand Down Expand Up @@ -107,27 +117,30 @@ def poll_sample_runs(self, sample_watcher):
if sample_watcher.sample_group.names[1] is not None:
self.run_pipeline(PipelineType.MIDI, sample_watcher)
# Launched main and midi runs, nothing more to check on sample.
return False
return sample_watcher.is_failed
midi_run = sample_watcher.runs.get(PipelineType.MIDI)
active_main_runs = [
run
for run in (main_run, midi_run)
if run in self.active_runs and not self.fetch_run_status(run)]
if active_main_runs:
# Still running, nothing more to check on sample
return False
return sample_watcher.is_failed
resistance_run = sample_watcher.runs.get(PipelineType.RESISTANCE)
if resistance_run is None:
self.run_pipeline(PipelineType.RESISTANCE, sample_watcher)
# Launched resistance run, nothing more to check on sample.
return False
return sample_watcher.is_failed
if resistance_run in self.active_runs:
if not self.fetch_run_status(resistance_run):
# Still running, nothing more to check on sample.
return False
return is_mixed_hcv_complete
return sample_watcher.is_failed
return is_mixed_hcv_complete or sample_watcher.is_failed

def run_pipeline(self, pipeline_type, sample_watcher=None):
if sample_watcher and sample_watcher.is_failed:
# Don't start runs when the sample has already failed.
return None
run = self.runner.run_pipeline(self, pipeline_type, sample_watcher)
if run is not None:
self.add_run(run, pipeline_type, sample_watcher)
Expand All @@ -143,10 +156,14 @@ def add_run(self, run, pipeline_type, sample_watcher=None, is_complete=False):

def fetch_run_status(self, run):
sample_watcher, pipeline_type = self.active_runs[run]
is_complete = self.runner.fetch_run_status(run,
self,
pipeline_type,
sample_watcher)
try:
is_complete = self.runner.fetch_run_status(run,
self,
pipeline_type,
sample_watcher)
except KiveRunFailedException:
sample_watcher.is_failed = True
is_complete = True
if is_complete:
del self.active_runs[run]
return is_complete
Expand All @@ -157,6 +174,7 @@ def __init__(self, sample_group):
self.sample_group = sample_group
self.fastq_datasets = []
self.runs = {} # {pipeline_type: run}
self.is_failed = False

def __repr__(self):
enum = self.sample_group.enum
Expand Down
98 changes: 98 additions & 0 deletions micall/tests/test_kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from struct import pack

from kiveapi import KiveClientException, KiveRunFailedException

from micall.monitor.kive_watcher import find_samples, KiveWatcher, FolderEvent, FolderEventType
from micall.monitor.sample_watcher import PipelineType, ALLOWED_GROUPS, FolderWatcher, SampleWatcher
from micall.resistance.resistance import SampleGroup
Expand Down Expand Up @@ -435,6 +437,27 @@ def test_add_first_sample(raw_data_with_two_samples, mock_open_kive, default_con
assert not old_stuff_csv.exists()


def test_create_batch_with_expired_session(raw_data_with_two_samples,
mock_open_kive,
default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
mock_session = mock_open_kive.return_value
mock_batch = Mock(name='batch')
mock_session.create_run_batch.side_effect = [KiveClientException('expired'),
mock_batch]
kive_watcher = KiveWatcher(default_config)

kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2110A',
('2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
None)))

folder_watcher, = kive_watcher.folder_watchers.values()
assert mock_batch is folder_watcher.batch


def test_add_external_dataset(raw_data_with_two_samples, mock_open_kive, default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
Expand Down Expand Up @@ -1065,6 +1088,31 @@ def test_fetch_run_status_main_and_midi(raw_data_with_hcv_pair, pipelines_config
assert expected_midi_nuc_path.exists()


def test_fetch_run_status_session_expired(raw_data_with_two_runs,
mock_open_kive,
pipelines_config):
assert mock_open_kive
base_calls = (raw_data_with_two_runs /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
mock_run = Mock(**{'is_complete.side_effect': [KiveClientException('expired'),
True],
'get_results.return_value': {}})

kive_watcher = KiveWatcher(pipelines_config)

sample_watcher = kive_watcher.add_sample_group(
base_calls,
SampleGroup('2000A', ('2000A-V3LOOP_S2_L001_R1_001.fastq.gz', None)))
folder_watcher, = kive_watcher.folder_watchers.values()

is_complete = kive_watcher.fetch_run_status(mock_run,
folder_watcher,
PipelineType.MAIN,
sample_watcher)

assert is_complete


def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_config):
assert mock_open_kive
base_calls = (raw_data_with_two_samples /
Expand Down Expand Up @@ -1238,6 +1286,56 @@ def test_folder_not_finished_before_new_start(raw_data_with_two_runs,
assert scratch_path.exists()


def test_folder_failed(raw_data_with_two_samples, mock_open_kive, default_config):
assert mock_open_kive
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
main_run1 = Mock(
name='main_run1',
**{'is_complete.side_effect': KiveRunFailedException('failed')})
resistance_run2 = Mock(
name='resistance_run2',
**{'is_complete.return_value': True,
'get_results.return_value': create_datasets(['resistance_csv'])})
kive_watcher = KiveWatcher(default_config)

folder_watcher = kive_watcher.add_folder(base_calls)
sample1_watcher = kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2110A',
('2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
None)))
sample2_watcher = kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2120A',
('2120A-PR_S14_L001_R1_001.fastq.gz',
None)))
kive_watcher.finish_folder(base_calls)
folder_watcher.add_run(Mock(name='filter_quality_run'),
PipelineType.FILTER_QUALITY,
is_complete=True)
folder_watcher.add_run(main_run1,
PipelineType.MAIN,
sample1_watcher)
folder_watcher.add_run(Mock(name='main_run2'),
PipelineType.MAIN,
sample2_watcher,
is_complete=True)
folder_watcher.add_run(resistance_run2,
PipelineType.RESISTANCE,
sample2_watcher)
run_path = base_calls / "../../.."
results_path = run_path / "Results/version_0-dev"
expected_done_path = results_path / "doneprocessing"
expected_error_path = run_path / "errorprocessing"
expected_error_message = "Samples failed in Kive: 2110A.\n"

kive_watcher.poll_runs()

assert not expected_done_path.exists()
assert expected_error_message == expected_error_path.read_text()


def test_add_duplicate_sample(raw_data_with_two_samples,
mock_open_kive,
default_config):
Expand Down
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Requirements for running the tests

-r requirements.txt
-r requirements-monitor.txt
pytest==3.5.0
coverage==4.3.4
pandas==0.21.0
Expand Down

0 comments on commit b6360b6

Please sign in to comment.