Commit 3bc4e0f

Collate result files, as part of #438.

donkirkby committed Apr 5, 2018
1 parent e49b861 commit 3bc4e0f
Showing 5 changed files with 284 additions and 20 deletions.
115 changes: 106 additions & 9 deletions micall/monitor/kive_watcher.py
@@ -1,9 +1,13 @@
import hashlib
import logging
import os
import shutil
import tarfile
from datetime import datetime, timedelta
from queue import Full

from io import StringIO, BytesIO
from time import sleep

from micall.drivers.run_info import parse_read_sizes
from micall.monitor import error_metrics_parser
@@ -25,6 +29,7 @@

logger = logging.getLogger(__name__)
FOLDER_SCAN_INTERVAL = timedelta(hours=1)
SLEEP_SECONDS = 60
DOWNLOADED_RESULTS = ['remap_counts_csv',
                      'conseq_csv',
                      'conseq_ins_csv',
@@ -63,46 +68,67 @@ def now():
    return datetime.now()


-def find_samples(raw_data_folder, sample_queue, wait=True):
+def find_samples(raw_data_folder, pipeline_version, sample_queue, wait=True):
    while True:
-        is_complete = scan_samples(raw_data_folder, sample_queue)
+        is_complete = scan_samples(raw_data_folder,
+                                   pipeline_version,
+                                   sample_queue,
+                                   wait)
        if is_complete and not wait:
            break


-def scan_samples(raw_data_folder, sample_queue):
+def scan_samples(raw_data_folder, pipeline_version, sample_queue, wait):
    next_scan = now() + FOLDER_SCAN_INTERVAL
    flag_paths = sorted(
        raw_data_folder.glob("MiSeq/runs/*/needsprocessing"),
        reverse=True)
    is_found = False
    for flag_path in flag_paths:
        run_path = flag_path.parent
        done_path = (run_path /
                     f"Results/version_{pipeline_version}/doneprocessing")
        if done_path.exists():
            continue
        base_calls_path = run_path / "Data/Intensities/BaseCalls"
        fastq_files = base_calls_path.glob("*_R1_*.fastq.gz")
        sample_sheet_path = run_path / "SampleSheet.csv"
        file_names = [f.name for f in fastq_files]
        for sample_group in find_groups(file_names, sample_sheet_path):
            is_found = True
            is_sent = False
            while not is_sent and now() < next_scan:
                try:
-                    sample_queue.put((base_calls_path, sample_group), timeout=60)
-                    logger.debug('Put %s, %s in queue.', base_calls_path, sample_group)
+                    sample_queue.put((base_calls_path, sample_group),
+                                     timeout=SLEEP_SECONDS)
+                    logger.debug('Put %s, %s in queue.',
+                                 base_calls_path,
+                                 sample_group)
                    is_sent = True
                except Full:
                    pass
            if now() >= next_scan:
                return False
    if not is_found:
        logger.info('No folders need processing.')
    while wait and now() < next_scan:
        sleep(SLEEP_SECONDS)
    return True
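
As a rough usage sketch (not part of the commit; the folder path, version string, queue size, and threading driver are invented), find_samples is shaped to run in a background thread that feeds a bounded queue:

from pathlib import Path
from queue import Queue
from threading import Thread

sample_queue = Queue(maxsize=2)  # a small queue applies back-pressure to the scanner
finder = Thread(target=find_samples,
                args=(Path('/data/raw'), '7.9', sample_queue),
                daemon=True)
finder.start()
base_calls_path, sample_group = sample_queue.get()  # blocks until a sample group is found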


def trim_name(sample_name):
    return '_'.join(sample_name.split('_')[:2])


def get_output_filename(output_name):
    return '.'.join(output_name.rsplit('_', 1))
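
The two small helpers behave like this (the sample name below is an invented Illumina-style example):

assert trim_name('2130A-HCV_S15_L001_R1_001') == '2130A-HCV_S15'
assert get_output_filename('remap_counts_csv') == 'remap_counts.csv'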


class KiveWatcher:
    def __init__(self, config=None):
        self.config = config
        self.session = None
        self.current_folder = None
        self.folder_watchers = {}  # {base_calls_folder: FolderWatcher}
        self.pipelines = {}  # {pipeline_id: pipeline}
        self.input_pipeline_ids = config and dict(
@@ -204,12 +230,15 @@ def upload_kive_dataset(self, source_file, dataset_name, cdt, description):

    def add_sample_group(self, base_calls, sample_group):
        self.check_session()
        self.current_folder = base_calls
        folder_watcher = self.folder_watchers.get(base_calls)
        if folder_watcher is None:
            folder_watcher = FolderWatcher(base_calls, self)
            self.folder_watchers[base_calls] = folder_watcher
            self.create_batch(folder_watcher)
            self.upload_filter_quality(folder_watcher)
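            # Clear out any results left over from a previous processing
            # attempt before this folder's new results are written.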
            shutil.rmtree(self.get_results_path(folder_watcher),
                          ignore_errors=True)

        sample_watcher = SampleWatcher(sample_group)
        for fastq1 in filter(None, sample_group.names):
@@ -224,9 +253,73 @@ def add_sample_group(self, base_calls, sample_group):
            sample_watcher.fastq_datasets.append(fastq_dataset)
        folder_watcher.sample_watchers.append(sample_watcher)

    def finish_folder(self):
        self.current_folder = None

    def poll_runs(self):
-        for folder_watcher in self.folder_watchers.values():
+        completed_folders = []
+        for folder, folder_watcher in self.folder_watchers.items():
            folder_watcher.poll_runs()
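            # The folder still receiving samples is skipped; it is only
            # collated after finish_folder() resets current_folder.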
            if folder != self.current_folder and folder_watcher.is_complete:
                completed_folders.append(folder)
        for folder in completed_folders:
            folder_watcher = self.folder_watchers.pop(folder)
            self.collate_folder(folder_watcher)
        if not self.folder_watchers:
            logger.info('No more folders to process.')

    def collate_folder(self, folder_watcher):
        results_path = self.get_results_path(folder_watcher)
        logger.info('Collating results in %s', results_path)
        scratch_path = results_path / "scratch"
        for output_name in DOWNLOADED_RESULTS:
            if output_name == 'coverage_maps_tar':
                self.extract_coverage_maps(folder_watcher)
                continue
            source_count = 0
            filename = get_output_filename(output_name)
            target_path = results_path / filename
            with target_path.open('w') as target:
                for sample_name in folder_watcher.all_samples:
                    sample_name = trim_name(sample_name)
                    source_path = scratch_path / sample_name / filename
                    try:
                        with source_path.open() as source:
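                            # Keep the header row from the first file only,
                            # then prefix every data row with its sample name.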
                            for i, line in enumerate(source):
                                if i != 0:
                                    prefix = sample_name
                                elif source_count == 0:
                                    prefix = 'sample'
                                else:
                                    continue
                                target.write(prefix + ',' + line)
                                source_count += 1
                    except FileNotFoundError:
                        # Skip the file.
                        pass
            if not source_count:
                target_path.unlink()
        shutil.rmtree(scratch_path)
        (results_path / "doneprocessing").touch()
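
For illustration (made-up data): if two samples' scratch copies of remap_counts.csv both start with a "type,count" header, the collated run-level file would look like this:

sample,type,count
2130A-HCV_S15,prelim map,1000
2130AMIDI-MidHCV_S16,prelim map,2500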

    def extract_coverage_maps(self, folder_watcher):
        results_path = self.get_results_path(folder_watcher)
        coverage_path = results_path / "coverage_maps"
        coverage_path.mkdir()
        scratch_path = results_path / "scratch"
        for sample_name in folder_watcher.all_samples:
            sample_name = trim_name(sample_name)
            source_path = scratch_path / sample_name / 'coverage_maps.tar'
            try:
                with tarfile.open(source_path) as f:
                    for source_info in f:
                        filename = os.path.basename(source_info.name)
                        target_path = coverage_path / (sample_name + '.' + filename)
                        with f.extractfile(source_info) as source, \
                                open(target_path, 'wb') as target:
                            shutil.copyfileobj(source, target)
            except FileNotFoundError:
                pass
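
As an invented example of the naming scheme this produces: a tar member coverage/R1_coverage.png in sample 2130A-HCV_S15's coverage_maps.tar would be written to coverage_maps/2130A-HCV_S15.R1_coverage.png under the version's Results folder.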

    def run_pipeline(self, folder_watcher, sample_watcher, pipeline_type):
        if pipeline_type == PipelineType.FILTER_QUALITY:
@@ -305,21 +398,25 @@ def fetch_run_status(self, run, folder_watcher, sample_watcher, pipeline_type):
                       if pipeline_type in (PipelineType.MIDI,
                                            PipelineType.MIXED_HCV_MIDI)
                       else sample_watcher.sample_group.names[0])
-        version_name = f'version_{self.config.pipeline_version}'
-        results_path = folder_watcher.run_folder / "Results" / version_name
+        results_path = self.get_results_path(folder_watcher)
        scratch_path = results_path / "scratch" / trim_name(sample_name)
        scratch_path.mkdir(parents=True, exist_ok=True)
        results = run.get_results()
        for output_name in DOWNLOADED_RESULTS:
            dataset = results.get(output_name)
            if dataset is None:
                continue
-            filename = '.'.join(output_name.rsplit('_', 1))
+            filename = get_output_filename(output_name)
            with (scratch_path / filename).open('wb') as f:
                dataset.download(f)

        return is_complete

    def get_results_path(self, folder_watcher):
        version_name = f'version_{self.config.pipeline_version}'
        results_path = folder_watcher.run_folder / "Results" / version_name
        return results_path

    def upload_filter_quality(self, folder_watcher):
        read_sizes = parse_read_sizes(folder_watcher.run_folder / "RunInfo.xml")
        read_lengths = [read_sizes.read1,
14 changes: 9 additions & 5 deletions micall/monitor/sample_watcher.py
@@ -43,13 +43,17 @@ def __init__(self,
    def __repr__(self):
        return f'FolderWatcher({str(self.base_calls_folder)!r})'

    @property
    def all_samples(self):
        for sample_watcher in self.sample_watchers:
            for name in sample_watcher.sample_group.names:
                if name is not None:
                    yield name

    @property
    def active_samples(self):
-        started_samples = {name
-                           for sample_watcher in self.sample_watchers
-                           for name in sample_watcher.sample_group.names
-                           if name is not None}
-        return started_samples - self.completed_samples
+        all_samples = set(self.all_samples)
+        return all_samples - self.completed_samples

    @property
    def is_complete(self):
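A plain-data sketch of the refactoring above (sample names invented): active_samples is now simply all_samples minus the completed set.

names = ['2130A-HCV_S15_L001_R1_001', None, '2130AMIDI-MidHCV_S16_L001_R1_001']
all_samples = [name for name in names if name is not None]  # what the new property yields
completed_samples = {'2130A-HCV_S15_L001_R1_001'}
active_samples = set(all_samples) - completed_samples
# {'2130AMIDI-MidHCV_S16_L001_R1_001'}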
