Upload FASTQ files, as part of #438.
donkirkby committed Mar 28, 2018
1 parent f44d3be commit b39548b
Showing 4 changed files with 232 additions and 53 deletions.
112 changes: 77 additions & 35 deletions micall/monitor/kive_watcher.py
@@ -7,6 +7,7 @@

from micall.drivers.run_info import parse_read_sizes
from micall.monitor import error_metrics_parser
from micall.monitor.sample_watcher import FolderWatcher, ALLOWED_GROUPS
from micall.resistance.resistance import find_groups

try:
@@ -23,7 +24,6 @@
HTTPAdapter = None

logger = logging.getLogger(__name__)
ALLOWED_GROUPS = ['Everyone']
FOLDER_SCAN_INTERVAL = timedelta(hours=1)


@@ -81,26 +81,45 @@ class KiveWatcher:
def __init__(self, config=None):
self.config = config
self.session = None
self.current_run_folder = None
self.run_batches = {} # {run_folder: run_batch}
self.folder_watchers = {} # {base_calls_folder: FolderWatcher}
self.pipelines = {} # {pipeline_id: pipeline}
self.input_pipeline_ids = dict(
quality_csv=config.micall_filter_quality_pipeline_id,
bad_cycles_csv=config.micall_main_pipeline_id,
main_amino_csv=config.micall_resistance_pipeline_id,
midi_amino_csv=config.micall_resistance_pipeline_id)

def is_full(self):
return False

def get_kive_pipeline(self, pipeline_id):
kive_pipeline = self.session.get_pipeline(pipeline_id)
self.check_session()
kive_pipeline = self.pipelines.get(pipeline_id)
if kive_pipeline is None:
kive_pipeline = self.session.get_pipeline(pipeline_id)
self.pipelines[pipeline_id] = kive_pipeline
return kive_pipeline

def create_batch(self, run_name):
batch_name = run_name + ' ' + self.config.pipeline_version
def get_kive_input(self, input_name):
pipeline_id = self.input_pipeline_ids[input_name]
kive_pipeline = self.get_kive_pipeline(pipeline_id)
for kive_input in kive_pipeline.inputs:
if kive_input.dataset_name == input_name:
return kive_input
raise ValueError('Input {} not found on pipeline id {}.'.format(
input_name,
pipeline_id))

def create_batch(self, folder_watcher):
batch_name = folder_watcher.run_name + ' ' + self.config.pipeline_version
description = 'MiCall batch for folder {}, pipeline version {}.'.format(
run_name,
folder_watcher.run_name,
self.config.pipeline_version)
batch = self.session.create_run_batch(batch_name,
description=description,
users=[],
groups=ALLOWED_GROUPS)
return batch
folder_watcher.batch = batch

def find_kive_dataset(self, source_file, dataset_name, cdt):
""" Search for a dataset in Kive by name and checksum.
@@ -160,27 +179,36 @@ def upload_kive_dataset(self, source_file, dataset_name, cdt, description):
return dataset

def add_sample_group(self, base_calls, sample_group):
if self.session is None:
self.session = open_kive(self.config.kive_server)
self.session.login(self.config.kive_user, self.config.kive_password)
run_folder = (base_calls / "../../..").resolve()
run_name = '_'.join(run_folder.name.split('_')[:2])
run_batch = self.run_batches.get(run_folder)
if run_batch is not None:
return

run_batch = self.create_batch(run_name)
self.run_batches[run_folder] = run_batch

read_sizes = parse_read_sizes(run_folder / "RunInfo.xml")
self.check_session()
folder_watcher = self.folder_watchers.get(base_calls)
if folder_watcher is None:
folder_watcher = FolderWatcher(base_calls,
self.session,
self.config.pipeline_version)
self.folder_watchers[base_calls] = folder_watcher
self.create_batch(folder_watcher)

self.run_filter_quality(folder_watcher)
fastq1 = sample_group.names[0]
fastq2 = fastq1.replace('_R1_', '_R2_')
for fastq_name, direction in ((fastq1, 'forward'), (fastq2, 'reverse')):
with (base_calls / fastq_name).open('rb') as fastq_file:
self.find_or_upload_dataset(
fastq_file,
fastq_name,
direction + ' read from MiSeq run ' + folder_watcher.run_name,
compounddatatype=None)

def run_filter_quality(self, folder_watcher):
read_sizes = parse_read_sizes(folder_watcher.run_folder / "RunInfo.xml")
read_lengths = [read_sizes.read1,
read_sizes.index1,
read_sizes.index2,
read_sizes.read2]
quality_pipeline = self.get_kive_pipeline(
self.config.micall_filter_quality_pipeline_id)
quality_input = quality_pipeline.inputs[0]
error_path = run_folder / "InterOp/ErrorMetricsOut.bin"
error_path = folder_watcher.run_folder / "InterOp/ErrorMetricsOut.bin"
quality_csv = StringIO()
with error_path.open('rb') as error_file:
records = error_metrics_parser.read_errors(error_file)
@@ -190,18 +218,32 @@ def add_sample_group(self, base_calls, sample_group):
quality_csv_bytes = BytesIO()
quality_csv_bytes.write(quality_csv.getvalue().encode('utf8'))
quality_csv_bytes.seek(0)
quality_file_name = run_name + '_quality.csv'
quality_dataset = self.find_kive_dataset(quality_csv_bytes,
quality_file_name,
quality_input.compounddatatype)
quality_dataset = self.find_or_upload_dataset(
quality_csv_bytes,
folder_watcher.run_name + '_quality.csv',
'Error rates for {} run.'.format(folder_watcher.run_name),
quality_input.compounddatatype)
folder_watcher.filter_quality_run = self.session.run_pipeline(
quality_pipeline,
[quality_dataset],
'MiCall filter quality on ' + folder_watcher.run_name,
runbatch=folder_watcher.batch,
groups=ALLOWED_GROUPS)

def check_session(self):
if self.session is None:
self.session = open_kive(self.config.kive_server)
self.session.login(self.config.kive_user, self.config.kive_password)

def find_or_upload_dataset(self, dataset_file, dataset_name, description, compounddatatype):
quality_dataset = self.find_kive_dataset(dataset_file,
dataset_name,
compounddatatype)
if quality_dataset is None:
dataset_file.seek(0)
quality_dataset = self.upload_kive_dataset(
quality_csv_bytes,
quality_file_name,
quality_input.compounddatatype,
'Error rates for {} run.'.format(run_name))
self.session.run_pipeline(quality_pipeline,
[quality_dataset],
'MiCall filter quality on ' + run_name,
runbatch=run_batch,
groups=ALLOWED_GROUPS)
dataset_file,
dataset_name,
compounddatatype,
description)
return quality_dataset
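
The heart of the new upload path is a find-or-upload check: hash the in-memory file, ask Kive for an existing dataset with the same name and MD5, and upload only when nothing matches. Below is a minimal sketch of that pattern; the session object and the find_or_upload name are stand-ins rather than part of this commit, while the keyword arguments mirror the find_datasets and add_dataset calls asserted in the tests further down.

from hashlib import md5


def find_or_upload(session, handle, name, description, cdt, groups=('Everyone',)):
    """Reuse a matching Kive dataset by name and MD5, or upload a new one."""
    # Hash the whole stream, then rewind so a later upload sends the same bytes.
    checksum = md5(handle.read()).hexdigest()
    handle.seek(0)
    # Reuse an existing dataset when the name and checksum both match.
    matches = session.find_datasets(name=name, md5=checksum, cdt=cdt, uploaded=True)
    if matches:
        return matches[0]
    # Otherwise upload, sharing with the same groups as the pipeline runs.
    return session.add_dataset(name=name,
                               description=description,
                               handle=handle,
                               cdt=cdt,
                               groups=list(groups))
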
27 changes: 27 additions & 0 deletions micall/monitor/sample_watcher.py
@@ -0,0 +1,27 @@
from pathlib import Path

ALLOWED_GROUPS = ['Everyone']


class FolderWatcher:
def __init__(self,
base_calls_folder,
kive_session=None,
pipeline_version='unknown'):
self.base_calls_folder = Path(base_calls_folder)
self.session = kive_session
self.pipeline_version = pipeline_version
self.run_folder = (self.base_calls_folder / '../../..').resolve()
self.run_name = '_'.join(self.run_folder.name.split('_')[:2])
self.sample_groups = []
self.samples = {} # {file_name: SampleWatcher}
self.batch = None
self.filter_quality_run = None
self.filter_quality_finished = False

def __repr__(self):
return f'FolderWatcher({str(self.base_calls_folder)!r})'


class SampleWatcher:
pass
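
For orientation, here is a short usage sketch of the new FolderWatcher class, assuming the micall package is importable. Deriving run_folder and run_name needs no Kive session, and the example path matches the test file added below.

from micall.monitor.sample_watcher import FolderWatcher

# BaseCalls sits three levels below the run folder; the run name is the first
# two underscore-separated tokens of that folder's name.
watcher = FolderWatcher('/path/140101_M01234_JUNK/Data/Intensities/BaseCalls')
print(watcher.run_folder)  # /path/140101_M01234_JUNK
print(watcher.run_name)    # 140101_M01234
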
117 changes: 99 additions & 18 deletions micall/tests/test_kive_watcher.py
@@ -1,8 +1,7 @@
from argparse import Namespace
from gzip import GzipFile
from pathlib import Path
from queue import Full
from unittest.mock import patch
from unittest.mock import patch, ANY, Mock, call

import pytest
from datetime import datetime, timedelta
@@ -29,8 +28,14 @@ def create_mock_open_kive():

@pytest.fixture(name='default_config')
def create_default_config():
config = parse_args(argv=['--micall_filter_quality_pipeline_id', '42'])
yield config
default_config = parse_args(argv=['--micall_filter_quality_pipeline_id', '42'])
yield default_config


@pytest.fixture(name='pipelines_config')
def create_pipelines_config(default_config):
default_config.micall_main_pipeline_id = 43
yield default_config


class DummyQueueSink:
@@ -185,18 +190,71 @@ def increment_clock():
sample_queue.verify()


def test_starts_empty():
kive_watcher = KiveWatcher(Namespace())
def test_starts_empty(default_config):
kive_watcher = KiveWatcher(default_config)

assert not kive_watcher.is_full()


def test_get_kive_pipeline(mock_open_kive, pipelines_config):
mock_session = mock_open_kive.return_value
expected_pipeline = mock_session.get_pipeline.return_value
kive_watcher = KiveWatcher(pipelines_config)

pipeline1 = kive_watcher.get_kive_pipeline(
pipelines_config.micall_main_pipeline_id)

assert expected_pipeline is pipeline1
mock_session.get_pipeline.assert_called_once_with(
pipelines_config.micall_main_pipeline_id)


def test_get_pipeline_cached(mock_open_kive, pipelines_config):
mock_session = mock_open_kive.return_value
kive_watcher = KiveWatcher(pipelines_config)

pipeline1 = kive_watcher.get_kive_pipeline(pipelines_config.micall_main_pipeline_id)
pipeline2 = kive_watcher.get_kive_pipeline(pipelines_config.micall_main_pipeline_id)

assert pipeline1 is pipeline2
mock_session.get_pipeline.assert_called_once_with(
pipelines_config.micall_main_pipeline_id)


def test_get_kive_input(pipelines_config):
kive_watcher = KiveWatcher(pipelines_config)
mock_pipeline = Mock()
kive_watcher.pipelines[
pipelines_config.micall_main_pipeline_id] = mock_pipeline
expected_input = Mock(dataset_name='bad_cycles_csv')
mock_pipeline.inputs = [Mock(), expected_input]

kive_input = kive_watcher.get_kive_input('bad_cycles_csv')

assert expected_input is kive_input


def test_get_kive_input_wrong_pipeline(pipelines_config):
pipelines_config.micall_resistance_pipeline_id = 44
kive_watcher = KiveWatcher(pipelines_config)
mock_pipeline = Mock(name='resistance_pipeline')
kive_watcher.pipelines[
pipelines_config.micall_resistance_pipeline_id] = mock_pipeline
mock_pipeline.inputs = [Mock(), Mock(dataset_name='bad_cycles_csv')]

with pytest.raises(
ValueError,
match=r'Input main_amino_csv not found on pipeline id 44\.'):
kive_watcher.get_kive_input('main_amino_csv')


def test_first_sample(raw_data_with_two_samples, mock_open_kive, default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
mock_session = mock_open_kive.return_value
mock_pipeline = mock_session.get_pipeline.return_value
mock_input = mock_pipeline.inputs.__getitem__.return_value
mock_input = Mock(dataset_name='quality_csv')
mock_pipeline.inputs = [mock_input]
kive_watcher = KiveWatcher(default_config)

kive_watcher.add_sample_group(
@@ -215,16 +273,34 @@ def test_first_sample(raw_data_with_two_samples, mock_open_kive, default_config)
groups=['Everyone'])
mock_session.get_pipeline.assert_called_once_with(
default_config.micall_filter_quality_pipeline_id)
mock_session.find_datasets.assert_called_once_with(
cdt=mock_input.compounddatatype,
name='140101_M01234_quality.csv',
uploaded=True,
md5='6861a4a0bfd71b62c0048ff9a4910223') # MD5 of header with no records.
add_args_list = mock_session.add_dataset.call_args_list
assert len(add_args_list) == 1
assert add_args_list[0][1]['name'] == '140101_M01234_quality.csv'
assert add_args_list[0][1]['description'] == 'Error rates for 140101_M01234 run.'
assert add_args_list[0][1]['cdt'] == mock_input.compounddatatype
assert [call(cdt=mock_input.compounddatatype,
name='140101_M01234_quality.csv',
uploaded=True,
# MD5 of header with no records.
md5='6861a4a0bfd71b62c0048ff9a4910223'),
call(cdt=None,
name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
uploaded=True,
md5=ANY),
call(cdt=None,
name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz',
uploaded=True,
md5=ANY)] == mock_session.find_datasets.call_args_list
assert [call(name='140101_M01234_quality.csv',
description='Error rates for 140101_M01234 run.',
handle=ANY,
cdt=mock_input.compounddatatype,
groups=['Everyone']),
call(name='2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
description='forward read from MiSeq run 140101_M01234',
handle=ANY,
cdt=None,
groups=['Everyone']),
call(name='2110A-V3LOOP_S13_L001_R2_001.fastq.gz',
description='reverse read from MiSeq run 140101_M01234',
handle=ANY,
cdt=None,
groups=['Everyone'])] == mock_session.add_dataset.call_args_list
mock_session.run_pipeline.assert_called_once_with(
mock_session.get_pipeline.return_value,
[mock_session.add_dataset.return_value],
@@ -233,10 +309,12 @@ def test(raw_data_with_two_samples, mock_open_kive, default_config):
groups=['Everyone'])


def test(raw_data_with_two_samples, mock_open_kive, default_config):
def test_second_sample(raw_data_with_two_samples, mock_open_kive, default_config):
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
mock_session = mock_open_kive.return_value
mock_session.get_pipeline.return_value.inputs = [
Mock(dataset_name='quality_csv')]
kive_watcher = KiveWatcher(default_config)

kive_watcher.add_sample_group(
@@ -261,3 +339,6 @@ def test(raw_data_with_two_samples, mock_open_kive, default_config):
'MiCall filter quality on 140101_M01234',
runbatch=mock_session.create_run_batch.return_value,
groups=['Everyone'])
expected_dataset_count = 5 # quality_csv + 2 pairs of FASTQ files
assert expected_dataset_count == len(
mock_session.find_datasets.call_args_list)
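
The expected count of five follows from the upload scheme introduced in kive_watcher.py: one quality CSV per run plus a forward/reverse FASTQ pair per sample group. A small sketch of that arithmetic and of the R1-to-R2 name pairing, using file names taken from the assertions above:

# One shared quality CSV per run, plus an R1/R2 pair for each sample group.
sample_groups = 2
expected_dataset_count = 1 + 2 * sample_groups
assert expected_dataset_count == 5

# The reverse read's name is derived by swapping the read token, as in
# add_sample_group.
fastq1 = '2110A-V3LOOP_S13_L001_R1_001.fastq.gz'
fastq2 = fastq1.replace('_R1_', '_R2_')
assert fastq2 == '2110A-V3LOOP_S13_L001_R2_001.fastq.gz'
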
29 changes: 29 additions & 0 deletions micall/tests/test_sample_watcher.py
@@ -0,0 +1,29 @@
from pathlib import Path

from micall.monitor.sample_watcher import FolderWatcher


def test_folder_watcher_repr():
base_calls_folder = '/path/Data/Intensities/BaseCalls'
expected_repr = "FolderWatcher('/path/Data/Intensities/BaseCalls')"
watcher = FolderWatcher(base_calls_folder)

assert expected_repr == repr(watcher)


def test_folder_watcher_repr_with_pathlib():
base_calls_folder = Path('/path/Data/Intensities/BaseCalls')
expected_repr = "FolderWatcher('/path/Data/Intensities/BaseCalls')"
watcher = FolderWatcher(base_calls_folder)

assert expected_repr == repr(watcher)


def test_folder_watcher_run_details():
base_calls_folder = '/path/140101_M01234_JUNK/Data/Intensities/BaseCalls'
expected_run_folder = Path('/path/140101_M01234_JUNK')
expected_run_name = '140101_M01234'
watcher = FolderWatcher(base_calls_folder)

assert expected_run_folder == watcher.run_folder
assert expected_run_name == watcher.run_name
