Commit

Upload external datasets, as part of #438.
donkirkby committed Apr 10, 2018
1 parent 20f6158 commit d7fd61c
Showing 3 changed files with 124 additions and 35 deletions.
89 changes: 55 additions & 34 deletions micall/monitor/kive_watcher.py
@@ -4,6 +4,7 @@
import shutil
import tarfile
from datetime import datetime, timedelta
from pathlib import Path
from queue import Full

from io import StringIO, BytesIO
@@ -12,7 +13,6 @@
from micall.drivers.run_info import parse_read_sizes
from micall.monitor import error_metrics_parser
from micall.monitor.sample_watcher import FolderWatcher, ALLOWED_GROUPS, SampleWatcher, PipelineType
from micall.monitor.update_qai import process_folder
from micall.resistance.resistance import find_groups

try:
@@ -126,12 +126,21 @@ def get_output_filename(output_name):


class KiveWatcher:
def __init__(self, config=None):
def __init__(self,
config=None,
result_handler=lambda result_folder: None):
""" Initialize.
:param config: command line arguments
:param result_handler: called when a run folder has collated all the
results into a result folder"""
self.config = config
self.result_handler = result_handler
self.session = None
self.current_folder = None
self.folder_watchers = {} # {base_calls_folder: FolderWatcher}
self.pipelines = {} # {pipeline_id: pipeline}
self.external_directory_path = self.external_directory_name = None
self.input_pipeline_ids = config and dict(
quality_csv=config.micall_filter_quality_pipeline_id,
bad_cycles_csv=config.micall_main_pipeline_id,
@@ -207,26 +216,26 @@ def upload_kive_dataset(self, source_file, dataset_name, cdt, description):
:return: the dataset object from the Kive API wrapper, or None
"""
logger.info('uploading dataset %r', dataset_name)
dataset = self.session.add_dataset(
name=dataset_name,
description=description,
handle=source_file,
cdt=cdt,
groups=ALLOWED_GROUPS)
# TODO: external data sets
# if (self.external_directory_name is None or
# not filename.startswith(self.external_directory_name)):
# else:
# external_path = os.path.relpath(filename,
# self.external_directory_path)
# dataset = self.kive.add_dataset(
# name=dataset_name,
# description=description,
# handle=None,
# externalfiledirectory=self.external_directory_name,
# external_path=external_path,
# cdt=cdt,
# groups=settings.kive_groups_allowed)
filepath = Path(getattr(source_file, 'name', ''))
if (self.external_directory_name is None or
self.external_directory_path not in filepath.parents):
dataset = self.session.add_dataset(
name=dataset_name,
description=description,
handle=source_file,
cdt=cdt,
groups=ALLOWED_GROUPS)
else:
external_path = os.path.relpath(filepath,
self.external_directory_path)
dataset = self.session.add_dataset(
name=dataset_name,
description=description,
handle=None,
externalfiledirectory=self.external_directory_name,
external_path=external_path,
cdt=cdt,
groups=ALLOWED_GROUPS)
return dataset

def add_sample_group(self, base_calls, sample_group):
@@ -267,11 +276,7 @@ def poll_runs(self):
folder_watcher = self.folder_watchers.pop(folder)
results_path = self.collate_folder(folder_watcher)
if (results_path / "coverage_scores.csv").exists():
process_folder(results_path,
self.config.qai_server,
self.config.qai_user,
self.config.qai_password,
self.config.pipeline_version) # Upload to QAI.
self.result_handler(results_path)
(results_path / "doneprocessing").touch()
if not self.folder_watchers:
logger.info('No more folders to process.')
@@ -453,15 +458,31 @@ def check_session(self):
self.session = open_kive(self.config.kive_server)
self.session.login(self.config.kive_user, self.config.kive_password)

def find_or_upload_dataset(self, dataset_file, dataset_name, description, compounddatatype):
quality_dataset = self.find_kive_dataset(dataset_file,
dataset_name,
compounddatatype)
if quality_dataset is None:
# retrieve external file directory
directories = self.session.get('/api/externalfiledirectories',
is_json=True).json()
self.external_directory_name = self.external_directory_path = None
for directory in directories:
directory_path = Path(directory['path'])
if (directory_path == self.config.raw_data or
directory_path in self.config.raw_data.parents):
self.external_directory_name = directory['name']
self.external_directory_path = directory_path
break

def find_or_upload_dataset(self,
dataset_file,
dataset_name,
description,
compounddatatype):
dataset = self.find_kive_dataset(dataset_file,
dataset_name,
compounddatatype)
if dataset is None:
dataset_file.seek(0)
quality_dataset = self.upload_kive_dataset(
dataset = self.upload_kive_dataset(
dataset_file,
dataset_name,
compounddatatype,
description)
return quality_dataset
return dataset
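
A minimal, self-contained sketch of the new path logic in check_session and upload_kive_dataset above. The directory and file paths are illustrative, and the Kive session calls are omitted; only the directory-matching rule and the relative-path computation are shown.

import os
from pathlib import Path

# Hypothetical values: a Kive external file directory and the --raw_data root.
external_directory_path = Path('/data/raw')
raw_data = Path('/data/raw/MiSeq/runs')

# check_session() keeps a registered directory only if it contains raw_data.
assert (external_directory_path == raw_data or
        external_directory_path in raw_data.parents)

# upload_kive_dataset() then decides between a true upload and an external
# registration by checking whether the source file lives under that directory.
source_path = Path('/data/raw/MiSeq/runs/140101_M01234/quality.csv')
if external_directory_path in source_path.parents:
    # Inside the external directory: hand Kive a relative path, not the bytes.
    external_path = os.path.relpath(source_path, external_directory_path)
    print('register external file at', external_path)
else:
    # Anywhere else: fall back to uploading the open file handle.
    print('upload file contents directly')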
60 changes: 60 additions & 0 deletions micall/tests/test_kive_watcher.py
@@ -373,6 +373,66 @@ def test_add_first_sample(raw_data_with_two_samples, mock_open_kive, default_config):
assert not old_stuff_csv.exists()


def test_add_external_dataset(raw_data_with_two_samples, mock_open_kive, default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
default_config.raw_data = raw_data_with_two_samples
mock_session = mock_open_kive.return_value
mock_session.get.return_value.json.return_value = [
dict(name='raw_data', path=str(raw_data_with_two_samples))]
dataset1 = Mock(name='quality_csv')
dataset2 = Mock(name='fastq1')
dataset3 = Mock(name='fastq2')
mock_session.add_dataset.side_effect = [dataset1, dataset2, dataset3]
mock_pipeline = mock_session.get_pipeline.return_value
mock_input = Mock(dataset_name='quality_csv')
mock_pipeline.inputs = [mock_input]
kive_watcher = KiveWatcher(default_config)

kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2110A',
('2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
None)))

mock_session.get.assert_called_once_with('/api/externalfiledirectories',
is_json=True)
assert [call(cdt=mock_input.compounddatatype,
name='140101_M01234_quality.csv',
uploaded=True,
# MD5 of header with no records.
md5='6861a4a0bfd71b62c0048ff9a4910223'),
call(cdt=None,
name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
uploaded=True,
md5=ANY),
call(cdt=None,
name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz',
uploaded=True,
md5=ANY)] == mock_session.find_datasets.call_args_list
assert [call(name='140101_M01234_quality.csv',
description='Error rates for 140101_M01234 run.',
handle=ANY,
cdt=mock_input.compounddatatype,
groups=['Everyone']),
call(name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
description='forward read from MiSeq run 140101_M01234',
handle=None,
externalfiledirectory='raw_data',
external_path='MiSeq/runs/140101_M01234/Data/Intensities/'
'BaseCalls/2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
cdt=None,
groups=['Everyone']),
call(name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz',
description='reverse read from MiSeq run 140101_M01234',
handle=None,
externalfiledirectory='raw_data',
external_path='MiSeq/runs/140101_M01234/Data/Intensities/'
'BaseCalls/2110A-V3LOOP_S13_L001_R2_001.fastq.gz',
cdt=None,
groups=['Everyone'])] == mock_session.add_dataset.call_args_list


def test_poll_first_sample(raw_data_with_two_samples, mock_open_kive, default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
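
The new test_add_external_dataset above leans on one mocking detail worth calling out: the watcher discovers external file directories through the Kive API, so the test points that endpoint at the raw_data fixture. A stripped-down sketch of the pattern, assuming only unittest.mock and an illustrative path:

from unittest.mock import Mock

mock_session = Mock(name='kive_session')
# Whatever the watcher GETs from /api/externalfiledirectories will claim that
# the raw_data folder is registered in Kive under the name 'raw_data'.
mock_session.get.return_value.json.return_value = [
    dict(name='raw_data', path='/tmp/raw_data')]

directories = mock_session.get('/api/externalfiledirectories',
                               is_json=True).json()
assert directories == [dict(name='raw_data', path='/tmp/raw_data')]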
10 changes: 9 additions & 1 deletion micall_watcher.py
@@ -1,12 +1,14 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, SUPPRESS

import os
from functools import partial

from pathlib import Path
from queue import Queue, Empty
from threading import Thread
from time import sleep

from micall.monitor import update_qai
from micall.monitor.kive_watcher import find_samples, KiveWatcher

POLLING_DELAY = 10 # seconds between scans for new samples or finished runs
@@ -88,7 +90,13 @@ def parse_args(argv=None):

def main():
args = parse_args()
kive_watcher = KiveWatcher(args)
result_handler = partial(update_qai.process_folder,
qai_server=args.qai_server,
qai_user=args.qai_user,
qai_password=args.qai_password,
pipeline_version=args.pipeline_version)
kive_watcher = KiveWatcher(args, result_handler)

sample_queue = Queue(maxsize=2)
wait = True
finder_thread = Thread(target=find_samples,
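
The micall_watcher.py change above is the other half of dropping the update_qai import from kive_watcher.py: the QAI upload becomes a callback bound with functools.partial. A small sketch of that decoupling, with a stand-in process_folder and made-up connection details:

from functools import partial


def process_folder(result_folder, qai_server, qai_user, qai_password,
                   pipeline_version):
    # Stand-in for micall.monitor.update_qai.process_folder.
    print('Uploading', result_folder, 'to', qai_server,
          'as pipeline', pipeline_version)


# main() binds the QAI settings once...
result_handler = partial(process_folder,
                         qai_server='https://qai.example.com',
                         qai_user='bot',
                         qai_password='secret',
                         pipeline_version='7.8')

# ...so KiveWatcher.poll_runs() only needs to pass the results folder.
result_handler('/data/results/140101_M01234')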
