From 85306c217787e5760b6b7e76be16ba02f16b6246 Mon Sep 17 00:00:00 2001 From: don Date: Tue, 3 Apr 2018 16:26:44 -0700 Subject: [PATCH] Launch mixed HCV pipeline in Kive, as part of #438. --- micall/monitor/kive_watcher.py | 12 +++ micall/monitor/sample_watcher.py | 35 ++++++- micall/tests/test_kive_watcher.py | 92 +++++++++++++++++ micall/tests/test_sample_watcher.py | 149 ++++++++++++++++++++++++++-- 4 files changed, 277 insertions(+), 11 deletions(-) diff --git a/micall/monitor/kive_watcher.py b/micall/monitor/kive_watcher.py index 6f010412c..c6a46126b 100644 --- a/micall/monitor/kive_watcher.py +++ b/micall/monitor/kive_watcher.py @@ -233,6 +233,18 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): 'MiCall resistance on ' + sample_watcher.sample_group.enum, runbatch=folder_watcher.batch, groups=ALLOWED_GROUPS) + if pipeline_type == PipelineType.MIXED_HCV: + if self.config.mixed_hcv_pipeline_id is None: + return None + mixed_hcv_pipeline = self.get_kive_pipeline( + self.config.mixed_hcv_pipeline_id) + input_datasets = sample_watcher.fastq_datasets[:2] + return self.session.run_pipeline( + mixed_hcv_pipeline, + input_datasets, + 'Mixed HCV on ' + sample_watcher.sample_group.enum, + runbatch=folder_watcher.batch, + groups=ALLOWED_GROUPS) if pipeline_type == PipelineType.MAIN: fastq1, fastq2 = sample_watcher.fastq_datasets[:2] sample_name = sample_watcher.sample_group.names[0] diff --git a/micall/monitor/sample_watcher.py b/micall/monitor/sample_watcher.py index 800286aae..ef8570056 100644 --- a/micall/monitor/sample_watcher.py +++ b/micall/monitor/sample_watcher.py @@ -3,7 +3,8 @@ ALLOWED_GROUPS = ['Everyone'] # noinspection PyArgumentList -PipelineType = Enum('PipelineType', 'FILTER_QUALITY MAIN MIDI RESISTANCE') +PipelineType = Enum('PipelineType', + 'FILTER_QUALITY MAIN MIDI RESISTANCE MIXED_HCV') class FolderWatcher: @@ -16,7 +17,8 @@ def __init__(self, run folder :param runner: an object for running Kive pipelines. Must have these methods: - run_pipeline(folder_watcher, sample_watcher, pipeline_type) => run + run_pipeline(folder_watcher, sample_watcher, pipeline_type) + returns run, or None if that pipeline_type is not configured. fetch_run_status(run) => True if successfully finished, raise if run failed """ @@ -70,6 +72,26 @@ def poll_runs(self): if name is not None) def poll_sample_runs(self, sample_watcher): + is_mixed_hcv_complete = False + if sample_watcher.mixed_hcv_run is None: + if 'HCV' not in sample_watcher.sample_group.names[0]: + is_mixed_hcv_complete = True + else: + sample_watcher.mixed_hcv_run = self.runner.run_pipeline( + self, + sample_watcher, + PipelineType.MIXED_HCV) + if sample_watcher.mixed_hcv_run is None: + is_mixed_hcv_complete = True + else: + self.active_runs.add(sample_watcher.mixed_hcv_run) + elif sample_watcher.mixed_hcv_run in self.active_runs: + if self.runner.fetch_run_status(sample_watcher.mixed_hcv_run): + is_mixed_hcv_complete = True + self.active_runs.remove(sample_watcher.mixed_hcv_run) + else: + is_mixed_hcv_complete = True + if not sample_watcher.main_runs: sample_watcher.main_runs.append(self.runner.run_pipeline( self, @@ -94,15 +116,22 @@ def poll_sample_runs(self, sample_watcher): self, sample_watcher, PipelineType.RESISTANCE) + self.active_runs.add(sample_watcher.resistance_run) # Launched resistance run, nothing more to check on sample. return False - return self.runner.fetch_run_status(sample_watcher.resistance_run) + if sample_watcher.resistance_run in self.active_runs: + if not self.runner.fetch_run_status(sample_watcher.resistance_run): + # Still running, nothing more to check on sample. + return False + self.active_runs.remove(sample_watcher.resistance_run) + return is_mixed_hcv_complete class SampleWatcher: def __init__(self, sample_group): self.sample_group = sample_group self.fastq_datasets = [] + self.mixed_hcv_run = None self.main_runs = [] self.resistance_run = None diff --git a/micall/tests/test_kive_watcher.py b/micall/tests/test_kive_watcher.py index 75ebc668e..98302b50a 100644 --- a/micall/tests/test_kive_watcher.py +++ b/micall/tests/test_kive_watcher.py @@ -615,6 +615,98 @@ def test_launch_hcv_resistance_run(raw_data_with_hcv_pair, mock_open_kive, pipel assert mock_session.run_pipeline.return_value is run +def test_launch_mixed_hcv_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): + pipelines_config.mixed_hcv_pipeline_id = 47 + base_calls = (raw_data_with_hcv_pair / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + quality_csv = Mock(name='quality_csv') + main_fastq1 = Mock(name='main_fastq1') + main_fastq2 = Mock(name='main_fastq2') + midi_fastq1 = Mock(name='midi_fastq1') + midi_fastq2 = Mock(name='midi_fastq2') + mock_session = mock_open_kive.return_value + mock_quality_pipeline = Mock(name='quality_pipeline') + mock_mixed_hcv_pipeline = Mock(name='main_pipeline') + mock_session.get_pipeline.side_effect = [mock_quality_pipeline, + mock_mixed_hcv_pipeline] + mock_session.add_dataset.side_effect = [quality_csv, + main_fastq1, + main_fastq2, + midi_fastq1, + midi_fastq2] + mock_input = Mock(dataset_name='quality_csv') + mock_quality_pipeline.inputs = [mock_input] + mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'), + Mock(dataset_name='FASTQ2')] + kive_watcher = KiveWatcher(pipelines_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher, = kive_watcher.folder_watchers.values() + sample_watcher, = folder_watcher.sample_watchers + + run = kive_watcher.run_pipeline(folder_watcher, + sample_watcher, + PipelineType.MIXED_HCV) + + assert [call(pipelines_config.micall_filter_quality_pipeline_id), + call(pipelines_config.mixed_hcv_pipeline_id) + ] == mock_session.get_pipeline.call_args_list + mock_session.run_pipeline.assert_called_once_with( + mock_mixed_hcv_pipeline, + [main_fastq1, main_fastq2], + 'Mixed HCV on 2130A', + runbatch=mock_session.create_run_batch.return_value, + groups=['Everyone']) + assert mock_session.run_pipeline.return_value is run + + +def test_launch_mixed_hcv_disabled(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): + pipelines_config.mixed_hcv_pipeline_id = None + base_calls = (raw_data_with_hcv_pair / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + quality_csv = Mock(name='quality_csv') + main_fastq1 = Mock(name='main_fastq1') + main_fastq2 = Mock(name='main_fastq2') + midi_fastq1 = Mock(name='midi_fastq1') + midi_fastq2 = Mock(name='midi_fastq2') + mock_session = mock_open_kive.return_value + mock_quality_pipeline = Mock(name='quality_pipeline') + mock_mixed_hcv_pipeline = Mock(name='main_pipeline') + mock_session.get_pipeline.side_effect = [mock_quality_pipeline, + mock_mixed_hcv_pipeline] + mock_session.add_dataset.side_effect = [quality_csv, + main_fastq1, + main_fastq2, + midi_fastq1, + midi_fastq2] + mock_input = Mock(dataset_name='quality_csv') + mock_quality_pipeline.inputs = [mock_input] + mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'), + Mock(dataset_name='FASTQ2')] + kive_watcher = KiveWatcher(pipelines_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher, = kive_watcher.folder_watchers.values() + sample_watcher, = folder_watcher.sample_watchers + + run = kive_watcher.run_pipeline(folder_watcher, + sample_watcher, + PipelineType.MIXED_HCV) + + assert [call(pipelines_config.micall_filter_quality_pipeline_id) + ] == mock_session.get_pipeline.call_args_list + mock_session.run_pipeline.assert_not_called() + assert None is run + + def test_full_with_two_samples(raw_data_with_two_samples, mock_open_kive, pipelines_config): pipelines_config.max_active = 2 base_calls = (raw_data_with_two_samples / diff --git a/micall/tests/test_sample_watcher.py b/micall/tests/test_sample_watcher.py index 66a445a7c..27697897e 100644 --- a/micall/tests/test_sample_watcher.py +++ b/micall/tests/test_sample_watcher.py @@ -5,12 +5,20 @@ class DummySession: - def __init__(self): + def __init__(self, skipped_types=frozenset()): + """ Initialize. + + :param set skipped_types: {PipelineType} types that won't run + """ + self.skipped_types = skipped_types self.active_runs = [] - def run_pipeline(self, *args): - self.active_runs.append(args) - return args + def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): + if pipeline_type in self.skipped_types: + return None + run = (folder_watcher, sample_watcher, pipeline_type) + self.active_runs.append(run) + return run def fetch_run_status(self, run): return run not in self.active_runs @@ -128,6 +136,7 @@ def test_main_finished(): folder_watcher.poll_runs() # Start resistance assert [(folder_watcher, sample_watcher, PipelineType.RESISTANCE)] == session.active_runs + assert 1 == len(folder_watcher.active_runs) def test_resistance_running(): @@ -164,8 +173,9 @@ def test_resistance_finished(): session.active_runs.clear() # Finish resistance folder_watcher.poll_runs() # Finish sample - assert [] == session.active_runs - assert set() == folder_watcher.active_samples + assert not session.active_runs + assert not folder_watcher.active_samples + assert not folder_watcher.active_runs assert folder_watcher.is_complete @@ -182,7 +192,130 @@ def test_hcv_filter_quality_finished(): folder_watcher.poll_runs() # Start filter_quality session.active_runs.clear() # Finish filter_quality - folder_watcher.poll_runs() # start main + folder_watcher.poll_runs() # start main, midi, and mixed HCV + + assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV), + (folder_watcher, sample_watcher, PipelineType.MAIN), + (folder_watcher, sample_watcher, PipelineType.MIDI) + ] == session.active_runs + assert 3 == len(folder_watcher.active_runs) + + +def test_hcv_mixed_hcv_running(): + base_calls_folder = '/path/Data/Intensities/BaseCalls' + session = DummySession() + folder_watcher = FolderWatcher(base_calls_folder, runner=session) + sample_watcher = SampleWatcher(SampleGroup( + '2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher.sample_watchers.append(sample_watcher) + + folder_watcher.poll_runs() # Start filter_quality + session.active_runs.clear() # Finish filter_quality + + folder_watcher.poll_runs() # start main, midi, and mixed HCV + folder_watcher.poll_runs() # main, midi, and mixed HCV still running + + assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV), + (folder_watcher, sample_watcher, PipelineType.MAIN), + (folder_watcher, sample_watcher, PipelineType.MIDI) + ] == session.active_runs + assert 3 == len(folder_watcher.active_runs) + + +def test_hcv_mixed_hcv_finished(): + base_calls_folder = '/path/Data/Intensities/BaseCalls' + session = DummySession() + folder_watcher = FolderWatcher(base_calls_folder, runner=session) + sample_watcher = SampleWatcher(SampleGroup( + '2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher.sample_watchers.append(sample_watcher) + + folder_watcher.poll_runs() # Start filter_quality + session.active_runs.clear() # Finish filter_quality + + folder_watcher.poll_runs() # start main, midi, and mixed HCV + session.active_runs.remove( + (folder_watcher, sample_watcher, PipelineType.MIXED_HCV)) # Finish mixed HCV + folder_watcher.poll_runs() # main, midi, and mixed HCV still running assert [(folder_watcher, sample_watcher, PipelineType.MAIN), - (folder_watcher, sample_watcher, PipelineType.MIDI)] == session.active_runs + (folder_watcher, sample_watcher, PipelineType.MIDI) + ] == session.active_runs + assert 2 == len(folder_watcher.active_runs) + + +def test_hcv_mixed_hcv_not_finished(): + base_calls_folder = '/path/Data/Intensities/BaseCalls' + session = DummySession() + folder_watcher = FolderWatcher(base_calls_folder, runner=session) + sample_watcher = SampleWatcher(SampleGroup( + '2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher.sample_watchers.append(sample_watcher) + + folder_watcher.poll_runs() # Start filter_quality + session.active_runs.clear() # Finish filter_quality + + folder_watcher.poll_runs() # start main, midi, and mixed HCV + session.active_runs.remove( + (folder_watcher, sample_watcher, PipelineType.MAIN)) # Finish main + session.active_runs.remove( + (folder_watcher, sample_watcher, PipelineType.MIDI)) # Finish midi + folder_watcher.poll_runs() # mixed HCV still running, resistance started + session.active_runs.remove( + (folder_watcher, sample_watcher, PipelineType.RESISTANCE)) # Finish res + folder_watcher.poll_runs() # mixed HCV still running, resistance finished + + assert [(folder_watcher, sample_watcher, PipelineType.MIXED_HCV) + ] == session.active_runs + assert 1 == len(folder_watcher.active_runs) + assert not folder_watcher.is_complete + + +def test_mixed_hcv_skipped(): + base_calls_folder = '/path/Data/Intensities/BaseCalls' + session = DummySession(skipped_types={PipelineType.MIXED_HCV}) + folder_watcher = FolderWatcher(base_calls_folder, runner=session) + sample_watcher = SampleWatcher(SampleGroup( + '2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher.sample_watchers.append(sample_watcher) + + folder_watcher.poll_runs() # Start filter_quality + session.active_runs.clear() # Finish filter_quality + + folder_watcher.poll_runs() # start main and midi + + assert [(folder_watcher, sample_watcher, PipelineType.MAIN), + (folder_watcher, sample_watcher, PipelineType.MIDI) + ] == session.active_runs + assert 2 == len(folder_watcher.active_runs) + + +def test_mixed_hcv_skipped_and_complete(): + base_calls_folder = '/path/Data/Intensities/BaseCalls' + session = DummySession(skipped_types={PipelineType.MIXED_HCV}) + folder_watcher = FolderWatcher(base_calls_folder, runner=session) + sample_watcher = SampleWatcher(SampleGroup( + '2130A', + ('2130A-HCV_S15_L001_R1_001.fastq.gz', + '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) + folder_watcher.sample_watchers.append(sample_watcher) + + folder_watcher.poll_runs() # Start filter_quality + session.active_runs.clear() # Finish filter_quality + folder_watcher.poll_runs() # start main and midi + session.active_runs.clear() # Finish main and midi + folder_watcher.poll_runs() # start resistance + session.active_runs.clear() # Finish resistance + folder_watcher.poll_runs() # done + + assert not session.active_runs + assert not folder_watcher.active_runs + assert folder_watcher.is_complete