Skip to content

Commit

Permalink
Launch mixed HCV pipeline in Kive, as part of #438.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Apr 3, 2018
1 parent eae08c3 commit 85306c2
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 11 deletions.
12 changes: 12 additions & 0 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,18 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
'MiCall resistance on ' + sample_watcher.sample_group.enum,
runbatch=folder_watcher.batch,
groups=ALLOWED_GROUPS)
if pipeline_type == PipelineType.MIXED_HCV:
if self.config.mixed_hcv_pipeline_id is None:
return None
mixed_hcv_pipeline = self.get_kive_pipeline(
self.config.mixed_hcv_pipeline_id)
input_datasets = sample_watcher.fastq_datasets[:2]
return self.session.run_pipeline(
mixed_hcv_pipeline,
input_datasets,
'Mixed HCV on ' + sample_watcher.sample_group.enum,
runbatch=folder_watcher.batch,
groups=ALLOWED_GROUPS)
if pipeline_type == PipelineType.MAIN:
fastq1, fastq2 = sample_watcher.fastq_datasets[:2]
sample_name = sample_watcher.sample_group.names[0]
Expand Down
35 changes: 32 additions & 3 deletions micall/monitor/sample_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

ALLOWED_GROUPS = ['Everyone']
# noinspection PyArgumentList
PipelineType = Enum('PipelineType', 'FILTER_QUALITY MAIN MIDI RESISTANCE')
PipelineType = Enum('PipelineType',
'FILTER_QUALITY MAIN MIDI RESISTANCE MIXED_HCV')


class FolderWatcher:
Expand All @@ -16,7 +17,8 @@ def __init__(self,
run folder
:param runner: an object for running Kive pipelines. Must have these
methods:
run_pipeline(folder_watcher, sample_watcher, pipeline_type) => run
run_pipeline(folder_watcher, sample_watcher, pipeline_type)
returns run, or None if that pipeline_type is not configured.
fetch_run_status(run) => True if successfully finished, raise if
run failed
"""
Expand Down Expand Up @@ -70,6 +72,26 @@ def poll_runs(self):
if name is not None)

def poll_sample_runs(self, sample_watcher):
is_mixed_hcv_complete = False
if sample_watcher.mixed_hcv_run is None:
if 'HCV' not in sample_watcher.sample_group.names[0]:
is_mixed_hcv_complete = True
else:
sample_watcher.mixed_hcv_run = self.runner.run_pipeline(
self,
sample_watcher,
PipelineType.MIXED_HCV)
if sample_watcher.mixed_hcv_run is None:
is_mixed_hcv_complete = True
else:
self.active_runs.add(sample_watcher.mixed_hcv_run)
elif sample_watcher.mixed_hcv_run in self.active_runs:
if self.runner.fetch_run_status(sample_watcher.mixed_hcv_run):
is_mixed_hcv_complete = True
self.active_runs.remove(sample_watcher.mixed_hcv_run)
else:
is_mixed_hcv_complete = True

if not sample_watcher.main_runs:
sample_watcher.main_runs.append(self.runner.run_pipeline(
self,
Expand All @@ -94,15 +116,22 @@ def poll_sample_runs(self, sample_watcher):
self,
sample_watcher,
PipelineType.RESISTANCE)
self.active_runs.add(sample_watcher.resistance_run)
# Launched resistance run, nothing more to check on sample.
return False
return self.runner.fetch_run_status(sample_watcher.resistance_run)
if sample_watcher.resistance_run in self.active_runs:
if not self.runner.fetch_run_status(sample_watcher.resistance_run):
# Still running, nothing more to check on sample.
return False
self.active_runs.remove(sample_watcher.resistance_run)
return is_mixed_hcv_complete


class SampleWatcher:
def __init__(self, sample_group):
self.sample_group = sample_group
self.fastq_datasets = []
self.mixed_hcv_run = None
self.main_runs = []
self.resistance_run = None

Expand Down
92 changes: 92 additions & 0 deletions micall/tests/test_kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,98 @@ def test_launch_hcv_resistance_run(raw_data_with_hcv_pair, mock_open_kive, pipel
assert mock_session.run_pipeline.return_value is run


def test_launch_mixed_hcv_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config):
pipelines_config.mixed_hcv_pipeline_id = 47
base_calls = (raw_data_with_hcv_pair /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
quality_csv = Mock(name='quality_csv')
main_fastq1 = Mock(name='main_fastq1')
main_fastq2 = Mock(name='main_fastq2')
midi_fastq1 = Mock(name='midi_fastq1')
midi_fastq2 = Mock(name='midi_fastq2')
mock_session = mock_open_kive.return_value
mock_quality_pipeline = Mock(name='quality_pipeline')
mock_mixed_hcv_pipeline = Mock(name='main_pipeline')
mock_session.get_pipeline.side_effect = [mock_quality_pipeline,
mock_mixed_hcv_pipeline]
mock_session.add_dataset.side_effect = [quality_csv,
main_fastq1,
main_fastq2,
midi_fastq1,
midi_fastq2]
mock_input = Mock(dataset_name='quality_csv')
mock_quality_pipeline.inputs = [mock_input]
mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'),
Mock(dataset_name='FASTQ2')]
kive_watcher = KiveWatcher(pipelines_config)

kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher, = kive_watcher.folder_watchers.values()
sample_watcher, = folder_watcher.sample_watchers

run = kive_watcher.run_pipeline(folder_watcher,
sample_watcher,
PipelineType.MIXED_HCV)

assert [call(pipelines_config.micall_filter_quality_pipeline_id),
call(pipelines_config.mixed_hcv_pipeline_id)
] == mock_session.get_pipeline.call_args_list
mock_session.run_pipeline.assert_called_once_with(
mock_mixed_hcv_pipeline,
[main_fastq1, main_fastq2],
'Mixed HCV on 2130A',
runbatch=mock_session.create_run_batch.return_value,
groups=['Everyone'])
assert mock_session.run_pipeline.return_value is run


def test_launch_mixed_hcv_disabled(raw_data_with_hcv_pair, mock_open_kive, pipelines_config):
pipelines_config.mixed_hcv_pipeline_id = None
base_calls = (raw_data_with_hcv_pair /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
quality_csv = Mock(name='quality_csv')
main_fastq1 = Mock(name='main_fastq1')
main_fastq2 = Mock(name='main_fastq2')
midi_fastq1 = Mock(name='midi_fastq1')
midi_fastq2 = Mock(name='midi_fastq2')
mock_session = mock_open_kive.return_value
mock_quality_pipeline = Mock(name='quality_pipeline')
mock_mixed_hcv_pipeline = Mock(name='main_pipeline')
mock_session.get_pipeline.side_effect = [mock_quality_pipeline,
mock_mixed_hcv_pipeline]
mock_session.add_dataset.side_effect = [quality_csv,
main_fastq1,
main_fastq2,
midi_fastq1,
midi_fastq2]
mock_input = Mock(dataset_name='quality_csv')
mock_quality_pipeline.inputs = [mock_input]
mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'),
Mock(dataset_name='FASTQ2')]
kive_watcher = KiveWatcher(pipelines_config)

kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher, = kive_watcher.folder_watchers.values()
sample_watcher, = folder_watcher.sample_watchers

run = kive_watcher.run_pipeline(folder_watcher,
sample_watcher,
PipelineType.MIXED_HCV)

assert [call(pipelines_config.micall_filter_quality_pipeline_id)
] == mock_session.get_pipeline.call_args_list
mock_session.run_pipeline.assert_not_called()
assert None is run


def test_full_with_two_samples(raw_data_with_two_samples, mock_open_kive, pipelines_config):
pipelines_config.max_active = 2
base_calls = (raw_data_with_two_samples /
Expand Down
149 changes: 141 additions & 8 deletions micall/tests/test_sample_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@


class DummySession:
def __init__(self):
def __init__(self, skipped_types=frozenset()):
""" Initialize.
:param set skipped_types: {PipelineType} types that won't run
"""
self.skipped_types = skipped_types
self.active_runs = []

def run_pipeline(self, *args):
self.active_runs.append(args)
return args
def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
if pipeline_type in self.skipped_types:
return None
run = (folder_watcher, sample_watcher, pipeline_type)
self.active_runs.append(run)
return run

def fetch_run_status(self, run):
return run not in self.active_runs
Expand Down Expand Up @@ -128,6 +136,7 @@ def test_main_finished():
folder_watcher.poll_runs() # Start resistance

assert [(folder_watcher, sample_watcher, PipelineType.RESISTANCE)] == session.active_runs
assert 1 == len(folder_watcher.active_runs)


def test_resistance_running():
Expand Down Expand Up @@ -164,8 +173,9 @@ def test_resistance_finished():
session.active_runs.clear() # Finish resistance
folder_watcher.poll_runs() # Finish sample

assert [] == session.active_runs
assert set() == folder_watcher.active_samples
assert not session.active_runs
assert not folder_watcher.active_samples
assert not folder_watcher.active_runs
assert folder_watcher.is_complete


Expand All @@ -182,7 +192,130 @@ def test_hcv_filter_quality_finished():
folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality

folder_watcher.poll_runs() # start main
folder_watcher.poll_runs() # start main, midi, and mixed HCV

assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV),
(folder_watcher, sample_watcher, PipelineType.MAIN),
(folder_watcher, sample_watcher, PipelineType.MIDI)
] == session.active_runs
assert 3 == len(folder_watcher.active_runs)


def test_hcv_mixed_hcv_running():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
session = DummySession()
folder_watcher = FolderWatcher(base_calls_folder, runner=session)
sample_watcher = SampleWatcher(SampleGroup(
'2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher.sample_watchers.append(sample_watcher)

folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality

folder_watcher.poll_runs() # start main, midi, and mixed HCV
folder_watcher.poll_runs() # main, midi, and mixed HCV still running

assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV),
(folder_watcher, sample_watcher, PipelineType.MAIN),
(folder_watcher, sample_watcher, PipelineType.MIDI)
] == session.active_runs
assert 3 == len(folder_watcher.active_runs)


def test_hcv_mixed_hcv_finished():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
session = DummySession()
folder_watcher = FolderWatcher(base_calls_folder, runner=session)
sample_watcher = SampleWatcher(SampleGroup(
'2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher.sample_watchers.append(sample_watcher)

folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality

folder_watcher.poll_runs() # start main, midi, and mixed HCV
session.active_runs.remove(
(folder_watcher, sample_watcher, PipelineType.MIXED_HCV)) # Finish mixed HCV
folder_watcher.poll_runs() # main, midi, and mixed HCV still running

assert [(folder_watcher, sample_watcher, PipelineType.MAIN),
(folder_watcher, sample_watcher, PipelineType.MIDI)] == session.active_runs
(folder_watcher, sample_watcher, PipelineType.MIDI)
] == session.active_runs
assert 2 == len(folder_watcher.active_runs)


def test_hcv_mixed_hcv_not_finished():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
session = DummySession()
folder_watcher = FolderWatcher(base_calls_folder, runner=session)
sample_watcher = SampleWatcher(SampleGroup(
'2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher.sample_watchers.append(sample_watcher)

folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality

folder_watcher.poll_runs() # start main, midi, and mixed HCV
session.active_runs.remove(
(folder_watcher, sample_watcher, PipelineType.MAIN)) # Finish main
session.active_runs.remove(
(folder_watcher, sample_watcher, PipelineType.MIDI)) # Finish midi
folder_watcher.poll_runs() # mixed HCV still running, resistance started
session.active_runs.remove(
(folder_watcher, sample_watcher, PipelineType.RESISTANCE)) # Finish res
folder_watcher.poll_runs() # mixed HCV still running, resistance finished

assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV)
] == session.active_runs
assert 1 == len(folder_watcher.active_runs)
assert not folder_watcher.is_complete


def test_mixed_hcv_skipped():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
session = DummySession(skipped_types={PipelineType.MIXED_HCV})
folder_watcher = FolderWatcher(base_calls_folder, runner=session)
sample_watcher = SampleWatcher(SampleGroup(
'2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher.sample_watchers.append(sample_watcher)

folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality

folder_watcher.poll_runs() # start main and midi

assert [(folder_watcher, sample_watcher, PipelineType.MAIN),
(folder_watcher, sample_watcher, PipelineType.MIDI)
] == session.active_runs
assert 2 == len(folder_watcher.active_runs)


def test_mixed_hcv_skipped_and_complete():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
session = DummySession(skipped_types={PipelineType.MIXED_HCV})
folder_watcher = FolderWatcher(base_calls_folder, runner=session)
sample_watcher = SampleWatcher(SampleGroup(
'2130A',
('2130A-HCV_S15_L001_R1_001.fastq.gz',
'2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))
folder_watcher.sample_watchers.append(sample_watcher)

folder_watcher.poll_runs() # Start filter_quality
session.active_runs.clear() # Finish filter_quality
folder_watcher.poll_runs() # start main and midi
session.active_runs.clear() # Finish main and midi
folder_watcher.poll_runs() # start resistance
session.active_runs.clear() # Finish resistance
folder_watcher.poll_runs() # done

assert not session.active_runs
assert not folder_watcher.active_runs
assert folder_watcher.is_complete

0 comments on commit 85306c2

Please sign in to comment.