Skip to content

Commit

Permalink
Use mappy to do affine-gap alignment
Browse files Browse the repository at this point in the history
  • Loading branch information
rrwick committed Jun 17, 2022
1 parent de62102 commit b053f3a
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions scripts/polypolish_human_readable.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import datetime
import edlib
import gzip
import mappy
import os
import re
import pathlib
Expand All @@ -32,7 +33,7 @@ def main():
check_inputs(args)
start_time = starting_message(args)
before, after = load_assemblies(args.before, args.after)
align_sequences(before, after, args.padding, args.merge)
align_sequences(before, after, args.padding, args.merge, args.aligner)
finished_message(start_time)


Expand All @@ -53,6 +54,8 @@ def parse_args():
help='Bases of additional sequence to show before/after each change')
setting_args.add_argument('--merge', type=int, default=30,
help='Changes this close are merged together in the output')
setting_args.add_argument('--aligner', type=str, choices=['mappy', 'edlib'], default='mappy',
help='Aligner library: mappy has affine-gap, edlib is more robust')

help_args = parser.add_argument_group('Help')
help_args.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS,
Expand Down Expand Up @@ -124,14 +127,14 @@ def load_assemblies(before_filename, after_filename):
return before, after


def align_sequences(before, after, padding, merge):
def align_sequences(before, after, padding, merge, aligner):
section_header('Aligning sequences')
longest_label = get_longest_label(before, after)
for b, a in zip(before, after):
before_name, before_seq = b
after_name, after_seq = a
output_differences(before_name, before_seq, after_name, after_seq, padding, merge,
longest_label)
longest_label, aligner)


def get_longest_label(before, after):
Expand All @@ -143,10 +146,10 @@ def get_longest_label(before, after):


def output_differences(before_name, before_seq, after_name, after_seq, padding, merge,
longest_label):
longest_label, aligner):
log(f'Aligning {before_name} to {after_name}:')
before_aligned, after_aligned, differences, before_pos, after_pos, diff_pos = \
get_aligned_seqs(before_seq, after_seq)
get_aligned_seqs(before_seq, after_seq, aligner)
log(f' {len(diff_pos):,} differences')

aligned_len = len(before_aligned)
Expand Down Expand Up @@ -194,17 +197,26 @@ def make_diff_ranges(diff_pos, padding, merge, aligned_len):
return diff_ranges


def get_aligned_seqs(before_seq, after_seq):
result = edlib.align(before_seq, after_seq, mode='NW', task='path')
cigar = result['cigar']
def get_aligned_seqs(before_seq, after_seq, aligner):
cigar = get_cigar(before_seq, after_seq, aligner)
expanded_cigar = get_expanded_cigar(cigar)
i, j = 0, 0
before_aligned, after_aligned, differences = [], [], []
before_positions, after_positions, diff_positions = [], [], []
for c in expanded_cigar:
before_positions.append(i)
after_positions.append(j)
if c == '=':
if c == 'M':
b_1 = before_seq[i]
b_2 = after_seq[j]
if b_1 == b_2:
diff = ' '
else:
diff = '*'
diff_positions.append(len(differences))
i += 1
j += 1
elif c == '=':
b_1 = before_seq[i]
b_2 = after_seq[j]
diff = ' '
Expand Down Expand Up @@ -249,9 +261,33 @@ def get_aligned_seqs(before_seq, after_seq):
before_positions, after_positions, diff_positions


def get_cigar(before_seq, after_seq, aligner):
if aligner == 'mappy':
return get_cigar_with_mappy(before_seq, after_seq)
elif aligner == 'edlib':
return get_cigar_with_edlib(before_seq, after_seq)
else:
assert False


def get_cigar_with_mappy(before_seq, after_seq):
a = mappy.Aligner(seq=after_seq, preset='asm20')
for result in a.map(before_seq):
full_length_query = (result.q_st == 0 and result.q_en == len(before_seq))
full_length_ref = (result.r_st == 0 and result.r_en == len(after_seq))
if full_length_query and full_length_ref:
return result.cigar_str
quit_with_error('Error: mappy alignment failed, try using --aligner edlib')


def get_cigar_with_edlib(before_seq, after_seq):
result = edlib.align(before_seq, after_seq, mode='NW', task='path')
return result['cigar']


def get_expanded_cigar(cigar):
expanded_cigar = []
cigar_parts = re.findall(r'\d+[IDX=]', cigar)
cigar_parts = re.findall(r'\d+[IDX=M]', cigar)
for p in cigar_parts:
size = int(p[:-1])
letter = p[-1]
Expand Down

0 comments on commit b053f3a

Please sign in to comment.