From 90050458545cc14e3e27b7dac220c153c07c693e Mon Sep 17 00:00:00 2001 From: don Date: Wed, 12 Jul 2017 11:40:41 -0700 Subject: [PATCH] Stop sorting sam2aln to reduce memory usage for #393. --- micall/core/sam2aln.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/micall/core/sam2aln.py b/micall/core/sam2aln.py index 7f9c267ec..380e8c501 100755 --- a/micall/core/sam2aln.py +++ b/micall/core/sam2aln.py @@ -14,6 +14,8 @@ import re from multiprocessing.pool import Pool +from micall.utils.big_counter import BigCounter + SAM2ALN_Q_CUTOFFS = [15] # Q-cutoff for base censoring MAX_PROP_N = 0.5 # Drop reads with more censored bases than this proportion SAM_FLAG_IS_FIRST_SEGMENT = 0x40 @@ -434,6 +436,19 @@ def parse_sam_in_threads(remap_csv, nthreads, pair_processor): pool.join() +class CounterFactory: + def __init__(self, aligned_file): + self.aligned_file_name = getattr(aligned_file, 'name', None) + + def create_counter(self): + if self.aligned_file_name is None: + return Counter() + dirname = os.path.dirname(self.aligned_file_name) + file_prefix = os.path.join(os.path.abspath(dirname), + 'merged_seq_counts') + return BigCounter(file_prefix=file_prefix) + + def sam2aln(remap_csv, aligned_csv, insert_csv=None, @@ -468,7 +483,8 @@ def sam2aln(remap_csv, clipping_writer.writeheader() pair_processor = PairProcessor(is_clipping=clipping_csv is not None) - empty_region = defaultdict(Counter) + counter_factory = CounterFactory(aligned_csv) + empty_region = defaultdict(counter_factory.create_counter) aligned = defaultdict(empty_region.copy) # {rname: {qcut: {mseq: count}}} if nthreads: regions = parse_sam_in_threads(remap_csv, nthreads, pair_processor) @@ -477,7 +493,8 @@ def sam2aln(remap_csv, regions = map(pair_processor.process, matchmaker(remap_csv)) for rname, mseqs, insert_list, failed_list in regions: - region = aligned[rname] + # noinspection PyTypeChecker + region = aligned[str(rname)] # str() works around a PyCharm bug. for qcut, mseq in mseqs.items(): # collect identical merged sequences @@ -500,17 +517,14 @@ def sam2aln(remap_csv, aligned_writer = DictWriter(aligned_csv, aligned_fields, lineterminator=os.linesep) aligned_writer.writeheader() for rname in sorted(aligned.keys()): - data = aligned[rname] - for qcut, data2 in data.items(): - # sort variants by count - intermed = [(count, len_gap_prefix(s), s) for s, count in data2.items()] - intermed.sort(reverse=True) - for rank, (count, offset, seq) in enumerate(intermed): + region = aligned[rname] + for qcut, mseq_counter in region.items(): + for rank, (seq, count) in enumerate(mseq_counter.items()): aligned_writer.writerow(dict(refname=rname, qcut=qcut, rank=rank, count=count, - offset=offset, + offset=len_gap_prefix(seq), seq=seq.strip('-')))