Skip to content

Commit

Permalink
Fix broken tests, and simplify test set up, as part of #438.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Apr 10, 2018
1 parent d7fd61c commit 208ba77
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 361 deletions.
19 changes: 14 additions & 5 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ def add_sample_group(self, base_calls, sample_group):
self.current_folder = base_calls
folder_watcher = self.folder_watchers.get(base_calls)
if folder_watcher is None:
folder_watcher = FolderWatcher(base_calls, self)
self.folder_watchers[base_calls] = folder_watcher
folder_watcher = self.add_folder(base_calls)
self.create_batch(folder_watcher)
self.upload_filter_quality(folder_watcher)
shutil.rmtree(self.get_results_path(folder_watcher),
Expand All @@ -262,6 +261,12 @@ def add_sample_group(self, base_calls, sample_group):
compounddatatype=None)
sample_watcher.fastq_datasets.append(fastq_dataset)
folder_watcher.sample_watchers.append(sample_watcher)
return sample_watcher

def add_folder(self, base_calls):
folder_watcher = FolderWatcher(base_calls, self)
self.folder_watchers[base_calls] = folder_watcher
return folder_watcher

def finish_folder(self):
self.current_folder = None
Expand Down Expand Up @@ -334,7 +339,7 @@ def extract_coverage_maps(self, folder_watcher):
except FileNotFoundError:
pass

def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
def run_pipeline(self, folder_watcher, pipeline_type, sample_watcher):
if pipeline_type == PipelineType.FILTER_QUALITY:
quality_pipeline = self.get_kive_pipeline(
self.config.micall_filter_quality_pipeline_id)
Expand All @@ -347,8 +352,12 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
if pipeline_type == PipelineType.RESISTANCE:
resistance_pipeline = self.get_kive_pipeline(
self.config.micall_resistance_pipeline_id)
main_runs = filter(None,
(sample_watcher.runs.get(pipeline_type)
for pipeline_type in (PipelineType.MAIN,
PipelineType.MIDI)))
input_datasets = [run.get_results()['amino_csv']
for run in sample_watcher.main_runs]
for run in main_runs]
main_aminos_input = self.get_kive_input('main_amino_csv')
for input_dataset in input_datasets:
# TODO: remove this when Kive API sets CDT properly (issue #729)
Expand Down Expand Up @@ -404,7 +413,7 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
runbatch=folder_watcher.batch,
groups=ALLOWED_GROUPS)

def fetch_run_status(self, run, folder_watcher, sample_watcher, pipeline_type):
def fetch_run_status(self, run, folder_watcher, pipeline_type, sample_watcher):
is_complete = run.is_complete()
if is_complete and pipeline_type != PipelineType.FILTER_QUALITY:
sample_name = (sample_watcher.sample_group.names[1]
Expand Down
71 changes: 36 additions & 35 deletions micall/monitor/sample_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ def __init__(self,
run folder
:param runner: an object for running Kive pipelines. Must have these
methods:
run_pipeline(folder_watcher, sample_watcher, pipeline_type)
run_pipeline(folder_watcher, pipeline_type, sample_watcher)
returns run, or None if that pipeline_type is not configured.
fetch_run_status(
run,
folder_watcher,
sample_watcher,
pipeline_type) => True if successfully finished, raise if
pipeline_type,
sample_watcher) => True if successfully finished, raise if
run failed, also saves the outputs to temporary files in the
results folder when the run is finished
"""
Expand Down Expand Up @@ -62,7 +62,6 @@ def is_complete(self):
def poll_runs(self):
if self.filter_quality_run is None:
self.filter_quality_run = self.run_pipeline(
None,
PipelineType.FILTER_QUALITY)
# Launched filter run, nothing more to check.
return
Expand All @@ -80,70 +79,74 @@ def poll_runs(self):

def poll_sample_runs(self, sample_watcher):
is_mixed_hcv_complete = False
if not sample_watcher.mixed_hcv_runs:
mixed_hcv_run = sample_watcher.runs.get(PipelineType.MIXED_HCV_MAIN)
if mixed_hcv_run is None:
if 'HCV' not in sample_watcher.sample_group.names[0]:
is_mixed_hcv_complete = True
else:
mixed_hcv_run = self.run_pipeline(
sample_watcher,
PipelineType.MIXED_HCV_MAIN)
PipelineType.MIXED_HCV_MAIN,
sample_watcher)
if mixed_hcv_run is None:
is_mixed_hcv_complete = True
else:
sample_watcher.mixed_hcv_runs.append(mixed_hcv_run)
if sample_watcher.sample_group.names[1] is not None:
mixed_hcv_run = self.run_pipeline(
sample_watcher,
PipelineType.MIXED_HCV_MIDI)
sample_watcher.mixed_hcv_runs.append(mixed_hcv_run)
self.run_pipeline(PipelineType.MIXED_HCV_MIDI,
sample_watcher)
else:
mixed_hcv_midi_run = sample_watcher.runs.get(PipelineType.MIXED_HCV_MIDI)
active_mixed_runs = [
run
for run in sample_watcher.mixed_hcv_runs
for run in (mixed_hcv_run, mixed_hcv_midi_run)
if run in self.active_runs and not self.fetch_run_status(run)]
is_mixed_hcv_complete = not active_mixed_runs

if not sample_watcher.main_runs:
sample_watcher.main_runs.append(self.run_pipeline(
sample_watcher,
PipelineType.MAIN))
main_run = sample_watcher.runs.get(PipelineType.MAIN)
if main_run is None:
self.run_pipeline(PipelineType.MAIN, sample_watcher)
if sample_watcher.sample_group.names[1] is not None:
sample_watcher.main_runs.append(self.run_pipeline(
sample_watcher,
PipelineType.MIDI))
self.run_pipeline(PipelineType.MIDI, sample_watcher)
# Launched main and midi runs, nothing more to check on sample.
return False
midi_run = sample_watcher.runs.get(PipelineType.MIDI)
active_main_runs = [
run
for run in sample_watcher.main_runs
for run in (main_run, midi_run)
if run in self.active_runs and not self.fetch_run_status(run)]
if active_main_runs:
# Still running, nothing more to check on sample
return False
if sample_watcher.resistance_run is None:
sample_watcher.resistance_run = self.run_pipeline(
sample_watcher,
PipelineType.RESISTANCE)
resistance_run = sample_watcher.runs.get(PipelineType.RESISTANCE)
if resistance_run is None:
self.run_pipeline(PipelineType.RESISTANCE, sample_watcher)
# Launched resistance run, nothing more to check on sample.
return False
if sample_watcher.resistance_run in self.active_runs:
if not self.fetch_run_status(sample_watcher.resistance_run):
if resistance_run in self.active_runs:
if not self.fetch_run_status(resistance_run):
# Still running, nothing more to check on sample.
return False
return is_mixed_hcv_complete

def run_pipeline(self, sample_watcher, pipeline_type):
run = self.runner.run_pipeline(self, sample_watcher, pipeline_type)
def run_pipeline(self, pipeline_type, sample_watcher=None):
run = self.runner.run_pipeline(self, pipeline_type, sample_watcher)
if run is not None:
self.active_runs[run] = (sample_watcher, pipeline_type)
self.add_run(run, pipeline_type, sample_watcher)
return run

def add_run(self, run, pipeline_type, sample_watcher=None, is_complete=False):
if not is_complete:
self.active_runs[run] = (sample_watcher, pipeline_type)
if pipeline_type == PipelineType.FILTER_QUALITY:
self.filter_quality_run = run
else:
sample_watcher.runs[pipeline_type] = run

def fetch_run_status(self, run):
sample_watcher, pipeline_type = self.active_runs[run]
is_complete = self.runner.fetch_run_status(run,
self,
sample_watcher,
pipeline_type)
pipeline_type,
sample_watcher)
if is_complete:
del self.active_runs[run]
return is_complete
Expand All @@ -153,9 +156,7 @@ class SampleWatcher:
def __init__(self, sample_group):
self.sample_group = sample_group
self.fastq_datasets = []
self.mixed_hcv_runs = []
self.main_runs = []
self.resistance_run = None
self.runs = {} # {pipeline_type: run}

def __repr__(self):
enum = self.sample_group.enum
Expand Down
Loading

0 comments on commit 208ba77

Please sign in to comment.