Skip to content

Commit

Permalink
Close #706 by adding the cascade_csv argument to the proviral pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Apr 20, 2021
1 parent de07814 commit bc09731
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 20 deletions.
34 changes: 16 additions & 18 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,7 @@ def run_pipeline(self,
run = self.run_proviral_pipeline(
sample_watcher,
folder_watcher,
(PipelineType.DENOVO_MAIN,),
'MiCall proviral')
'Proviral HIVSeqinR')
return run
if pipeline_type == PipelineType.RESISTANCE:
run = self.run_resistance_pipeline(
Expand Down Expand Up @@ -849,27 +848,26 @@ def run_resistance_pipeline(self, sample_watcher, folder_watcher, input_pipeline
folder_watcher.batch)
return run

def run_proviral_pipeline(self, sample_watcher, folder_watcher, input_pipeline_types, description):
def run_proviral_pipeline(self, sample_watcher, folder_watcher, description):
pipeline_id = self.config.proviral_pipeline_id
if pipeline_id is None:
return None
main_runs = filter(None,
(sample_watcher.runs.get(pipeline_type)
for pipeline_type in input_pipeline_types))
input_dataset_urls = [run_dataset['dataset']
for run in main_runs
for run_dataset in run['datasets']
if run_dataset['argument_name'] in ('conseq_csv', 'contigs_csv')]
input_datasets = [self.kive_retry(lambda: self.session.get(url).json())
for url in input_dataset_urls]
if len(input_datasets) == 1:
input_datasets *= 2
input_datasets = sorted(input_datasets, key=lambda x: x['name'])
inputs_dict = dict(zip(('conseqs_csv', 'contigs_csv'),
input_datasets))
main_run = sample_watcher.runs.get(PipelineType.DENOVO_MAIN)
if main_run is None:
return None
input_dataset_urls = {
run_dataset['argument_name']: run_dataset['dataset']
for run_dataset in main_run['datasets']
if run_dataset['argument_name'] in ('conseq_csv',
'contigs_csv',
'cascade_csv')}
input_datasets = {
argument_name: self.kive_retry(lambda: self.session.get(url).json())
for argument_name, url in input_dataset_urls.items()}
input_datasets['conseqs_csv'] = input_datasets.pop('conseq_csv')
run = self.find_or_launch_run(
pipeline_id,
inputs_dict,
input_datasets,
description + ' on ' + sample_watcher.sample_group.enum,
folder_watcher.batch)
return run
Expand Down
4 changes: 2 additions & 2 deletions micall/monitor/sample_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def poll_runs(self):
name for name in sample_watcher.sample_group.names
if name is not None)

def poll_sample_runs(self, sample_watcher):
def poll_sample_runs(self, sample_watcher: 'SampleWatcher'):
""" Poll all active runs for a sample.
:param sample_watcher: details about the sample to poll
Expand Down Expand Up @@ -203,7 +203,7 @@ def poll_sample_runs(self, sample_watcher):
or self.fetch_run_status(denovo_resistance_run))
proviral_run = sample_watcher.runs.get(PipelineType.PROVIRAL)
if proviral_run is None:
if 'NFL' not in sample_watcher.sample_group.names[0]:
if 'NFL' not in sample_watcher.sample_group.project_codes[0]:
is_proviral_complete = True
else:
self.run_pipeline(PipelineType.PROVIRAL,
Expand Down
65 changes: 65 additions & 0 deletions micall/tests/test_kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,71 @@ def test_launch_resistance_run(raw_data_with_two_samples, mock_open_kive, pipeli
groups_allowed=['Everyone']))


def test_launch_proviral_run(raw_data_with_two_samples, mock_open_kive):
pipelines_config = parse_args(argv=['--micall_filter_quality_pipeline_id', '42',
'--denovo_main_pipeline_id', '43',
'--proviral_pipeline_id', '145'])

base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
kive_watcher = KiveWatcher(pipelines_config)
kive_watcher.app_urls = {
pipelines_config.proviral_pipeline_id: '/containerapps/103'}
kive_watcher.app_args = {
pipelines_config.proviral_pipeline_id: dict(
contigs_csv='/containerargs/104',
conseqs_csv='/containerargs/105',
cascade_csv='/containerargs/106')}

folder_watcher = kive_watcher.add_folder(base_calls)
folder_watcher.batch = dict(url='/batches/101')
folder_watcher.add_run(dict(id=106),
PipelineType.FILTER_QUALITY,
is_complete=True)
sample_watcher = kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2120A',
('2120A-PR_S14_L001_R1_001.fastq.gz',
None),
('HIVNFLDNA', None)))
folder_watcher.add_run(
dict(id=107),
PipelineType.DENOVO_MAIN,
sample_watcher)

kive_watcher.check_session()
mock_session = kive_watcher.session
mock_session.endpoints.containerruns.get.side_effect = [
dict(id=107, state='C'), # refresh run state
[dict(dataset='/datasets/111/',
argument_type='O',
argument_name='contigs_csv'),
dict(dataset='/datasets/112/',
argument_type='O',
argument_name='conseq_csv'),
dict(dataset='/datasets/113/',
argument_type='O',
argument_name='cascade_csv')]] # run datasets
mock_session.get.return_value.json.side_effect = [
dict(url='/datasets/111/', id=111),
dict(url='/datasets/112/', id=112),
dict(url='/datasets/113/', id=113)]

kive_watcher.poll_runs()

mock_session.endpoints.containerruns.post.assert_called_once_with(json=dict(
app='/containerapps/103',
datasets=[dict(argument='/containerargs/104',
dataset='/datasets/111/'),
dict(argument='/containerargs/105',
dataset='/datasets/112/'),
dict(argument='/containerargs/106',
dataset='/datasets/113/')],
name='Proviral HIVSeqinR on 2120A',
batch='/batches/101',
groups_allowed=['Everyone']))


def test_skip_resistance_run(raw_data_with_two_samples, mock_open_kive, pipelines_config):
pipelines_config.micall_resistance_pipeline_id = None
base_calls = (raw_data_with_two_samples /
Expand Down

0 comments on commit bc09731

Please sign in to comment.