From 208ba77a8ae0c106b986323b3cf12f45f56f08fe Mon Sep 17 00:00:00 2001 From: don Date: Tue, 10 Apr 2018 15:17:53 -0700 Subject: [PATCH] Fix broken tests, and simplify test set up, as part of #438. --- micall/monitor/kive_watcher.py | 19 +- micall/monitor/sample_watcher.py | 71 ++-- micall/tests/test_kive_watcher.py | 518 +++++++++++----------------- micall/tests/test_sample_watcher.py | 6 +- micall_watcher.py | 9 +- 5 files changed, 262 insertions(+), 361 deletions(-) diff --git a/micall/monitor/kive_watcher.py b/micall/monitor/kive_watcher.py index 8256d0a6f..0aa23b252 100644 --- a/micall/monitor/kive_watcher.py +++ b/micall/monitor/kive_watcher.py @@ -243,8 +243,7 @@ def add_sample_group(self, base_calls, sample_group): self.current_folder = base_calls folder_watcher = self.folder_watchers.get(base_calls) if folder_watcher is None: - folder_watcher = FolderWatcher(base_calls, self) - self.folder_watchers[base_calls] = folder_watcher + folder_watcher = self.add_folder(base_calls) self.create_batch(folder_watcher) self.upload_filter_quality(folder_watcher) shutil.rmtree(self.get_results_path(folder_watcher), @@ -262,6 +261,12 @@ def add_sample_group(self, base_calls, sample_group): compounddatatype=None) sample_watcher.fastq_datasets.append(fastq_dataset) folder_watcher.sample_watchers.append(sample_watcher) + return sample_watcher + + def add_folder(self, base_calls): + folder_watcher = FolderWatcher(base_calls, self) + self.folder_watchers[base_calls] = folder_watcher + return folder_watcher def finish_folder(self): self.current_folder = None @@ -334,7 +339,7 @@ def extract_coverage_maps(self, folder_watcher): except FileNotFoundError: pass - def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): + def run_pipeline(self, folder_watcher, pipeline_type, sample_watcher): if pipeline_type == PipelineType.FILTER_QUALITY: quality_pipeline = self.get_kive_pipeline( self.config.micall_filter_quality_pipeline_id) @@ -347,8 +352,12 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): if pipeline_type == PipelineType.RESISTANCE: resistance_pipeline = self.get_kive_pipeline( self.config.micall_resistance_pipeline_id) + main_runs = filter(None, + (sample_watcher.runs.get(pipeline_type) + for pipeline_type in (PipelineType.MAIN, + PipelineType.MIDI))) input_datasets = [run.get_results()['amino_csv'] - for run in sample_watcher.main_runs] + for run in main_runs] main_aminos_input = self.get_kive_input('main_amino_csv') for input_dataset in input_datasets: # TODO: remove this when Kive API sets CDT properly (issue #729) @@ -404,7 +413,7 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): runbatch=folder_watcher.batch, groups=ALLOWED_GROUPS) - def fetch_run_status(self, run, folder_watcher, sample_watcher, pipeline_type): + def fetch_run_status(self, run, folder_watcher, pipeline_type, sample_watcher): is_complete = run.is_complete() if is_complete and pipeline_type != PipelineType.FILTER_QUALITY: sample_name = (sample_watcher.sample_group.names[1] diff --git a/micall/monitor/sample_watcher.py b/micall/monitor/sample_watcher.py index 19541c5ab..1c290bd88 100644 --- a/micall/monitor/sample_watcher.py +++ b/micall/monitor/sample_watcher.py @@ -18,13 +18,13 @@ def __init__(self, run folder :param runner: an object for running Kive pipelines. Must have these methods: - run_pipeline(folder_watcher, sample_watcher, pipeline_type) + run_pipeline(folder_watcher, pipeline_type, sample_watcher) returns run, or None if that pipeline_type is not configured. fetch_run_status( run, folder_watcher, - sample_watcher, - pipeline_type) => True if successfully finished, raise if + pipeline_type, + sample_watcher) => True if successfully finished, raise if run failed, also saves the outputs to temporary files in the results folder when the run is finished """ @@ -62,7 +62,6 @@ def is_complete(self): def poll_runs(self): if self.filter_quality_run is None: self.filter_quality_run = self.run_pipeline( - None, PipelineType.FILTER_QUALITY) # Launched filter run, nothing more to check. return @@ -80,70 +79,74 @@ def poll_runs(self): def poll_sample_runs(self, sample_watcher): is_mixed_hcv_complete = False - if not sample_watcher.mixed_hcv_runs: + mixed_hcv_run = sample_watcher.runs.get(PipelineType.MIXED_HCV_MAIN) + if mixed_hcv_run is None: if 'HCV' not in sample_watcher.sample_group.names[0]: is_mixed_hcv_complete = True else: mixed_hcv_run = self.run_pipeline( - sample_watcher, - PipelineType.MIXED_HCV_MAIN) + PipelineType.MIXED_HCV_MAIN, + sample_watcher) if mixed_hcv_run is None: is_mixed_hcv_complete = True else: - sample_watcher.mixed_hcv_runs.append(mixed_hcv_run) if sample_watcher.sample_group.names[1] is not None: - mixed_hcv_run = self.run_pipeline( - sample_watcher, - PipelineType.MIXED_HCV_MIDI) - sample_watcher.mixed_hcv_runs.append(mixed_hcv_run) + self.run_pipeline(PipelineType.MIXED_HCV_MIDI, + sample_watcher) else: + mixed_hcv_midi_run = sample_watcher.runs.get(PipelineType.MIXED_HCV_MIDI) active_mixed_runs = [ run - for run in sample_watcher.mixed_hcv_runs + for run in (mixed_hcv_run, mixed_hcv_midi_run) if run in self.active_runs and not self.fetch_run_status(run)] is_mixed_hcv_complete = not active_mixed_runs - if not sample_watcher.main_runs: - sample_watcher.main_runs.append(self.run_pipeline( - sample_watcher, - PipelineType.MAIN)) + main_run = sample_watcher.runs.get(PipelineType.MAIN) + if main_run is None: + self.run_pipeline(PipelineType.MAIN, sample_watcher) if sample_watcher.sample_group.names[1] is not None: - sample_watcher.main_runs.append(self.run_pipeline( - sample_watcher, - PipelineType.MIDI)) + self.run_pipeline(PipelineType.MIDI, sample_watcher) # Launched main and midi runs, nothing more to check on sample. return False + midi_run = sample_watcher.runs.get(PipelineType.MIDI) active_main_runs = [ run - for run in sample_watcher.main_runs + for run in (main_run, midi_run) if run in self.active_runs and not self.fetch_run_status(run)] if active_main_runs: # Still running, nothing more to check on sample return False - if sample_watcher.resistance_run is None: - sample_watcher.resistance_run = self.run_pipeline( - sample_watcher, - PipelineType.RESISTANCE) + resistance_run = sample_watcher.runs.get(PipelineType.RESISTANCE) + if resistance_run is None: + self.run_pipeline(PipelineType.RESISTANCE, sample_watcher) # Launched resistance run, nothing more to check on sample. return False - if sample_watcher.resistance_run in self.active_runs: - if not self.fetch_run_status(sample_watcher.resistance_run): + if resistance_run in self.active_runs: + if not self.fetch_run_status(resistance_run): # Still running, nothing more to check on sample. return False return is_mixed_hcv_complete - def run_pipeline(self, sample_watcher, pipeline_type): - run = self.runner.run_pipeline(self, sample_watcher, pipeline_type) + def run_pipeline(self, pipeline_type, sample_watcher=None): + run = self.runner.run_pipeline(self, pipeline_type, sample_watcher) if run is not None: - self.active_runs[run] = (sample_watcher, pipeline_type) + self.add_run(run, pipeline_type, sample_watcher) return run + def add_run(self, run, pipeline_type, sample_watcher=None, is_complete=False): + if not is_complete: + self.active_runs[run] = (sample_watcher, pipeline_type) + if pipeline_type == PipelineType.FILTER_QUALITY: + self.filter_quality_run = run + else: + sample_watcher.runs[pipeline_type] = run + def fetch_run_status(self, run): sample_watcher, pipeline_type = self.active_runs[run] is_complete = self.runner.fetch_run_status(run, self, - sample_watcher, - pipeline_type) + pipeline_type, + sample_watcher) if is_complete: del self.active_runs[run] return is_complete @@ -153,9 +156,7 @@ class SampleWatcher: def __init__(self, sample_group): self.sample_group = sample_group self.fastq_datasets = [] - self.mixed_hcv_runs = [] - self.main_runs = [] - self.resistance_run = None + self.runs = {} # {pipeline_type: run} def __repr__(self): enum = self.sample_group.enum diff --git a/micall/tests/test_kive_watcher.py b/micall/tests/test_kive_watcher.py index 48a0138f7..c12e85de3 100644 --- a/micall/tests/test_kive_watcher.py +++ b/micall/tests/test_kive_watcher.py @@ -45,6 +45,13 @@ def create_mock_clock(): @pytest.fixture(name='mock_open_kive') def create_mock_open_kive(): with patch('micall.monitor.kive_watcher.open_kive') as mock_open_kive: + mock_session = mock_open_kive.return_value + + # By default, support calling the filter_quality pipeline. + mock_pipeline = mock_session.get_pipeline.return_value + mock_input = Mock(dataset_name='quality_csv') + mock_pipeline.inputs = [mock_input] + yield mock_open_kive @@ -108,7 +115,6 @@ def create_run_folder(tmpdir, run_name, sample_pattern): (interop / "ErrorMetricsOut.bin").write_bytes(error_data) (run / "needsprocessing").touch() - (run / "qc_uploaded").touch() return raw_data_folder @@ -437,11 +443,6 @@ def test_poll_first_sample(raw_data_with_two_samples, mock_open_kive, default_co base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") mock_session = mock_open_kive.return_value - mock_pipeline = mock_session.get_pipeline.return_value - mock_input = Mock(dataset_name='quality_csv') - mock_pipeline.inputs = [mock_input] - mock_run = mock_session.run_pipeline.return_value - mock_run.is_complete.return_value = False kive_watcher = KiveWatcher(default_config) kive_watcher.add_sample_group( @@ -450,52 +451,31 @@ def test_poll_first_sample(raw_data_with_two_samples, mock_open_kive, default_co ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) kive_watcher.poll_runs() - kive_watcher.poll_runs() - mock_open_kive.assert_called_once_with(default_config.kive_server) - mock_session.login.assert_called_once_with(default_config.kive_user, - default_config.kive_password) - mock_session.create_run_batch.assert_called_once_with( - '140101_M01234 v0-dev', - description='MiCall batch for folder 140101_M01234, pipeline version 0-dev.', - users=[], - groups=['Everyone']) - mock_session.get_pipeline.assert_called_once_with( - default_config.micall_filter_quality_pipeline_id) - assert [call(cdt=mock_input.compounddatatype, - name='140101_M01234_quality.csv', - uploaded=True, - # MD5 of header with no records. - md5='6861a4a0bfd71b62c0048ff9a4910223'), - call(cdt=None, - name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz', - uploaded=True, - md5=ANY), - call(cdt=None, - name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz', - uploaded=True, - md5=ANY)] == mock_session.find_datasets.call_args_list - assert [call(name='140101_M01234_quality.csv', - description='Error rates for 140101_M01234 run.', - handle=ANY, - cdt=mock_input.compounddatatype, - groups=['Everyone']), - call(name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz', - description='forward read from MiSeq run 140101_M01234', - handle=ANY, - cdt=None, - groups=['Everyone']), - call(name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz', - description='reverse read from MiSeq run 140101_M01234', - handle=ANY, - cdt=None, - groups=['Everyone'])] == mock_session.add_dataset.call_args_list mock_session.run_pipeline.assert_called_once_with( mock_session.get_pipeline.return_value, [mock_session.add_dataset.return_value], 'MiCall filter quality on 140101_M01234', runbatch=mock_session.create_run_batch.return_value, groups=['Everyone']) + + +def test_poll_first_sample_twice(raw_data_with_two_samples, mock_open_kive, default_config): + base_calls = (raw_data_with_two_samples / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + mock_session = mock_open_kive.return_value + mock_run = mock_session.run_pipeline.return_value + mock_run.is_complete.return_value = False + kive_watcher = KiveWatcher(default_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2110A', + ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + None))) + kive_watcher.poll_runs() + kive_watcher.poll_runs() + mock_run.is_complete.assert_called_once_with() @@ -503,8 +483,6 @@ def test_second_sample(raw_data_with_two_samples, mock_open_kive, default_config base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") mock_session = mock_open_kive.return_value - mock_session.get_pipeline.return_value.inputs = [ - Mock(dataset_name='quality_csv')] kive_watcher = KiveWatcher(default_config) kive_watcher.add_sample_group( @@ -532,8 +510,6 @@ def test_sample_with_hcv_pair(raw_data_with_hcv_pair, mock_open_kive, default_co base_calls = (raw_data_with_hcv_pair / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") mock_session = mock_open_kive.return_value - mock_session.get_pipeline.return_value.inputs = [ - Mock(dataset_name='quality_csv')] kive_watcher = KiveWatcher(default_config) kive_watcher.add_sample_group( @@ -568,16 +544,6 @@ def test_sample_already_uploaded(raw_data_with_two_samples, mock_open_kive, defa ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) - mock_open_kive.assert_called_once_with(default_config.kive_server) - mock_session.login.assert_called_once_with(default_config.kive_user, - default_config.kive_password) - mock_session.create_run_batch.assert_called_once_with( - '140101_M01234 v0-dev', - description='MiCall batch for folder 140101_M01234, pipeline version 0-dev.', - users=[], - groups=['Everyone']) - mock_session.get_pipeline.assert_called_once_with( - default_config.micall_filter_quality_pipeline_id) assert [call(cdt=mock_input.compounddatatype, name='140101_M01234_quality.csv', uploaded=True, @@ -610,38 +576,34 @@ def test_sample_already_uploaded(raw_data_with_two_samples, mock_open_kive, defa def test_launch_main_run(raw_data_with_two_samples, mock_open_kive, pipelines_config): base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - quality_csv = Mock(name='quality_csv') fastq1 = Mock(name='fastq1') fastq2 = Mock(name='fastq2') bad_cycles_csv = Mock(name='bad_cycles_csv') mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') mock_main_pipeline = Mock(name='main_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, mock_main_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] + mock_session.get_pipeline.return_value = mock_main_pipeline mock_main_pipeline.inputs = [Mock(dataset_name='fastq1'), Mock(dataset_name='fastq1'), Mock(dataset_name='bad_cycles_csv')] - mock_session.add_dataset.side_effect = [quality_csv, fastq1, fastq2] + mock_session.add_dataset.side_effect = [fastq1, fastq2] kive_watcher = KiveWatcher(pipelines_config) + folder_watcher = kive_watcher.add_folder(base_calls) + folder_watcher.batch = Mock('batch') kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) - folder_watcher, = kive_watcher.folder_watchers.values() - filter_quality_run = Mock( - name='quality_run', - **{'get_results.return_value': dict(bad_cycles_csv=bad_cycles_csv)}) - folder_watcher.filter_quality_run = filter_quality_run - sample_watcher, = folder_watcher.sample_watchers + folder_watcher.add_run( + Mock(name='quality_run', + **{'get_results.return_value': dict(bad_cycles_csv=bad_cycles_csv), + 'is_complete.return_value': True}), + PipelineType.FILTER_QUALITY) - run = kive_watcher.run_pipeline(folder_watcher, sample_watcher, PipelineType.MAIN) + kive_watcher.poll_runs() - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.micall_main_pipeline_id) + assert [call(pipelines_config.micall_main_pipeline_id) ] == mock_session.get_pipeline.call_args_list mock_session.run_pipeline.assert_called_once_with( mock_main_pipeline, @@ -649,57 +611,58 @@ def test_launch_main_run(raw_data_with_two_samples, mock_open_kive, pipelines_co fastq2, bad_cycles_csv], 'MiCall main on 2110A-V3LOOP_S13', - runbatch=mock_session.create_run_batch.return_value, + runbatch=folder_watcher.batch, groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run - assert bad_cycles_csv is folder_watcher.bad_cycles_dataset def test_launch_midi_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): base_calls = (raw_data_with_hcv_pair / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - quality_csv = Mock(name='quality_csv') main_fastq1 = Mock(name='main_fastq1') main_fastq2 = Mock(name='main_fastq2') midi_fastq1 = Mock(name='midi_fastq1') midi_fastq2 = Mock(name='midi_fastq2') bad_cycles_csv = Mock(name='bad_cycles_csv') mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') mock_main_pipeline = Mock(name='main_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, mock_main_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] - mock_session.add_dataset.side_effect = [quality_csv, - main_fastq1, + mock_session.get_pipeline.return_value = mock_main_pipeline + mock_main_pipeline.inputs = [Mock(dataset_name='fastq1'), + Mock(dataset_name='fastq1'), + Mock(dataset_name='bad_cycles_csv')] + mock_session.add_dataset.side_effect = [main_fastq1, main_fastq2, midi_fastq1, midi_fastq2] kive_watcher = KiveWatcher(pipelines_config) + folder_watcher = kive_watcher.add_folder(base_calls) + folder_watcher.batch = Mock('batch') kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2130A', ('2130A-HCV_S15_L001_R1_001.fastq.gz', '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) - folder_watcher, = kive_watcher.folder_watchers.values() - folder_watcher.bad_cycles_dataset = bad_cycles_csv - sample_watcher, = folder_watcher.sample_watchers + folder_watcher.add_run( + Mock(name='quality_run', + **{'get_results.return_value': dict(bad_cycles_csv=bad_cycles_csv), + 'is_complete.return_value': True}), + PipelineType.FILTER_QUALITY) - run = kive_watcher.run_pipeline(folder_watcher, sample_watcher, PipelineType.MIDI) + kive_watcher.poll_runs() - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.micall_main_pipeline_id) + assert [call(pipelines_config.micall_main_pipeline_id) ] == mock_session.get_pipeline.call_args_list - mock_session.run_pipeline.assert_called_once_with( - mock_main_pipeline, - [midi_fastq1, - midi_fastq2, - bad_cycles_csv], - 'MiCall main on 2130AMIDI-MidHCV_S16', - runbatch=mock_session.create_run_batch.return_value, - groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run + assert [call(mock_main_pipeline, + [main_fastq1, main_fastq2, bad_cycles_csv], + 'MiCall main on 2130A-HCV_S15', + runbatch=folder_watcher.batch, + groups=['Everyone']), + call(mock_main_pipeline, + [midi_fastq1, midi_fastq2, bad_cycles_csv], + 'MiCall main on 2130AMIDI-MidHCV_S16', + runbatch=folder_watcher.batch, + groups=['Everyone']) + ] == mock_session.run_pipeline.call_args_list def test_launch_resistance_run(raw_data_with_two_samples, mock_open_kive, pipelines_config): @@ -708,41 +671,39 @@ def test_launch_resistance_run(raw_data_with_two_samples, mock_open_kive, pipeli "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") amino_csv = Mock(name='amino_csv') mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') mock_resistance_pipeline = Mock(name='resistance_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, mock_resistance_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] + mock_session.get_pipeline.return_value = mock_resistance_pipeline mock_resistance_pipeline.inputs = [Mock(dataset_name='main_amino_csv'), Mock(dataset_name='midi_amino_csv')] kive_watcher = KiveWatcher(pipelines_config) - kive_watcher.add_sample_group( + folder_watcher = kive_watcher.add_folder(base_calls) + folder_watcher.batch = Mock('batch') + sample_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) - folder_watcher, = kive_watcher.folder_watchers.values() - sample_watcher, = folder_watcher.sample_watchers - main_run = Mock( - name='main_run', - **{'get_results.return_value': dict(amino_csv=amino_csv)}) - sample_watcher.main_runs.append(main_run) - - run = kive_watcher.run_pipeline(folder_watcher, - sample_watcher, - PipelineType.RESISTANCE) - - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.micall_resistance_pipeline_id) + folder_watcher.add_run(Mock(name='filter_quality_run'), + PipelineType.FILTER_QUALITY, + is_complete=True) + folder_watcher.add_run( + Mock(name='main_run', + **{'get_results.return_value': dict(amino_csv=amino_csv), + 'is_complete.return_value': True}), + PipelineType.MAIN, + sample_watcher) + + kive_watcher.poll_runs() + + assert [call(pipelines_config.micall_resistance_pipeline_id) ] == mock_session.get_pipeline.call_args_list mock_session.run_pipeline.assert_called_once_with( mock_resistance_pipeline, [amino_csv, amino_csv], 'MiCall resistance on 2110A', - runbatch=mock_session.create_run_batch.return_value, + runbatch=folder_watcher.batch, groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run def test_launch_hcv_resistance_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): @@ -752,198 +713,118 @@ def test_launch_hcv_resistance_run(raw_data_with_hcv_pair, mock_open_kive, pipel main_amino_csv = Mock(name='main_amino_csv') midi_amino_csv = Mock(name='midi_amino_csv') mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') mock_resistance_pipeline = Mock(name='main_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, - mock_resistance_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] + mock_session.get_pipeline.return_value = mock_resistance_pipeline mock_resistance_pipeline.inputs = [Mock(dataset_name='main_amino_csv'), Mock(dataset_name='midi_amino_csv')] kive_watcher = KiveWatcher(pipelines_config) - kive_watcher.add_sample_group( + folder_watcher = kive_watcher.add_folder(base_calls) + folder_watcher.batch = Mock('batch') + sample_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2130A', ('2130A-HCV_S15_L001_R1_001.fastq.gz', '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) - folder_watcher, = kive_watcher.folder_watchers.values() - sample_watcher, = folder_watcher.sample_watchers - main_run = Mock( - name='main_run', - **{'get_results.return_value': dict(amino_csv=main_amino_csv)}) - midi_run = Mock( - name='midi_run', - **{'get_results.return_value': dict(amino_csv=midi_amino_csv)}) - sample_watcher.main_runs.extend((main_run, midi_run)) - - run = kive_watcher.run_pipeline(folder_watcher, - sample_watcher, - PipelineType.RESISTANCE) - - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.micall_resistance_pipeline_id) + folder_watcher.add_run(Mock(name='filter_quality_run'), + PipelineType.FILTER_QUALITY, + is_complete=True) + folder_watcher.add_run( + Mock(name='main_run', + **{'get_results.return_value': dict(amino_csv=main_amino_csv), + 'is_complete.return_value': True}), + PipelineType.MAIN, + sample_watcher) + folder_watcher.add_run( + Mock(name='main_run', + **{'get_results.return_value': dict(amino_csv=midi_amino_csv), + 'is_complete.return_value': True}), + PipelineType.MIDI, + sample_watcher) + + kive_watcher.poll_runs() + + assert [call(pipelines_config.micall_resistance_pipeline_id) ] == mock_session.get_pipeline.call_args_list mock_session.run_pipeline.assert_called_once_with( mock_resistance_pipeline, [main_amino_csv, midi_amino_csv], 'MiCall resistance on 2130A', - runbatch=mock_session.create_run_batch.return_value, + runbatch=folder_watcher.batch, groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run def test_launch_mixed_hcv_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): pipelines_config.mixed_hcv_pipeline_id = 47 base_calls = (raw_data_with_hcv_pair / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - quality_csv = Mock(name='quality_csv') - main_fastq1 = Mock(name='main_fastq1') - main_fastq2 = Mock(name='main_fastq2') - midi_fastq1 = Mock(name='midi_fastq1') - midi_fastq2 = Mock(name='midi_fastq2') - mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') - mock_mixed_hcv_pipeline = Mock(name='mixed_hcv_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, - mock_mixed_hcv_pipeline] - mock_session.add_dataset.side_effect = [quality_csv, - main_fastq1, - main_fastq2, - midi_fastq1, - midi_fastq2] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] - mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'), - Mock(dataset_name='FASTQ2')] - kive_watcher = KiveWatcher(pipelines_config) - - kive_watcher.add_sample_group( - base_calls=base_calls, - sample_group=SampleGroup('2130A', - ('2130A-HCV_S15_L001_R1_001.fastq.gz', - '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) - folder_watcher, = kive_watcher.folder_watchers.values() - sample_watcher, = folder_watcher.sample_watchers - - run = kive_watcher.run_pipeline(folder_watcher, - sample_watcher, - PipelineType.MIXED_HCV_MAIN) - - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.mixed_hcv_pipeline_id) - ] == mock_session.get_pipeline.call_args_list - mock_session.run_pipeline.assert_called_once_with( - mock_mixed_hcv_pipeline, - [main_fastq1, main_fastq2], - 'Mixed HCV on 2130A-HCV_S15', - runbatch=mock_session.create_run_batch.return_value, - groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run - - -def test_launch_mixed_hcv_midi_run(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): - pipelines_config.mixed_hcv_pipeline_id = 47 - base_calls = (raw_data_with_hcv_pair / - "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - quality_csv = Mock(name='quality_csv') main_fastq1 = Mock(name='main_fastq1') main_fastq2 = Mock(name='main_fastq2') midi_fastq1 = Mock(name='midi_fastq1') midi_fastq2 = Mock(name='midi_fastq2') + bad_cycles_csv = Mock(name='bad_cycles_csv') mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') mock_mixed_hcv_pipeline = Mock(name='mixed_hcv_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, - mock_mixed_hcv_pipeline] - mock_session.add_dataset.side_effect = [quality_csv, - main_fastq1, - main_fastq2, - midi_fastq1, - midi_fastq2] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] - mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'), - Mock(dataset_name='FASTQ2')] - kive_watcher = KiveWatcher(pipelines_config) - - kive_watcher.add_sample_group( - base_calls=base_calls, - sample_group=SampleGroup('2130A', - ('2130A-HCV_S15_L001_R1_001.fastq.gz', - '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) - folder_watcher, = kive_watcher.folder_watchers.values() - sample_watcher, = folder_watcher.sample_watchers - - run = kive_watcher.run_pipeline(folder_watcher, - sample_watcher, - PipelineType.MIXED_HCV_MIDI) - - assert [call(pipelines_config.micall_filter_quality_pipeline_id), - call(pipelines_config.mixed_hcv_pipeline_id) - ] == mock_session.get_pipeline.call_args_list - mock_session.run_pipeline.assert_called_once_with( - mock_mixed_hcv_pipeline, - [midi_fastq1, midi_fastq2], - 'Mixed HCV on 2130AMIDI-MidHCV_S16', - runbatch=mock_session.create_run_batch.return_value, - groups=['Everyone']) - assert mock_session.run_pipeline.return_value is run - - -def test_launch_mixed_hcv_disabled(raw_data_with_hcv_pair, mock_open_kive, pipelines_config): - pipelines_config.mixed_hcv_pipeline_id = None - base_calls = (raw_data_with_hcv_pair / - "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - quality_csv = Mock(name='quality_csv') - main_fastq1 = Mock(name='main_fastq1') - main_fastq2 = Mock(name='main_fastq2') - midi_fastq1 = Mock(name='midi_fastq1') - midi_fastq2 = Mock(name='midi_fastq2') - mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') - mock_mixed_hcv_pipeline = Mock(name='main_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, - mock_mixed_hcv_pipeline] - mock_session.add_dataset.side_effect = [quality_csv, - main_fastq1, + mock_main_pipeline = Mock(name='main_pipeline') + mock_session.get_pipeline.side_effect = [mock_mixed_hcv_pipeline, + mock_main_pipeline] + mock_session.add_dataset.side_effect = [main_fastq1, main_fastq2, midi_fastq1, midi_fastq2] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] + mock_main_pipeline.inputs = [Mock(dataset_name='fastq1'), + Mock(dataset_name='fastq1'), + Mock(dataset_name='bad_cycles_csv')] mock_mixed_hcv_pipeline.inputs = [Mock(dataset_name='FASTQ1'), Mock(dataset_name='FASTQ2')] kive_watcher = KiveWatcher(pipelines_config) + folder_watcher = kive_watcher.add_folder(base_calls) + folder_watcher.batch = Mock('batch') kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2130A', ('2130A-HCV_S15_L001_R1_001.fastq.gz', '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz'))) - folder_watcher, = kive_watcher.folder_watchers.values() - sample_watcher, = folder_watcher.sample_watchers + folder_watcher.add_run( + Mock(name='quality_run', + **{'get_results.return_value': dict(bad_cycles_csv=bad_cycles_csv), + 'is_complete.return_value': True}), + PipelineType.FILTER_QUALITY) - run = kive_watcher.run_pipeline(folder_watcher, - sample_watcher, - PipelineType.MIXED_HCV_MAIN) + kive_watcher.poll_runs() - assert [call(pipelines_config.micall_filter_quality_pipeline_id) + assert [call(pipelines_config.mixed_hcv_pipeline_id), + call(pipelines_config.micall_main_pipeline_id) ] == mock_session.get_pipeline.call_args_list - mock_session.run_pipeline.assert_not_called() - assert None is run + assert [call(mock_mixed_hcv_pipeline, + [main_fastq1, main_fastq2], + 'Mixed HCV on 2130A-HCV_S15', + runbatch=folder_watcher.batch, + groups=['Everyone']), + call(mock_mixed_hcv_pipeline, + [midi_fastq1, midi_fastq2], + 'Mixed HCV on 2130AMIDI-MidHCV_S16', + runbatch=folder_watcher.batch, + groups=['Everyone']), + call(mock_main_pipeline, + [main_fastq1, main_fastq2, bad_cycles_csv], + 'MiCall main on 2130A-HCV_S15', + runbatch=folder_watcher.batch, + groups=['Everyone']), + call(mock_main_pipeline, + [midi_fastq1, midi_fastq2, bad_cycles_csv], + 'MiCall main on 2130AMIDI-MidHCV_S16', + runbatch=folder_watcher.batch, + groups=['Everyone']) + ] == mock_session.run_pipeline.call_args_list def test_full_with_two_samples(raw_data_with_two_samples, mock_open_kive, pipelines_config): + assert mock_open_kive pipelines_config.max_active = 2 base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') - mock_resistance_pipeline = Mock(name='resistance_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, mock_resistance_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] kive_watcher = KiveWatcher(pipelines_config) kive_watcher.add_sample_group( @@ -970,21 +851,14 @@ def test_full_with_two_samples(raw_data_with_two_samples, mock_open_kive, pipeli def test_full_with_two_runs(raw_data_with_two_runs, mock_open_kive, pipelines_config): + assert mock_open_kive pipelines_config.max_active = 2 base_calls1 = (raw_data_with_two_runs / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") base_calls2 = (raw_data_with_two_runs / "MiSeq/runs/140201_M01234/Data/Intensities/BaseCalls") - mock_session = mock_open_kive.return_value - mock_quality_pipeline = Mock(name='quality_pipeline') - mock_resistance_pipeline = Mock(name='resistance_pipeline') - mock_session.get_pipeline.side_effect = [mock_quality_pipeline, mock_resistance_pipeline] - mock_input = Mock(dataset_name='quality_csv') - mock_quality_pipeline.inputs = [mock_input] kive_watcher = KiveWatcher(pipelines_config) - # raw_data = create_run_folder(tmpdir, '140101_M01234', '2000A*.fastq') - # create_run_folder(tmpdir, '140201_M01234', '2010A*.fastq') kive_watcher.add_sample_group( base_calls=base_calls1, sample_group=SampleGroup('2000A', @@ -1015,8 +889,8 @@ def test_fetch_run_status_incomplete(): is_complete = kive_watcher.fetch_run_status(mock_run, folder_watcher=None, - sample_watcher=None, - pipeline_type=None) + pipeline_type=None, + sample_watcher=None) assert not is_complete @@ -1032,8 +906,8 @@ def test_fetch_run_status_filter_quality(raw_data_with_two_runs, pipelines_confi is_complete = kive_watcher.fetch_run_status(mock_run, folder_watcher, - sample_watcher, - PipelineType.FILTER_QUALITY) + PipelineType.FILTER_QUALITY, + sample_watcher) assert is_complete @@ -1063,8 +937,8 @@ def test_fetch_run_status_main(raw_data_with_two_runs, pipelines_config): is_complete = kive_watcher.fetch_run_status(mock_run, folder_watcher, - sample_watcher, - PipelineType.MAIN) + PipelineType.MAIN, + sample_watcher) assert is_complete assert expected_coord_ins_path.exists() @@ -1091,13 +965,13 @@ def test_fetch_run_status_main_and_resistance(raw_data_with_two_runs, pipelines_ is_main_complete = kive_watcher.fetch_run_status( mock_run, folder_watcher, - sample_watcher, - PipelineType.MAIN) + PipelineType.MAIN, + sample_watcher) is_resistance_complete = kive_watcher.fetch_run_status( mock_run, folder_watcher, - sample_watcher, - PipelineType.RESISTANCE) + PipelineType.RESISTANCE, + sample_watcher) assert is_main_complete assert is_resistance_complete @@ -1122,12 +996,12 @@ def test_fetch_run_status_main_and_midi(raw_data_with_hcv_pair, pipelines_config is_main_complete = kive_watcher.fetch_run_status(mock_run, folder_watcher, - sample_watcher, - PipelineType.MAIN) + PipelineType.MAIN, + sample_watcher) is_midi_complete = kive_watcher.fetch_run_status(mock_run, folder_watcher, - sample_watcher, - PipelineType.MIDI) + PipelineType.MIDI, + sample_watcher) assert is_main_complete assert is_midi_complete @@ -1136,12 +1010,9 @@ def test_fetch_run_status_main_and_midi(raw_data_with_hcv_pair, pipelines_config def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_config): + assert mock_open_kive base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - mock_session = mock_open_kive.return_value - mock_pipeline = mock_session.get_pipeline.return_value - mock_input = Mock(dataset_name='quality_csv') - mock_pipeline.inputs = [mock_input] resistance_run1 = Mock(name='resistance_run1', **{'is_complete.return_value': True, 'get_results.return_value': create_datasets(['resistance_csv'])}) @@ -1150,27 +1021,35 @@ def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_con 'get_results.return_value': create_datasets(['resistance_csv'])}) kive_watcher = KiveWatcher(default_config) - kive_watcher.add_sample_group( + folder_watcher = kive_watcher.add_folder(base_calls) + sample1_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) - kive_watcher.add_sample_group( + sample2_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2120A', ('2120A-PR_S14_L001_R1_001.fastq.gz', None))) kive_watcher.finish_folder() - folder_watcher, = kive_watcher.folder_watchers.values() - folder_watcher.filter_quality_run = Mock() - sample1_watcher, sample2_watcher = folder_watcher.sample_watchers - sample1_watcher.main_runs.append(Mock()) - sample1_watcher.resistance_run = resistance_run1 - sample2_watcher.main_runs.append(Mock()) - sample2_watcher.resistance_run = resistance_run2 - folder_watcher.active_runs = { - resistance_run1: (sample1_watcher, PipelineType.RESISTANCE), - resistance_run2: (sample2_watcher, PipelineType.RESISTANCE)} + folder_watcher.add_run(Mock(name='filter_quality_run'), + PipelineType.FILTER_QUALITY, + is_complete=True) + folder_watcher.add_run(Mock(name='main_run1'), + PipelineType.MAIN, + sample1_watcher, + is_complete=True) + folder_watcher.add_run(Mock(name='main_run2'), + PipelineType.MAIN, + sample2_watcher, + is_complete=True) + folder_watcher.add_run(resistance_run1, + PipelineType.RESISTANCE, + sample1_watcher) + folder_watcher.add_run(resistance_run2, + PipelineType.RESISTANCE, + sample2_watcher) results_path = base_calls / "../../../Results/version_0-dev" scratch_path = results_path / "scratch" expected_coverage_map_content = b'This is a coverage map.' @@ -1207,12 +1086,9 @@ def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_con def test_folder_not_finished(raw_data_with_two_samples, mock_open_kive, default_config): + assert mock_open_kive base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") - mock_session = mock_open_kive.return_value - mock_pipeline = mock_session.get_pipeline.return_value - mock_input = Mock(dataset_name='quality_csv') - mock_pipeline.inputs = [mock_input] resistance_run1 = Mock(name='resistance_run1', **{'is_complete.return_value': True, 'get_results.return_value': create_datasets(['resistance_csv'])}) @@ -1221,27 +1097,35 @@ def test_folder_not_finished(raw_data_with_two_samples, mock_open_kive, default_ 'get_results.return_value': create_datasets(['resistance_csv'])}) kive_watcher = KiveWatcher(default_config) - kive_watcher.add_sample_group( + folder_watcher = kive_watcher.add_folder(base_calls) + sample1_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None))) - kive_watcher.add_sample_group( + sample2_watcher = kive_watcher.add_sample_group( base_calls=base_calls, sample_group=SampleGroup('2120A', ('2120A-PR_S14_L001_R1_001.fastq.gz', None))) # Did not call kive_watcher.finish_folder(), more samples could be coming. - folder_watcher, = kive_watcher.folder_watchers.values() - folder_watcher.filter_quality_run = Mock() - sample1_watcher, sample2_watcher = folder_watcher.sample_watchers - sample1_watcher.main_runs.append(Mock()) - sample1_watcher.resistance_run = resistance_run1 - sample2_watcher.main_runs.append(Mock()) - sample2_watcher.resistance_run = resistance_run2 - folder_watcher.active_runs = { - resistance_run1: (sample1_watcher, PipelineType.RESISTANCE), - resistance_run2: (sample2_watcher, PipelineType.RESISTANCE)} + folder_watcher.add_run(Mock(name='filter_quality_run'), + PipelineType.FILTER_QUALITY, + is_complete=True) + folder_watcher.add_run(Mock(name='main_run1'), + PipelineType.MAIN, + sample1_watcher, + is_complete=True) + folder_watcher.add_run(Mock(name='main_run2'), + PipelineType.MAIN, + sample2_watcher, + is_complete=True) + folder_watcher.add_run(resistance_run1, + PipelineType.RESISTANCE, + sample1_watcher) + folder_watcher.add_run(resistance_run2, + PipelineType.RESISTANCE, + sample2_watcher) results_path = base_calls / "../../../Results/version_0-dev" scratch_path = results_path / "scratch" expected_resistance_path = results_path / "resistance.csv" diff --git a/micall/tests/test_sample_watcher.py b/micall/tests/test_sample_watcher.py index 126019a07..c64a2d2de 100644 --- a/micall/tests/test_sample_watcher.py +++ b/micall/tests/test_sample_watcher.py @@ -13,7 +13,7 @@ def __init__(self, skipped_types=frozenset()): self.skipped_types = skipped_types self.active_runs = [] # [(folder_watcher, sample_watcher, pipeline_type)] - def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): + def run_pipeline(self, folder_watcher, pipeline_type, sample_watcher): if pipeline_type in self.skipped_types: return None run = (folder_watcher, sample_watcher, pipeline_type) @@ -23,8 +23,8 @@ def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): def fetch_run_status(self, run, _folder_watcher, - _sample_watcher, - _pipeline_type): + _pipeline_type, + _sample_watcher): return run not in self.active_runs def finish_run(self, run): diff --git a/micall_watcher.py b/micall_watcher.py index 43abb3b5f..c813ac271 100644 --- a/micall_watcher.py +++ b/micall_watcher.py @@ -8,8 +8,12 @@ from threading import Thread from time import sleep -from micall.monitor import update_qai from micall.monitor.kive_watcher import find_samples, KiveWatcher +try: + from micall.monitor import update_qai +except ImportError: + # Swallow import error to allow testing. Check again at start of main(). + update_qai = None POLLING_DELAY = 10 # seconds between scans for new samples or finished runs @@ -89,6 +93,9 @@ def parse_args(argv=None): def main(): + if update_qai is None: + raise RuntimeError( + 'Failed to import update_qai. Is the requests library installed?') args = parse_args() result_handler = partial(update_qai.process_folder, qai_server=args.qai_server,