diff --git a/micall/monitor/kive_watcher.py b/micall/monitor/kive_watcher.py index f1122b9fd..2854f6597 100644 --- a/micall/monitor/kive_watcher.py +++ b/micall/monitor/kive_watcher.py @@ -1,9 +1,13 @@ import hashlib import logging +import os +import shutil +import tarfile from datetime import datetime, timedelta from queue import Full from io import StringIO, BytesIO +from time import sleep from micall.drivers.run_info import parse_read_sizes from micall.monitor import error_metrics_parser @@ -25,6 +29,7 @@ logger = logging.getLogger(__name__) FOLDER_SCAN_INTERVAL = timedelta(hours=1) +SLEEP_SECONDS = 60 DOWNLOADED_RESULTS = ['remap_counts_csv', 'conseq_csv', 'conseq_ins_csv', @@ -63,35 +68,51 @@ def now(): return datetime.now() -def find_samples(raw_data_folder, sample_queue, wait=True): +def find_samples(raw_data_folder, pipeline_version, sample_queue, wait=True): while True: - is_complete = scan_samples(raw_data_folder, sample_queue) + is_complete = scan_samples(raw_data_folder, + pipeline_version, + sample_queue, + wait) if is_complete and not wait: break -def scan_samples(raw_data_folder, sample_queue): +def scan_samples(raw_data_folder, pipeline_version, sample_queue, wait): next_scan = now() + FOLDER_SCAN_INTERVAL flag_paths = sorted( raw_data_folder.glob("MiSeq/runs/*/needsprocessing"), reverse=True) + is_found = False for flag_path in flag_paths: run_path = flag_path.parent + done_path = (run_path / + f"Results/version_{pipeline_version}/doneprocessing") + if done_path.exists(): + continue base_calls_path = run_path / "Data/Intensities/BaseCalls" fastq_files = base_calls_path.glob("*_R1_*.fastq.gz") sample_sheet_path = run_path / "SampleSheet.csv" file_names = [f.name for f in fastq_files] for sample_group in find_groups(file_names, sample_sheet_path): + is_found = True is_sent = False while not is_sent and now() < next_scan: try: - sample_queue.put((base_calls_path, sample_group), timeout=60) - logger.debug('Put %s, %s in queue.', base_calls_path, sample_group) + sample_queue.put((base_calls_path, sample_group), + timeout=SLEEP_SECONDS) + logger.debug('Put %s, %s in queue.', + base_calls_path, + sample_group) is_sent = True except Full: pass if now() >= next_scan: return False + if not is_found: + logger.info('No folders need processing.') + while wait and now() < next_scan: + sleep(SLEEP_SECONDS) return True @@ -99,10 +120,15 @@ def trim_name(sample_name): return '_'.join(sample_name.split('_')[:2]) +def get_output_filename(output_name): + return '.'.join(output_name.rsplit('_', 1)) + + class KiveWatcher: def __init__(self, config=None): self.config = config self.session = None + self.current_folder = None self.folder_watchers = {} # {base_calls_folder: FolderWatcher} self.pipelines = {} # {pipeline_id: pipeline} self.input_pipeline_ids = config and dict( @@ -204,12 +230,15 @@ def upload_kive_dataset(self, source_file, dataset_name, cdt, description): def add_sample_group(self, base_calls, sample_group): self.check_session() + self.current_folder = base_calls folder_watcher = self.folder_watchers.get(base_calls) if folder_watcher is None: folder_watcher = FolderWatcher(base_calls, self) self.folder_watchers[base_calls] = folder_watcher self.create_batch(folder_watcher) self.upload_filter_quality(folder_watcher) + shutil.rmtree(self.get_results_path(folder_watcher), + ignore_errors=True) sample_watcher = SampleWatcher(sample_group) for fastq1 in filter(None, sample_group.names): @@ -224,9 +253,73 @@ def add_sample_group(self, base_calls, sample_group): sample_watcher.fastq_datasets.append(fastq_dataset) folder_watcher.sample_watchers.append(sample_watcher) + def finish_folder(self): + self.current_folder = None + def poll_runs(self): - for folder_watcher in self.folder_watchers.values(): + completed_folders = [] + for folder, folder_watcher in self.folder_watchers.items(): folder_watcher.poll_runs() + if folder != self.current_folder and folder_watcher.is_complete: + completed_folders.append(folder) + for folder in completed_folders: + folder_watcher = self.folder_watchers.pop(folder) + self.collate_folder(folder_watcher) + if not self.folder_watchers: + logger.info('No more folders to process.') + + def collate_folder(self, folder_watcher): + results_path = self.get_results_path(folder_watcher) + logger.info('Collating results in %s', results_path) + scratch_path = results_path / "scratch" + for output_name in DOWNLOADED_RESULTS: + if output_name == 'coverage_maps_tar': + self.extract_coverage_maps(folder_watcher) + continue + source_count = 0 + filename = get_output_filename(output_name) + target_path = results_path / filename + with target_path.open('w') as target: + for sample_name in folder_watcher.all_samples: + sample_name = trim_name(sample_name) + source_path = scratch_path / sample_name / filename + try: + with source_path.open() as source: + for i, line in enumerate(source): + if i != 0: + prefix = sample_name + elif source_count == 0: + prefix = 'sample' + else: + continue + target.write(prefix + ',' + line) + source_count += 1 + except FileNotFoundError: + # Skip the file. + pass + if not source_count: + target_path.unlink() + shutil.rmtree(scratch_path) + (results_path / "doneprocessing").touch() + + def extract_coverage_maps(self, folder_watcher): + results_path = self.get_results_path(folder_watcher) + coverage_path = results_path / "coverage_maps" + coverage_path.mkdir() + scratch_path = results_path / "scratch" + for sample_name in folder_watcher.all_samples: + sample_name = trim_name(sample_name) + source_path = scratch_path / sample_name / 'coverage_maps.tar' + try: + with tarfile.open(source_path) as f: + for source_info in f: + filename = os.path.basename(source_info.name) + target_path = coverage_path / (sample_name + '.' + filename) + with f.extractfile(source_info) as source, \ + open(target_path, 'wb') as target: + shutil.copyfileobj(source, target) + except FileNotFoundError: + pass def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type): if pipeline_type == PipelineType.FILTER_QUALITY: @@ -305,8 +398,7 @@ def fetch_run_status(self, run, folder_watcher, sample_watcher, pipeline_type): if pipeline_type in (PipelineType.MIDI, PipelineType.MIXED_HCV_MIDI) else sample_watcher.sample_group.names[0]) - version_name = f'version_{self.config.pipeline_version}' - results_path = folder_watcher.run_folder / "Results" / version_name + results_path = self.get_results_path(folder_watcher) scratch_path = results_path / "scratch" / trim_name(sample_name) scratch_path.mkdir(parents=True, exist_ok=True) results = run.get_results() @@ -314,12 +406,17 @@ def fetch_run_status(self, run, folder_watcher, sample_watcher, pipeline_type): dataset = results.get(output_name) if dataset is None: continue - filename = '.'.join(output_name.rsplit('_', 1)) + filename = get_output_filename(output_name) with (scratch_path / filename).open('wb') as f: dataset.download(f) return is_complete + def get_results_path(self, folder_watcher): + version_name = f'version_{self.config.pipeline_version}' + results_path = folder_watcher.run_folder / "Results" / version_name + return results_path + def upload_filter_quality(self, folder_watcher): read_sizes = parse_read_sizes(folder_watcher.run_folder / "RunInfo.xml") read_lengths = [read_sizes.read1, diff --git a/micall/monitor/sample_watcher.py b/micall/monitor/sample_watcher.py index d7b0afcc3..19541c5ab 100644 --- a/micall/monitor/sample_watcher.py +++ b/micall/monitor/sample_watcher.py @@ -43,13 +43,17 @@ def __init__(self, def __repr__(self): return f'FolderWatcher({str(self.base_calls_folder)!r})' + @property + def all_samples(self): + for sample_watcher in self.sample_watchers: + for name in sample_watcher.sample_group.names: + if name is not None: + yield name + @property def active_samples(self): - started_samples = {name - for sample_watcher in self.sample_watchers - for name in sample_watcher.sample_group.names - if name is not None} - return started_samples - self.completed_samples + all_samples = set(self.all_samples) + return all_samples - self.completed_samples @property def is_complete(self): diff --git a/micall/tests/test_kive_watcher.py b/micall/tests/test_kive_watcher.py index 3a3cd7a20..5f32c878e 100644 --- a/micall/tests/test_kive_watcher.py +++ b/micall/tests/test_kive_watcher.py @@ -1,6 +1,9 @@ +import tarfile from gzip import GzipFile +from io import BytesIO from pathlib import Path from queue import Full +from tarfile import TarInfo from unittest.mock import patch, ANY, Mock, call import pytest @@ -135,8 +138,9 @@ def test_hcv_pair(raw_data_with_hcv_pair): "Data/Intensities/BaseCalls", SampleGroup('2130A', ('2130A-HCV_S15_L001_R1_001.fastq.gz', '2130AMIDI-MidHCV_S16_L001_R1_001.fastq.gz')))) + pipeline_version = 'XXX' - find_samples(raw_data_with_hcv_pair, sample_queue, wait=False) + find_samples(raw_data_with_hcv_pair, pipeline_version, sample_queue, wait=False) sample_queue.verify() @@ -153,8 +157,31 @@ def test_two_runs(raw_data_with_two_runs): "Data/Intensities/BaseCalls", SampleGroup('2000A', ('2000A-V3LOOP_S2_L001_R1_001.fastq.gz', None)))) + pipeline_version = 'XXX' - find_samples(raw_data_with_two_runs, sample_queue, wait=False) + find_samples(raw_data_with_two_runs, pipeline_version, sample_queue, wait=False) + + sample_queue.verify() + + +def test_skip_done_runs(raw_data_with_two_runs): + done_run = raw_data_with_two_runs / "MiSeq/runs/140201_M01234" + results_path = done_run / "Results/version_0-dev" + results_path.mkdir(parents=True) + done_path = results_path / "doneprocessing" + done_path.touch() + pipeline_version = '0-dev' + sample_queue = DummyQueueSink() + sample_queue.expect_put( + (raw_data_with_two_runs / "MiSeq/runs/140101_M01234" / + "Data/Intensities/BaseCalls", + SampleGroup('2000A', ('2000A-V3LOOP_S2_L001_R1_001.fastq.gz', + None)))) + + find_samples(raw_data_with_two_runs, + pipeline_version, + sample_queue, + wait=False) sample_queue.verify() @@ -172,8 +199,12 @@ def test_full_queue(raw_data_with_two_runs): None))) sample_queue.expect_put(item2, is_full=True) sample_queue.expect_put(item2) + pipeline_version = 'XXX' - find_samples(raw_data_with_two_runs, sample_queue, wait=False) + find_samples(raw_data_with_two_runs, + pipeline_version, + sample_queue, + wait=False) sample_queue.verify() @@ -203,8 +234,12 @@ def increment_clock(): callback=increment_clock) sample_queue.expect_put(item2) sample_queue.expect_put(item1) + pipeline_version = 'XXX' - find_samples(raw_data_with_two_runs, sample_queue, wait=False) + find_samples(raw_data_with_two_runs, + pipeline_version, + sample_queue, + wait=False) sample_queue.verify() @@ -274,6 +309,10 @@ def test_add_first_sample(raw_data_with_two_samples, mock_open_kive, default_con base_calls = (raw_data_with_two_samples / "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") mock_session = mock_open_kive.return_value + result_path = base_calls / "../../../Results/version_0-dev" + result_path.mkdir(parents=True) + old_stuff_csv = result_path / 'old_stuff.csv' + old_stuff_csv.write_text('out of date') dataset1 = Mock(name='quality_csv') dataset2 = Mock(name='fastq1') dataset3 = Mock(name='fastq2') @@ -331,6 +370,7 @@ def test_add_first_sample(raw_data_with_two_samples, mock_open_kive, default_con folder_watcher = kive_watcher.folder_watchers[base_calls] assert dataset1 is folder_watcher.quality_dataset assert [dataset2, dataset3] == folder_watcher.sample_watchers[0].fastq_datasets + assert not old_stuff_csv.exists() def test_poll_first_sample(raw_data_with_two_samples, mock_open_kive, default_config): @@ -1033,3 +1073,120 @@ def test_fetch_run_status_main_and_midi(raw_data_with_hcv_pair, pipelines_config assert is_midi_complete assert expected_main_nuc_path.exists() assert expected_midi_nuc_path.exists() + + +def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_config): + base_calls = (raw_data_with_two_samples / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + mock_session = mock_open_kive.return_value + mock_pipeline = mock_session.get_pipeline.return_value + mock_input = Mock(dataset_name='quality_csv') + mock_pipeline.inputs = [mock_input] + resistance_run1 = Mock(name='resistance_run1', + **{'is_complete.return_value': True, + 'get_results.return_value': create_datasets(['resistance_csv'])}) + resistance_run2 = Mock(name='resistance_run2', + **{'is_complete.return_value': True, + 'get_results.return_value': create_datasets(['resistance_csv'])}) + kive_watcher = KiveWatcher(default_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2110A', + ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + None))) + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2120A', + ('2120A-PR_S14_L001_R1_001.fastq.gz', + None))) + kive_watcher.finish_folder() + folder_watcher, = kive_watcher.folder_watchers.values() + folder_watcher.filter_quality_run = Mock() + sample1_watcher, sample2_watcher = folder_watcher.sample_watchers + sample1_watcher.main_runs.append(Mock()) + sample1_watcher.resistance_run = resistance_run1 + sample2_watcher.main_runs.append(Mock()) + sample2_watcher.resistance_run = resistance_run2 + folder_watcher.active_runs = { + resistance_run1: (sample1_watcher, PipelineType.RESISTANCE), + resistance_run2: (sample2_watcher, PipelineType.RESISTANCE)} + results_path = base_calls / "../../../Results/version_0-dev" + scratch_path = results_path / "scratch" + expected_coverage_map_content = b'This is a coverage map.' + sample_scratch_path = scratch_path / "2110A-V3LOOP_S13" + sample_scratch_path.mkdir(parents=True) + coverage_maps_path = sample_scratch_path / "coverage_maps.tar" + with tarfile.open(coverage_maps_path, 'w') as coverage_maps_tar: + content = BytesIO(expected_coverage_map_content) + tar_info = TarInfo('coverage_maps/R1_coverage.txt') + tar_info.size = len(expected_coverage_map_content) + coverage_maps_tar.addfile(tar_info, content) + expected_coverage_map_path = ( + results_path / "coverage_maps/2110A-V3LOOP_S13.R1_coverage.txt") + expected_done_path = results_path / "doneprocessing" + expected_mutations_path = results_path / "mutations.csv" + expected_resistance_path = results_path / "resistance.csv" + expected_resistance_content = """\ +sample,row,name +2110A-V3LOOP_S13,0,resistance_csv +2110A-V3LOOP_S13,1,resistance_csv +2110A-V3LOOP_S13,2,resistance_csv +2120A-PR_S14,0,resistance_csv +2120A-PR_S14,1,resistance_csv +2120A-PR_S14,2,resistance_csv +""" + + kive_watcher.poll_runs() + + assert not scratch_path.exists() + assert not expected_mutations_path.exists() + assert expected_resistance_content == expected_resistance_path.read_text() + assert expected_coverage_map_content == expected_coverage_map_path.read_bytes() + assert expected_done_path.exists() + + +def test_folder_not_finished(raw_data_with_two_samples, mock_open_kive, default_config): + base_calls = (raw_data_with_two_samples / + "MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls") + mock_session = mock_open_kive.return_value + mock_pipeline = mock_session.get_pipeline.return_value + mock_input = Mock(dataset_name='quality_csv') + mock_pipeline.inputs = [mock_input] + resistance_run1 = Mock(name='resistance_run1', + **{'is_complete.return_value': True, + 'get_results.return_value': create_datasets(['resistance_csv'])}) + resistance_run2 = Mock(name='resistance_run2', + **{'is_complete.return_value': True, + 'get_results.return_value': create_datasets(['resistance_csv'])}) + kive_watcher = KiveWatcher(default_config) + + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2110A', + ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', + None))) + kive_watcher.add_sample_group( + base_calls=base_calls, + sample_group=SampleGroup('2120A', + ('2120A-PR_S14_L001_R1_001.fastq.gz', + None))) + # Did not call kive_watcher.finish_folder(), more samples could be coming. + folder_watcher, = kive_watcher.folder_watchers.values() + folder_watcher.filter_quality_run = Mock() + sample1_watcher, sample2_watcher = folder_watcher.sample_watchers + sample1_watcher.main_runs.append(Mock()) + sample1_watcher.resistance_run = resistance_run1 + sample2_watcher.main_runs.append(Mock()) + sample2_watcher.resistance_run = resistance_run2 + folder_watcher.active_runs = { + resistance_run1: (sample1_watcher, PipelineType.RESISTANCE), + resistance_run2: (sample2_watcher, PipelineType.RESISTANCE)} + results_path = base_calls / "../../../Results/version_0-dev" + scratch_path = results_path / "scratch" + expected_resistance_path = results_path / "resistance.csv" + + kive_watcher.poll_runs() + + assert scratch_path.exists() + assert not expected_resistance_path.exists() diff --git a/micall_kive.py b/micall_kive.py index c34af23f6..a11165a59 100644 --- a/micall_kive.py +++ b/micall_kive.py @@ -83,6 +83,7 @@ def load_sample(args): failed_align_csv=args.failed_align_csv, coverage_scores_csv=args.coverage_scores_csv, scratch_path=scratch_path) + sample.name = None # Since the file names are messy in Kive. return sample diff --git a/micall_watcher.py b/micall_watcher.py index 4f2182ab8..457a03f9d 100644 --- a/micall_watcher.py +++ b/micall_watcher.py @@ -76,8 +76,12 @@ def main(): args = parse_args() kive_watcher = KiveWatcher(args) sample_queue = Queue(maxsize=2) + wait = True finder_thread = Thread(target=find_samples, - args=(args.raw_data, sample_queue, False), + args=(args.raw_data, + args.pipeline_version, + sample_queue, + wait), daemon=True) finder_thread.start() while True: @@ -90,7 +94,8 @@ def main(): timeout=POLLING_DELAY) kive_watcher.add_sample_group(base_calls, sample_group) except Empty: - pass + # No more samples coming for now. + kive_watcher.finish_folder() if __name__ == '__main__':