From d7fd61c808624e19cf5500e1c8f8ff71a5433a32 Mon Sep 17 00:00:00 2001 From: don Date: Tue, 10 Apr 2018 09:55:53 -0700 Subject: [PATCH] Upload external datasets, as part of #438. --- micall/monitor/kive_watcher.py | 89 +++++++++++++++++++------------ micall/tests/test_kive_watcher.py | 60 +++++++++++++++++++++ micall_watcher.py | 10 +++- 3 files changed, 124 insertions(+), 35 deletions(-) diff --git a/micall/monitor/kive_watcher.py b/micall/monitor/kive_watcher.py index b3fb61ce9..8256d0a6f 100644 --- a/micall/monitor/kive_watcher.py +++ b/micall/monitor/kive_watcher.py @@ -4,6 +4,7 @@ import shutil import tarfile from datetime import datetime, timedelta +from pathlib import Path from queue import Full from io import StringIO, BytesIO @@ -12,7 +13,6 @@ from micall.drivers.run_info import parse_read_sizes from micall.monitor import error_metrics_parser from micall.monitor.sample_watcher import FolderWatcher, ALLOWED_GROUPS, SampleWatcher, PipelineType -from micall.monitor.update_qai import process_folder from micall.resistance.resistance import find_groups try: @@ -126,12 +126,21 @@ def get_output_filename(output_name): class KiveWatcher: - def __init__(self, config=None): + def __init__(self, + config=None, + result_handler=lambda result_folder: None): + """ Initialize. + + :param config: command line arguments + :param result_handler: called when a run folder has collated all the + results into a result folder""" self.config = config + self.result_handler = result_handler self.session = None self.current_folder = None self.folder_watchers = {} # {base_calls_folder: FolderWatcher} self.pipelines = {} # {pipeline_id: pipeline} + self.external_directory_path = self.external_directory_name = None self.input_pipeline_ids = config and dict( quality_csv=config.micall_filter_quality_pipeline_id, bad_cycles_csv=config.micall_main_pipeline_id, @@ -207,26 +216,26 @@ def upload_kive_dataset(self, source_file, dataset_name, cdt, description): :return: the dataset object from the Kive API wrapper, or None """ logger.info('uploading dataset %r', dataset_name) - dataset = self.session.add_dataset( - name=dataset_name, - description=description, - handle=source_file, - cdt=cdt, - groups=ALLOWED_GROUPS) - # TODO: external data sets - # if (self.external_directory_name is None or - # not filename.startswith(self.external_directory_name)): - # else: - # external_path = os.path.relpath(filename, - # self.external_directory_path) - # dataset = self.kive.add_dataset( - # name=dataset_name, - # description=description, - # handle=None, - # externalfiledirectory=self.external_directory_name, - # external_path=external_path, - # cdt=cdt, - # groups=settings.kive_groups_allowed) + filepath = Path(getattr(source_file, 'name', '')) + if (self.external_directory_name is None or + self.external_directory_path not in filepath.parents): + dataset = self.session.add_dataset( + name=dataset_name, + description=description, + handle=source_file, + cdt=cdt, + groups=ALLOWED_GROUPS) + else: + external_path = os.path.relpath(filepath, + self.external_directory_path) + dataset = self.session.add_dataset( + name=dataset_name, + description=description, + handle=None, + externalfiledirectory=self.external_directory_name, + external_path=external_path, + cdt=cdt, + groups=ALLOWED_GROUPS) return dataset def add_sample_group(self, base_calls, sample_group): @@ -267,11 +276,7 @@ def poll_runs(self): folder_watcher = self.folder_watchers.pop(folder) results_path = self.collate_folder(folder_watcher) if (results_path / "coverage_scores.csv").exists(): - process_folder(results_path, - self.config.qai_server, - self.config.qai_user, - self.config.qai_password, - self.config.pipeline_version) # Upload to QAI. + self.result_handler(results_path) (results_path / "doneprocessing").touch() if not self.folder_watchers: logger.info('No more folders to process.') @@ -453,15 +458,31 @@ def check_session(self): self.session = open_kive(self.config.kive_server) self.session.login(self.config.kive_user, self.config.kive_password) - def find_or_upload_dataset(self, dataset_file, dataset_name, description, compounddatatype): - quality_dataset = self.find_kive_dataset(dataset_file, - dataset_name, - compounddatatype) - if quality_dataset is None: + # retrieve external file directory + directories = self.session.get('/api/externalfiledirectories', + is_json=True).json() + self.external_directory_name = self.external_directory_path = None + for directory in directories: + directory_path = Path(directory['path']) + if (directory_path == self.config.raw_data or + directory_path in self.config.raw_data.parents): + self.external_directory_name = directory['name'] + self.external_directory_path = directory_path + break + + def find_or_upload_dataset(self, + dataset_file, + dataset_name, + description, + compounddatatype): + dataset = self.find_kive_dataset(dataset_file, + dataset_name, + compounddatatype) + if dataset is None: dataset_file.seek(0) - quality_dataset = self.upload_kive_dataset( + dataset = self.upload_kive_dataset( dataset_file, dataset_name, compounddatatype, description) - return quality_dataset + return dataset diff --git a/micall/tests/test_kive_watcher.py b/micall/tests/test_kive_watcher.py index 5f32c878e..48a0138f7 100644 --- a/micall/tests/test_kive_watcher.py +++ b/micall/tests/test_kive_watcher.py @@ -373,6 +373,66 @@ def test_add_first_sample(raw_data_with_two_samples, mock_open_kive, default_con assert not old_stuff_csv.exists() +def test_add_external_dataset(raw_data_with_two_samples, mock_open_kive, default_config): + base_calls = (raw_data_with_two_samples / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + default_config.raw_data = raw_data_with_two_samples + mock_session = mock_open_kive.return_value + mock_session.get.return_value.json.return_value = [ + dict(name='raw_data', path=str(raw_data_with_two_samples))] + dataset1 = Mock(name='quality_csv') + dataset2 = Mock(name='fastq1') + dataset3 = Mock(name='fastq2') + mock_session.add_dataset.side_effect = [dataset1, dataset2, dataset3] + mock_pipeline = mock_session.get_pipeline.return_value + mock_input = Mock(dataset_name='quality_csv') + mock_pipeline.inputs = [mock_input] + kive_watcher = KiveWatcher(default_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2110A', + ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + None))) + + mock_session.get.assert_called_once_with('/api/externalfiledirectories', + is_json=True) + assert [call(cdt=mock_input.compounddatatype, + name='140101_M01234_quality.csv', + uploaded=True, + # MD5 of header with no records. + md5='6861a4a0bfd71b62c0048ff9a4910223'), + call(cdt=None, + name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + uploaded=True, + md5=ANY), + call(cdt=None, + name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz', + uploaded=True, + md5=ANY)] == mock_session.find_datasets.call_args_list + assert [call(name='140101_M01234_quality.csv', + description='Error rates for 140101_M01234 run.', + handle=ANY, + cdt=mock_input.compounddatatype, + groups=['Everyone']), + call(name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + description='forward read from MiSeq run 140101_M01234', + handle=None, + externalfiledirectory='raw_data', + external_path='MiSeq/runs/140101_M01234/Data/Intensities/' + 'BaseCalls/2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + cdt=None, + groups=['Everyone']), + call(name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz', + description='reverse read from MiSeq run 140101_M01234', + handle=None, + externalfiledirectory='raw_data', + external_path='MiSeq/runs/140101_M01234/Data/Intensities/' + 'BaseCalls/2110A-V3LOOP_S13_L001_R2_001.fastq.gz', + cdt=None, + groups=['Everyone'])] == mock_session.add_dataset.call_args_list + + def test_poll_first_sample(raw_data_with_two_samples, mock_open_kive, default_config): base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") diff --git a/micall_watcher.py b/micall_watcher.py index 270c6743e..43abb3b5f 100644 --- a/micall_watcher.py +++ b/micall_watcher.py @@ -1,12 +1,14 @@ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, SUPPRESS import os +from functools import partial from pathlib import Path from queue import Queue, Empty from threading import Thread from time import sleep +from micall.monitor import update_qai from micall.monitor.kive_watcher import find_samples, KiveWatcher POLLING_DELAY = 10 # seconds between scans for new samples or finished runs @@ -88,7 +90,13 @@ def parse_args(argv=None): def main(): args = parse_args() - kive_watcher = KiveWatcher(args) + result_handler = partial(update_qai.process_folder, + qai_server=args.qai_server, + qai_user=args.qai_user, + qai_password=args.qai_password, + pipeline_version=args.pipeline_version) + kive_watcher = KiveWatcher(args, result_handler) + sample_queue = Queue(maxsize=2) wait = True finder_thread = Thread(target=find_samples,