Fix #270 by switching os.rename() to shutil.move().
Also add a separate script to download results from Kive.
donkirkby committed Dec 16, 2015
1 parent 91a691e commit 64ae651
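
For context (not part of the commit): os.rename() raises OSError (errno EXDEV, "Invalid cross-device link") when the source and destination sit on different filesystems, while shutil.move() falls back to copying the file and then deleting the original. A minimal sketch of the difference, with hypothetical paths:

import shutil

# Hypothetical paths: a local working folder and a results folder that may
# live on a different filesystem (e.g. a network share).
source = '/tmp/run_folder/untar/coverage_maps/coverage.png'
destination = '/data/Results/coverage_maps/Sample1.coverage.png'

# os.rename(source, destination) would fail with OSError(errno.EXDEV) when
# the two paths are on different devices; shutil.move() copies the file and
# removes the original in that case, so it works either way.
shutil.move(source, destination)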
Showing 4 changed files with 184 additions and 86 deletions.
99 changes: 18 additions & 81 deletions MISEQ_MONITOR.py
@@ -17,17 +17,15 @@
import shutil
import subprocess
import sys
import tarfile
import time
from xml.etree import ElementTree

from kiveapi import KiveAPI

from micall.core import miseq_logging
from micall.monitor import qai_helper
from micall.utils.sample_sheet_parser import sample_sheet_parser
import micall.settings as settings
from micall.monitor import update_qai
from micall.monitor.kive_download import download_results, kive_login


if sys.version_info[:2] != (2, 7):
@@ -42,7 +40,6 @@ def init_logging(log_file):
logger = miseq_logging.init_logging(log_file,
file_log_level=logging.INFO,
console_log_level=logging.INFO)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
except Exception as e:
raise Exception("Couldn't setup logging (init_logging() threw exception '{}') - HALTING NOW!".format(str(e)))
return logger
@@ -151,9 +148,9 @@ def download_quality(run_info_path, destination, read_lengths, index_lengths):

logger = init_logging(settings.home + '/MISEQ_MONITOR_OUTPUT.log')

KiveAPI.SERVER_URL = settings.kive_server_url
kive = KiveAPI(verify=False)
kive.login(settings.kive_user, settings.kive_password)
kive = kive_login(settings.kive_server_url,
settings.kive_user,
settings.kive_password)

# retrieve Pipeline object based on version
pipeline = kive.get_pipeline(settings.pipeline_version_kive_id)
@@ -358,79 +355,6 @@ def launch_runs(fastqs, quality_input, run_name):
return kive_runs


def download_results(kive_runs, root, run_folder):
""" Retrieve pipeline output files from Kive
:param kive_runs: a list of sample names and RunStatus objects, in tuples
[(sample_name, run_status)]
:param root: the path to the root folder of the run where the results
should be copied
:param run_folder: the local folder that will hold working files.
:return: a failure message if anything went wrong
"""
if not settings.production:
results_folder = os.path.join(run_folder, 'results')
else:
results_parent = os.path.join(root, 'Results')
if not os.path.exists(results_parent):
os.mkdir(results_parent)
results_folder = os.path.join(results_parent,
'version_' + settings.pipeline_version)
if not os.path.exists(results_folder):
os.mkdir(results_folder)
tar_path = os.path.join(run_folder, 'coverage_maps.tar')
untar_path = os.path.join(run_folder, 'untar')
coverage_source_path = os.path.join(untar_path, 'coverage_maps')
coverage_dest_path = os.path.join(results_folder, 'coverage_maps')
os.mkdir(untar_path)
os.mkdir(coverage_source_path)
os.mkdir(coverage_dest_path)

for i, (sample_name, kive_run) in enumerate(kive_runs):
outputs = kive_run.get_results()
output_names = ['remap_counts',
'conseq',
'conseq_ins',
'failed_read',
'nuc',
'amino',
'coord_ins',
'failed_align',
'nuc_variants',
'g2p',
'coverage_scores',
'coverage_maps_tar']
for output_name in output_names:
dataset = outputs.get(output_name, None)
if not dataset:
continue
if not output_name.endswith('_tar'):
filename = os.path.join(results_folder, output_name + '.csv')
with open(filename, 'a') as result_file:
for j, line in enumerate(dataset.readlines()):
if i == 0 and j == 0:
result_file.write('sample,' + line)
elif j != 0:
result_file.write(sample_name + ',' + line)
else:
with open(tar_path, 'wb') as f:
dataset.download(f)
with tarfile.open(tar_path) as tar:
tar.extractall(untar_path)
for image_filename in os.listdir(coverage_source_path):
source = os.path.join(coverage_source_path, image_filename)
destination = os.path.join(coverage_dest_path, sample_name + '.' + image_filename)
os.rename(source, destination)

os.rmdir(coverage_source_path)
os.rmdir(untar_path)
os.remove(tar_path)

if settings.production:
update_qai.process_folder(results_folder, logger)
mark_run_as_done(results_folder)


def main():
global logger
processed_runs = set()
@@ -501,7 +425,20 @@ def main():
continue

try:
download_results(kive_runs, root, run_folder)
if not settings.production:
results_folder = os.path.join(run_folder, 'results')
else:
results_parent = os.path.join(root, 'Results')
if not os.path.exists(results_parent):
os.mkdir(results_parent)
results_folder = os.path.join(results_parent,
'version_' + settings.pipeline_version)
if not os.path.exists(results_folder):
os.mkdir(results_folder)
download_results(kive_runs, results_folder, run_folder)
if settings.production:
update_qai.process_folder(results_folder, logger)
mark_run_as_done(results_folder)
logger.info("===== %s file transfer completed =====", run_name)
except:
failure_message = mark_run_as_disabled(
11 changes: 8 additions & 3 deletions micall/core/miseq_logging.py
@@ -27,9 +27,13 @@ def init_logging_console_only(log_level=logging.DEBUG):
logger.setLevel(logging.DEBUG)
console_logger = logging.StreamHandler(sys.stdout)
console_logger.setLevel(log_level)
formatter = Timestamp('%(asctime)s - [%(levelname)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S.%f")
formatter = Timestamp('%(asctime)s - [%(levelname)s](%(name)s) %(message)s', datefmt="%Y-%m-%d %H:%M:%S.%f")
console_logger.setFormatter(formatter)
logger.addHandler(console_logger)

# Quiet the urllib3 logger
connection_logger = logging.getLogger('urllib3.connectionpool')
connection_logger.setLevel(logging.ERROR)
return logger


@@ -54,15 +58,16 @@ def init_logging(logging_path, file_log_level=logging.DEBUG, console_log_level=l
console_logger.setLevel(console_log_level)

# Format the handlers
formatter = Timestamp('%(asctime)s - [%(levelname)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S.%f")
formatter = Timestamp('%(asctime)s - [%(levelname)s](%(name)s) %(message)s',
datefmt="%Y-%m-%d %H:%M:%S.%f")
console_logger.setFormatter(formatter)
file_logger.setFormatter(formatter)
logger.addHandler(console_logger)
logger.addHandler(file_logger)

# Quiet the urllib3 logger
connection_logger = logging.getLogger('urllib3.connectionpool')
connection_logger.setLevel(logging.WARN)
connection_logger.setLevel(logging.ERROR)

return logger
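
A side note on the logging changes above (not part of the diff): the new %(name)s field shows which logger emitted each record, and raising urllib3.connectionpool from WARN to ERROR hides the retry warnings that urllib3 logs when kive_download mounts HTTPAdapter(max_retries=20). A minimal sketch using the standard Formatter (the real code uses the custom Timestamp formatter, and the 'micall.example' name is hypothetical):

import logging
import sys

logger = logging.getLogger('micall.example')
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    '%(asctime)s - [%(levelname)s](%(name)s) %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# urllib3 logs each connection retry as a WARNING on 'urllib3.connectionpool';
# setting that logger to ERROR silences the retry noise while real failures
# still surface as exceptions.
logging.getLogger('urllib3.connectionpool').setLevel(logging.ERROR)

logger.info('starting')
# Prints something like:
# 2015-12-16 10:00:00,000 - [INFO](micall.example) starting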

157 changes: 157 additions & 0 deletions micall/monitor/kive_download.py
@@ -0,0 +1,157 @@
# To run this as a script, use python -m micall.monitor.kive_download

import os
import tarfile
from argparse import ArgumentParser

from kiveapi import KiveAPI
from requests.adapters import HTTPAdapter
from kiveapi.runstatus import RunStatus

from micall.settings import kive_server_url, kive_user, kive_password, home
from micall.core.miseq_logging import init_logging
import shutil


def kive_login(server_url, user, password):
kive = KiveAPI(server_url)
kive.mount('https://', HTTPAdapter(max_retries=20))
kive.login(user, password)
return kive


def download_results(kive_runs, results_folder, run_folder):
""" Retrieve pipeline output files from Kive
@param kive_runs: a list of sample names and RunStatus objects, in tuples
[(sample_name, run_status)]
@param results_folder: the path where the results should be copied
@param run_folder: the local folder that will hold working files.
"""
tar_path = os.path.join(run_folder, 'coverage_maps.tar')
untar_path = os.path.join(run_folder, 'untar')
coverage_source_path = os.path.join(untar_path, 'coverage_maps')
coverage_dest_path = os.path.join(results_folder, 'coverage_maps')
if not os.path.isdir(untar_path):
os.mkdir(untar_path)
if not os.path.isdir(coverage_source_path):
os.mkdir(coverage_source_path)
os.mkdir(coverage_dest_path)

for i, (sample_name, kive_run) in enumerate(kive_runs):
outputs = kive_run.get_results()
output_names = ['remap_counts',
'conseq',
'conseq_ins',
'failed_read',
'nuc',
'amino',
'coord_ins',
'failed_align',
'nuc_variants',
'g2p',
'coverage_scores',
'coverage_maps_tar']
for output_name in output_names:
dataset = outputs.get(output_name, None)
if not dataset:
continue
if not output_name.endswith('_tar'):
filename = os.path.join(results_folder, output_name + '.csv')
with open(filename, 'a') as result_file:
for j, line in enumerate(dataset.readlines()):
if i == 0 and j == 0:
result_file.write('sample,' + line)
elif j != 0:
result_file.write(sample_name + ',' + line)
else:
with open(tar_path, 'wb') as f:
dataset.download(f)
with tarfile.open(tar_path) as tar:
tar.extractall(untar_path)
for image_filename in os.listdir(coverage_source_path):
source = os.path.join(coverage_source_path, image_filename)
destination = os.path.join(coverage_dest_path, sample_name + '.' + image_filename)
shutil.move(source, destination)

os.rmdir(coverage_source_path)
os.rmdir(untar_path)
os.remove(tar_path)


def find_old_runs(kive, **kwargs):
params = {}
param_count = 0
for key, val in kwargs.iteritems():
if val is not None:
params['filters[{}][key]'.format(param_count)] = key
params['filters[{}][val]'.format(param_count)] = val
param_count += 1
response = kive.get('/api/runs/status/', params=params)
response.raise_for_status()
json = response.json()
runs = []
for entry in json:
status = RunStatus(entry, kive)
status.json = entry
sample_name = entry['display_name']
status_response = kive.get(entry['run_status'])
status_response.raise_for_status()
sample_filename = status_response.json()['inputs']['2']['dataset_name']
sample_name = '_'.join(sample_filename.split('_')[:2])
runs.append((sample_name, status))
runs.sort()
return runs


def main():
logger = init_logging(os.path.join(home, 'kive_download.log'))
parser = ArgumentParser(description='Download runs from Kive.')
parser.add_argument('--startafter',
'-a',
help='Old runs start after "DD Mon YYYY HH:MM".')
parser.add_argument('--startbefore',
'-b',
help='Old runs start before "DD Mon YYYY HH:MM".')
parser.add_argument('--workfolder',
'-w',
help='Work folder to download temporary files to.')
parser.add_argument('--resultfolder',
'-r',
help='Result folder to copy result files to.')

args = parser.parse_args()
logger.info('Starting.')
kive = kive_login(kive_server_url,
kive_user,
kive_password)
runs = find_old_runs(kive,
startafter=args.startafter,
startbefore=args.startbefore)
unfinished_count = 0
for sample_name, run in runs:
progress = run.json.get('run_progress')
if progress:
start_time = progress['start']
end_time = progress['end']
else:
start_time = end_time = None
if end_time is None:
unfinished_count += 1
print(run.json['display_name'])
print(' ' + sample_name)
print(' {} - {}'.format(start_time, end_time))
if args.workfolder or args.resultfolder:
if not args.workfolder:
parser.error('argument --workfolder is required with --resultfolder')
if not args.resultfolder:
parser.error('argument --resultfolder is required with --workfolder')
if not os.path.isdir(args.workfolder):
os.makedirs(args.workfolder)
if not os.path.isdir(args.resultfolder):
os.makedirs(args.resultfolder)
download_results(runs, args.resultfolder, args.workfolder)
logger.info('%d runs found (%d unfinished).', len(runs), unfinished_count)

if __name__ == '__main__':
main()
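
A possible invocation of the new script, based on the arguments defined above (dates and paths are placeholders; the server URL and credentials come from micall.settings). Note that --workfolder and --resultfolder must be given together:

# List runs started in a date window without downloading anything:
python -m micall.monitor.kive_download -a "01 Dec 2015 00:00" -b "16 Dec 2015 00:00"

# Also download and collate their results:
python -m micall.monitor.kive_download -a "01 Dec 2015 00:00" -b "16 Dec 2015 00:00" --workfolder /tmp/kive_work --resultfolder /tmp/kive_results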
3 changes: 1 addition & 2 deletions micall/monitor/update_qai.py
File mode changed from 100755 to 100644.
@@ -1,7 +1,6 @@
#!/usr/bin/env python

# Script to update QAI with information from the
# collated_conseqs.csv files produced by the MiSeq pipeline.
# To execute as a script, run python -m micall.monitor.update_qai

import csv
from datetime import datetime
