Skip to content

Commit

Permalink
Set max_active based on avail mem to close #585.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Jul 27, 2020
1 parent 5bdaecb commit 5a9dee8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 64 deletions.
46 changes: 0 additions & 46 deletions docs/_includes/header.html

This file was deleted.

53 changes: 35 additions & 18 deletions micall_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from glob import glob
import json
import logging
import multiprocessing
from operator import attrgetter
import os
import shutil
Expand Down Expand Up @@ -300,7 +299,7 @@ def check_run_sample_ids(self, run_id_lst, sample_id_lst):
"""


def get_parser():
def get_parser(default_max_active):
# noinspection PyTypeChecker
parser = ArgumentParser(
description=MAIN_DESCRIPTION,
Expand All @@ -309,10 +308,10 @@ def get_parser():
title="Sub-commands (i.e. modes of operation)",
)

commands = [add_folder_parser(subparsers),
commands = [add_folder_parser(subparsers, default_max_active),
add_sample_parser(subparsers),
add_hcv_sample_parser(subparsers),
add_basespace_parser(subparsers)]
add_hcv_sample_parser(subparsers, default_max_active),
add_basespace_parser(subparsers, default_max_active)]
for command_parser in commands:
command_parser.add_argument(
"--all_projects",
Expand All @@ -337,7 +336,17 @@ def get_parser():
return parser


def add_basespace_parser(subparsers):
def get_available_memory():
with open('/proc/meminfo') as meminfo:
for line in meminfo:
label, value, *units = line.split()
if label == 'MemAvailable:':
assert units == ['kB'], units
return int(value) * 1024
raise ValueError('MemAvailable not found in /proc/meminfo.')


def add_basespace_parser(subparsers, default_max_active):
# ####
# BaseSpace mode.
# ####
Expand All @@ -351,14 +360,14 @@ def add_basespace_parser(subparsers):
"--max_active",
"-m",
type=int,
help="Maximum number of samples to process at once, "
"if not the number of CPUs.",
default=default_max_active,
help="Maximum number of samples to process at once.",
)
basespace_parser.set_defaults(func=basespace_run)
return basespace_parser


def add_folder_parser(subparsers):
def add_folder_parser(subparsers, default_max_max_active):
# ####
# The "process a full directory" subcommand.
# ####
Expand Down Expand Up @@ -387,8 +396,8 @@ def add_folder_parser(subparsers):
"--max_active",
"-m",
type=int,
help="Maximum number of samples to process at once, "
"if not the number of CPU's.",
default=default_max_max_active,
help="Maximum number of samples to process at once.",
)
folder_parser.add_argument(
"--fastq1s",
Expand Down Expand Up @@ -453,11 +462,12 @@ def add_sample_parser(subparsers):
"--run_folder.",
default="micall-results",
)
single_sample_parser.set_defaults(func=single_sample)
single_sample_parser.set_defaults(func=single_sample,
max_active=1)
return single_sample_parser


def add_hcv_sample_parser(subparsers):
def add_hcv_sample_parser(subparsers, default_max_active):
# ####
# The "process a single HCV sample" subcommand.
# ####
Expand Down Expand Up @@ -517,7 +527,8 @@ def add_hcv_sample_parser(subparsers):
"--run_folder.",
default="micall-results",
)
hcv_sample_parser.set_defaults(func=hcv_sample)
hcv_sample_parser.set_defaults(func=hcv_sample,
max_active=min(default_max_active, 2))
return hcv_sample_parser


Expand Down Expand Up @@ -576,7 +587,6 @@ def process_run(run_info, args):


def single_sample(args):
args.max_active = 1
resolved_args = MiCallArgs(args)
scratch_path = os.path.join(args.results_folder, "scratch")
makedirs(scratch_path)
Expand All @@ -598,7 +608,6 @@ def single_sample(args):


def hcv_sample(args):
args.max_active = 2
resolved_args = MiCallArgs(args)
midi_args = MiCallArgs(args, map_midi=True)
scratch_path = os.path.join(args.results_folder, "scratch")
Expand Down Expand Up @@ -1102,16 +1111,24 @@ def collate_samples(run_info):

# noinspection PyTypeChecker,PyUnresolvedReferences
def main():
parser = get_parser()
available_memory = get_available_memory()
recommended_memory = 1 << 30 # 1GB
default_max_active = max(1, available_memory // recommended_memory)
cpu_count = len(os.sched_getaffinity(0))
default_max_active = min(default_max_active, cpu_count)

parser = get_parser(default_max_active)
args = parser.parse_args()
if not hasattr(args, "func"):
# No valid subcommand was given; print the help message and exit.
parser.print_help()
return
args.skip = args.skip or ()
if available_memory < recommended_memory:
logger.warning("Available memory is less than 1GB, processing may fail.")
logger.info("Starting on %s with %d CPU's.",
socket.gethostname(),
multiprocessing.cpu_count())
cpu_count)
args.func(args)


Expand Down

0 comments on commit 5a9dee8

Please sign in to comment.