
Commit

Retry when server is down, and skip failed runs, as part of #438.
donkirkby committed Apr 12, 2018
1 parent b6360b6 commit a0803c7
Showing 2 changed files with 265 additions and 58 deletions.
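
The retry half of the change wraps each Kive interaction in an endless attempt loop that backs off exponentially between failures, using the new wait_for_retry() and calculate_retry_wait() helpers shown in the diff below. A minimal, self-contained sketch of that pattern (retry_forever and do_work are placeholder names, not part of the commit):

from datetime import timedelta
from itertools import count
from time import sleep

MINIMUM_RETRY_WAIT = timedelta(seconds=5)
MAXIMUM_RETRY_WAIT = timedelta(days=1)


def retry_forever(do_work):
    """Call do_work() until it succeeds, waiting longer after each failure."""
    for attempt_count in count(1):
        try:
            return do_work()
        except Exception:
            # Same backoff as calculate_retry_wait() in the diff:
            # 5 s, 10 s, 20 s, ... capped at one day.
            seconds = min(
                MINIMUM_RETRY_WAIT.total_seconds() * 2 ** (attempt_count - 1),
                MAXIMUM_RETRY_WAIT.total_seconds())
            sleep(seconds)
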
140 changes: 83 additions & 57 deletions micall/monitor/kive_watcher.py
@@ -6,28 +6,26 @@
from collections import namedtuple
from datetime import datetime, timedelta
from enum import Enum
from itertools import count
from pathlib import Path
from queue import Full

from io import StringIO, BytesIO
from time import sleep

from requests.adapters import HTTPAdapter
from kiveapi import KiveAPI, KiveClientException

from micall.drivers.run_info import parse_read_sizes
from micall.monitor import error_metrics_parser
from micall.monitor.sample_watcher import FolderWatcher, ALLOWED_GROUPS, SampleWatcher, PipelineType
from micall.resistance.resistance import find_groups

try:
    from requests.adapters import HTTPAdapter
except ImportError:
    # Ignore import errors during testing.
    HTTPAdapter = None

logger = logging.getLogger(__name__)
FOLDER_SCAN_INTERVAL = timedelta(hours=1)
SLEEP_SECONDS = 60
MINIMUM_RETRY_WAIT = timedelta(seconds=5)
MAXIMUM_RETRY_WAIT = timedelta(days=1)
DOWNLOADED_RESULTS = ['remap_counts_csv',
                      'conseq_csv',
                      'conseq_ins_csv',
@@ -53,10 +51,6 @@


def open_kive(server_url):
    if KiveAPI is None:
        raise ImportError('Kive API failed to import. Is it installed?')
    if HTTPAdapter is None:
        raise ImportError('requests module failed to import. Is it installed?')
    session = KiveAPI(server_url)
    session.mount('https://', HTTPAdapter(max_retries=20))
    return session
@@ -88,6 +82,9 @@ def scan_samples(raw_data_folder, pipeline_version, sample_queue, wait):
    is_found = False
    for flag_path in flag_paths:
        run_path = flag_path.parent
        error_path = run_path / "errorprocessing"
        if error_path.exists():
            continue
        done_path = (run_path /
                     f"Results/version_{pipeline_version}/doneprocessing")
        if done_path.exists():
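
The hunk above is the "skip failed runs" half of the change: any run folder that already has an errorprocessing flag file is left out of the scan (add_sample_group gets a matching check further down). A small standalone illustration of that filter, using a throwaway copy of the MiSeq/runs/<run>/needsprocessing layout (assumed from the flag files in the diff) with made-up run names:

from pathlib import Path
from tempfile import TemporaryDirectory

with TemporaryDirectory() as raw_data:
    for run_name, failed in (('180101_M01234_0001', False),
                             ('180102_M01234_0002', True)):
        run_path = Path(raw_data, 'MiSeq', 'runs', run_name)
        run_path.mkdir(parents=True)
        (run_path / 'needsprocessing').touch()
        if failed:
            (run_path / 'errorprocessing').touch()

    flag_paths = sorted(Path(raw_data).glob('MiSeq/runs/*/needsprocessing'))
    runnable = [flag_path.parent.name
                for flag_path in flag_paths
                if not (flag_path.parent / 'errorprocessing').exists()]
    print(runnable)  # ['180101_M01234_0001'] -- the failed run is skipped
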
@@ -139,6 +136,21 @@ def get_output_filename(output_name):
    return '.'.join(output_name.rsplit('_', 1))


def wait_for_retry(attempt_count):
    delay = calculate_retry_wait(MINIMUM_RETRY_WAIT,
                                 MAXIMUM_RETRY_WAIT,
                                 attempt_count)
    logger.error('Waiting %s before retrying.', delay, exc_info=True)
    sleep(delay.total_seconds())


def calculate_retry_wait(min_wait, max_wait, attempt_count):
    min_seconds = int(min_wait.total_seconds())
    seconds = min_seconds * (2 ** (attempt_count - 1))
    seconds = min(seconds, max_wait.total_seconds())
    return timedelta(seconds=seconds)
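
As a quick check of the backoff math, calculate_retry_wait() doubles the minimum wait for each further attempt and clamps the result at the maximum, so the delays run 5 s, 10 s, 20 s, ... up to one day. A short usage example (assuming MiCall and its dependencies are installed so the module imports cleanly):

from datetime import timedelta

from micall.monitor.kive_watcher import calculate_retry_wait

MIN_WAIT = timedelta(seconds=5)
MAX_WAIT = timedelta(days=1)

for attempt in (1, 2, 3, 10, 20):
    print(attempt, calculate_retry_wait(MIN_WAIT, MAX_WAIT, attempt))
# 1 0:00:05
# 2 0:00:10
# 3 0:00:20
# 10 0:42:40
# 20 1 day, 0:00:00
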


class KiveWatcher:
    def __init__(self,
                 config=None,
@@ -265,39 +277,47 @@ def add_sample_group(self, base_calls, sample_group):
:return: SampleWatcher for the sample group, or None if that folder has
already finished processing
"""
self.check_session()
folder_watcher = self.folder_watchers.get(base_calls)
if folder_watcher is None:
folder_watcher = self.add_folder(base_calls)

# Check if folder has finished since it was scanned.
done_path = self.get_results_path(folder_watcher) / "doneprocessing"
if done_path.exists():
del self.folder_watchers[base_calls]
return None

self.create_batch(folder_watcher)
self.upload_filter_quality(folder_watcher)
shutil.rmtree(self.get_results_path(folder_watcher),
ignore_errors=True)

for sample_watcher in folder_watcher.sample_watchers:
if sample_watcher.sample_group == sample_group:
for attempt_count in count(1):
# noinspection PyBroadException
try:
self.check_session()
folder_watcher = self.folder_watchers.get(base_calls)
if folder_watcher is None:
folder_watcher = FolderWatcher(base_calls, self)

# Check if folder has finished since it was scanned.
results_path = self.get_results_path(folder_watcher)
done_path = results_path / "doneprocessing"
if done_path.exists():
return None
error_path = folder_watcher.run_folder / "errorprocessing"
if error_path.exists():
return None

self.create_batch(folder_watcher)
self.upload_filter_quality(folder_watcher)
shutil.rmtree(results_path, ignore_errors=True)
self.folder_watchers[base_calls] = folder_watcher

for sample_watcher in folder_watcher.sample_watchers:
if sample_watcher.sample_group == sample_group:
return sample_watcher

sample_watcher = SampleWatcher(sample_group)
for fastq1 in filter(None, sample_group.names):
fastq2 = fastq1.replace('_R1_', '_R2_')
for fastq_name, direction in ((fastq1, 'forward'), (fastq2, 'reverse')):
with (base_calls / fastq_name).open('rb') as fastq_file:
fastq_dataset = self.find_or_upload_dataset(
fastq_file,
fastq_name,
direction + ' read from MiSeq run ' + folder_watcher.run_name,
compounddatatype=None)
sample_watcher.fastq_datasets.append(fastq_dataset)
folder_watcher.sample_watchers.append(sample_watcher)
return sample_watcher

sample_watcher = SampleWatcher(sample_group)
for fastq1 in filter(None, sample_group.names):
fastq2 = fastq1.replace('_R1_', '_R2_')
for fastq_name, direction in ((fastq1, 'forward'), (fastq2, 'reverse')):
with (base_calls / fastq_name).open('rb') as fastq_file:
fastq_dataset = self.find_or_upload_dataset(
fastq_file,
fastq_name,
direction + ' read from MiSeq run ' + folder_watcher.run_name,
compounddatatype=None)
sample_watcher.fastq_datasets.append(fastq_dataset)
folder_watcher.sample_watchers.append(sample_watcher)
return sample_watcher
except Exception:
wait_for_retry(attempt_count)

    def add_folder(self, base_calls):
        folder_watcher = FolderWatcher(base_calls, self)
@@ -312,21 +332,27 @@ def finish_folder(self, base_calls):
        self.loaded_folders.add(base_calls)

    def poll_runs(self):
        completed_folders = []
        for folder, folder_watcher in self.folder_watchers.items():
            folder_watcher.poll_runs()
            if folder in self.loaded_folders and folder_watcher.is_complete:
                completed_folders.append(folder)
        for folder in completed_folders:
            folder_watcher = self.folder_watchers.pop(folder)
            results_path = self.collate_folder(folder_watcher)
            if results_path is None:
                continue
            if (results_path / "coverage_scores.csv").exists():
                self.result_handler(results_path)
            (results_path / "doneprocessing").touch()
        if not self.folder_watchers:
            logger.info('No more folders to process.')
        for attempt_count in count(1):
            # noinspection PyBroadException
            try:
                completed_folders = []
                for folder, folder_watcher in self.folder_watchers.items():
                    folder_watcher.poll_runs()
                    if folder in self.loaded_folders and folder_watcher.is_complete:
                        completed_folders.append(folder)
                for folder in completed_folders:
                    folder_watcher = self.folder_watchers.pop(folder)
                    results_path = self.collate_folder(folder_watcher)
                    if results_path is None:
                        continue
                    if (results_path / "coverage_scores.csv").exists():
                        self.result_handler(results_path)
                    (results_path / "doneprocessing").touch()
                if not self.folder_watchers:
                    logger.info('No more folders to process.')
                return
            except Exception:
                wait_for_retry(attempt_count)

    def collate_folder(self, folder_watcher):
        """ Collate scratch files for a run folder.
