Skip to content

Commit

Permalink
Merge pull request #168 from ksahlin/threads
Browse files Browse the repository at this point in the history
Output mapped reads in the order that they had in the input file and decrease default chunk size
  • Loading branch information
marcelm committed Dec 2, 2022
2 parents b113236 + b6575b9 commit 8d3ef7e
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 146 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Bug fixes

* Issue #121: When more than one thread is used, changed behavior so that
alignments are output in the order that the reads had in the input file.
* PR #114: Fix read-length estimate on input files with few reads.
* PR #78: Fixed incorrect template length (TLEN) sign.
* Issue #56: Unmapped reads did not have quality values in the SAM output.
Expand Down
2 changes: 1 addition & 1 deletion src/aln.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ static inline void rescue_mate(
sam_aln.ref_start = ref_start + info.ref_offset +1; // +1 because SAM is 1-based!
sam_aln.is_rc = a_is_rc;
sam_aln.ref_id = n.ref_id;
sam_aln.is_unaligned = false;
sam_aln.is_unaligned = info.cigar == "*";
sam_aln.aln_length = info.length;
tot_ksw_aligned ++;
tot_rescued ++;
Expand Down
5 changes: 5 additions & 0 deletions src/cmdline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ std::pair<CommandLineOptions, mapping_params> parse_command_line_arguments(int a

args::HelpFlag help(parser, "help", "Print help and exit", {'h', "help"});
args::ActionFlag version(parser, "version", "Print version and exit", {"version"}, []() { throw Version(); });

// Threading
args::ValueFlag<int> threads(parser, "INT", "Number of threads [3]", {'t', "threads"});
args::ValueFlag<int> chunk_size(parser, "INT", "Number of reads processed by a worker thread at once [10000]", {"chunk-size"}, args::Options::Hidden);

args::Group io(parser, "Input/output:");
args::ValueFlag<std::string> o(parser, "PATH", "redirect output to file [stdout]", {'o'});
Expand Down Expand Up @@ -84,7 +87,9 @@ std::pair<CommandLineOptions, mapping_params> parse_command_line_arguments(int a
CommandLineOptions opt;
mapping_params map_param;

// Threading
if (threads) { opt.n_threads = args::get(threads); }
if (chunk_size) { opt.chunk_size = args::get(chunk_size); }

// Input/output
if (o) { opt.output_file_name = args::get(o); opt.write_to_stdout = false; }
Expand Down
1 change: 1 addition & 0 deletions src/cmdline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

struct CommandLineOptions {
int n_threads { 3 };
int chunk_size { 10000 };

// Input/output
std::string output_file_name;
Expand Down
15 changes: 6 additions & 9 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,20 @@ int run_strobealign(int argc, char **argv) {
out << sam_header(references, opt.read_group_id, opt.read_group_fields);
}

std::unordered_map<std::thread::id, AlignmentStatistics> log_stats_vec(opt.n_threads);
std::vector<AlignmentStatistics> log_stats_vec(opt.n_threads);

logger.info() << "Running in " << (opt.is_SE ? "single-end" : "paired-end") << " mode" << std::endl;

if (opt.is_SE) {
auto ks = open_fastq(opt.reads_filename1);

int input_chunk_size = 100000;
InputBuffer input_buffer(ks, ks, input_chunk_size);
InputBuffer input_buffer(ks, ks, opt.chunk_size);
OutputBuffer output_buffer(out);

std::vector<std::thread> workers;
for (int i = 0; i < opt.n_threads; ++i) {
std::thread consumer(perform_task_SE, std::ref(input_buffer), std::ref(output_buffer),
std::ref(log_stats_vec), std::ref(aln_params),
std::ref(log_stats_vec[i]), std::ref(aln_params),
std::ref(map_param), std::ref(index_parameters), std::ref(references),
std::ref(index), std::ref(opt.read_group_id));
workers.push_back(std::move(consumer));
Expand All @@ -290,16 +289,14 @@ int run_strobealign(int argc, char **argv) {
else {
auto ks1 = open_fastq(opt.reads_filename1);
auto ks2 = open_fastq(opt.reads_filename2);
std::unordered_map<std::thread::id, i_dist_est> isize_est_vec(opt.n_threads);

int input_chunk_size = 100000;
InputBuffer input_buffer(ks1, ks2, input_chunk_size);
InputBuffer input_buffer(ks1, ks2, opt.chunk_size);
OutputBuffer output_buffer(out);

std::vector<std::thread> workers;
for (int i = 0; i < opt.n_threads; ++i) {
std::thread consumer(perform_task_PE, std::ref(input_buffer), std::ref(output_buffer),
std::ref(log_stats_vec), std::ref(isize_est_vec), std::ref(aln_params),
std::ref(log_stats_vec[i]), std::ref(aln_params),
std::ref(map_param), std::ref(index_parameters), std::ref(references),
std::ref(index), std::ref(opt.read_group_id));
workers.push_back(std::move(consumer));
Expand All @@ -313,7 +310,7 @@ int run_strobealign(int argc, char **argv) {

AlignmentStatistics tot_statistics;
for (auto& it : log_stats_vec) {
tot_statistics += it.second;
tot_statistics += it;
}

logger.info() << "Total mapping sites tried: " << tot_statistics.tot_all_tried << std::endl
Expand Down
175 changes: 59 additions & 116 deletions src/pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// Using initial base format of Buffer classed from: https://andrew128.github.io/ProducerConsumer/

#include "pc.hpp"
#include <thread>
#include <mutex>
#include <iostream>
#include <chrono>
#include <queue>
Expand All @@ -16,98 +16,65 @@
#include "kseq++.hpp"
#include "sam.hpp"

using namespace klibpp;


void InputBuffer::read_records_PE(std::vector<KSeq> &records1, std::vector<KSeq> &records2, AlignmentStatistics &statistics) {
size_t InputBuffer::read_records_PE(std::vector<klibpp::KSeq> &records1, std::vector<klibpp::KSeq> &records2, AlignmentStatistics &statistics) {
Timer timer;
// Acquire a unique lock on the mutex
std::unique_lock<std::mutex> unique_lock(mtx);
records1 = ks1.read(chunk_size);
records2 = ks2.read(chunk_size);
size_t current_chunk_index = chunk_index;
chunk_index++;

if (records1.empty()){
if (records1.empty()) {
finished_reading = true;
}

// Unlock unique lock
unique_lock.unlock();
// Notify a single thread that buffer isn't empty
not_empty.notify_one();
statistics.tot_read_file += timer.duration();

return current_chunk_index;
}

void InputBuffer::read_records_SE(std::vector<KSeq> &records1, AlignmentStatistics &statistics) {
size_t InputBuffer::read_records_SE(std::vector<klibpp::KSeq> &records1, AlignmentStatistics &statistics) {
Timer timer;
// Acquire a unique lock on the mutex
std::unique_lock<std::mutex> unique_lock(mtx);
records1 = ks1.read(chunk_size);
size_t current_chunk_index = chunk_index;
chunk_index++;

if (records1.empty()){
finished_reading = true;
}

// Unlock unique lock
unique_lock.unlock();
// Notify a single thread that buffer isn't empty
not_empty.notify_one();
statistics.tot_read_file += timer.duration();
}

void OutputBuffer::output_records(std::string &sam_alignments) {
// Acquire a unique lock on the mutex
std::unique_lock<std::mutex> unique_lock(mtx);
out << sam_alignments;
// Update appropriate fields
buffer_size = 0;
// Unlock unique lock
unique_lock.unlock();
not_full.notify_one();
return current_chunk_index;
}

void OutputBuffer::output_records(std::string chunk, size_t chunk_index) {
std::unique_lock<std::mutex> unique_lock(mtx);

inline bool align_reads_PE(
InputBuffer &input_buffer,
OutputBuffer &output_buffer,
std::vector<KSeq> &records1,
std::vector<KSeq> &records2,
AlignmentStatistics &statistics,
i_dist_est &isize_est,
const alignment_params &aln_params,
const mapping_params &map_param,
const IndexParameters& index_parameters,
const References& references,
const StrobemerIndex& index,
const std::string& read_group_id
) {
// If no more reads to align
if (records1.empty() && input_buffer.finished_reading){
return true;
}

std::string sam_out;
sam_out.reserve(7*map_param.r *records1.size());
Sam sam{sam_out, references, read_group_id};
for (size_t i = 0; i < records1.size(); ++i) {
auto record1 = records1[i];
auto record2 = records2[i];

align_PE_read(record1, record2, sam, sam_out, statistics, isize_est, aln_params,
map_param, index_parameters, references, index);
// Ensure we print the chunks in the order in which they were read
assert(chunks.count(chunk_index) == 0);
chunks.emplace(std::make_pair(chunk_index, chunk));
while (true) {
const auto& item = chunks.find(next_chunk_index);
if (item == chunks.end()) {
break;
}
out << item->second;
next_chunk_index++;
}
// std::cerr << isize_est_vec[thread_id].mu << " " << isize_est_vec[thread_id].sigma << "\n";
// std::cerr << log_stats_vec[thread_id].tot_all_tried << " " << log_stats_vec[thread_id].tot_ksw_aligned << "\n";
// IMMEDIATELY PRINT TO STDOUT/FILE HERE
output_buffer.output_records(sam_out);
return false;
unique_lock.unlock();
}


void perform_task_PE(
InputBuffer &input_buffer,
OutputBuffer &output_buffer,
std::unordered_map<std::thread::id, AlignmentStatistics> &log_stats_vec,
std::unordered_map<std::thread::id, i_dist_est> &isize_est_vec,
AlignmentStatistics& statistics,
const alignment_params &aln_params,
const mapping_params &map_param,
const IndexParameters& index_parameters,
Expand All @@ -116,62 +83,35 @@ void perform_task_PE(
const std::string& read_group_id
) {
bool eof = false;
while (true){
std::vector<KSeq> records1;
std::vector<KSeq> records2;
auto thread_id = std::this_thread::get_id();
if (log_stats_vec.find(thread_id) == log_stats_vec.end()) { // Not initialized
AlignmentStatistics statistics;
log_stats_vec[thread_id] = statistics;
}
input_buffer.read_records_PE(records1, records2, log_stats_vec[thread_id]);
eof = align_reads_PE(input_buffer, output_buffer, records1, records2,
log_stats_vec[thread_id], isize_est_vec[thread_id],
aln_params, map_param, index_parameters, references, index, read_group_id);

if (eof) {
while (!eof) {
std::vector<klibpp::KSeq> records1;
std::vector<klibpp::KSeq> records2;
auto chunk_index = input_buffer.read_records_PE(records1, records2, statistics);
i_dist_est isize_est;
if (records1.empty() && input_buffer.finished_reading){
break;
}
}
}

std::string sam_out;
sam_out.reserve(7*map_param.r *records1.size());
Sam sam{sam_out, references, read_group_id};
for (size_t i = 0; i < records1.size(); ++i) {
auto record1 = records1[i];
auto record2 = records2[i];

inline bool align_reads_SE(
InputBuffer &input_buffer,
OutputBuffer &output_buffer,
std::vector<KSeq> &records,
AlignmentStatistics &statistics,
const alignment_params &aln_params,
const mapping_params &map_param,
const IndexParameters& index_parameters,
const References& references,
const StrobemerIndex& index,
const std::string& read_group_id
) {
// If no more reads to align
if (records.empty() && input_buffer.finished_reading){
return true;
}

std::string sam_out;
sam_out.reserve(7*map_param.r *records.size());
Sam sam{sam_out, references, read_group_id};
for (size_t i = 0; i < records.size(); ++i) {
auto record1 = records[i];

align_SE_read(record1, sam, sam_out, statistics, aln_params, map_param, index_parameters, references, index);
align_PE_read(record1, record2, sam, sam_out, statistics, isize_est, aln_params,
map_param, index_parameters, references, index);
}
output_buffer.output_records(std::move(sam_out), chunk_index);
assert(sam_out == "");
}
// std::cerr << isize_est_vec[thread_id].mu << " " << isize_est_vec[thread_id].sigma << "\n";
// std::cerr << log_stats_vec[thread_id].tot_all_tried << " " << log_stats_vec[thread_id].tot_ksw_aligned << "\n";
// IMMEDIATELY PRINT TO STDOUT/FILE HERE
output_buffer.output_records(sam_out);
return false;
}


void perform_task_SE(
InputBuffer &input_buffer,
OutputBuffer &output_buffer,
std::unordered_map<std::thread::id, AlignmentStatistics> &log_stats_vec,
AlignmentStatistics& statistics,
const alignment_params &aln_params,
const mapping_params &map_param,
const IndexParameters& index_parameters,
Expand All @@ -180,20 +120,23 @@ void perform_task_SE(
const std::string& read_group_id
) {
bool eof = false;
while (true){
std::vector<KSeq> records1;
auto thread_id = std::this_thread::get_id();
if (log_stats_vec.find(thread_id) == log_stats_vec.end()) { // Not initialized
AlignmentStatistics statistics;
log_stats_vec[thread_id] = statistics;
}
input_buffer.read_records_SE(records1, log_stats_vec[thread_id]);
eof = align_reads_SE(input_buffer, output_buffer, records1,
log_stats_vec[thread_id],
aln_params, map_param, index_parameters, references, index, read_group_id);
while (!eof){
std::vector<klibpp::KSeq> records;
auto chunk_index = input_buffer.read_records_SE(records, statistics);

if (eof){
if (records.empty() && input_buffer.finished_reading){
break;
}

std::string sam_out;
sam_out.reserve(7*map_param.r *records.size());
Sam sam{sam_out, references, read_group_id};
for (size_t i = 0; i < records.size(); ++i) {
auto record = records[i];

align_SE_read(record, sam, sam_out, statistics, aln_params, map_param, index_parameters, references, index);
}
output_buffer.output_records(std::move(sam_out), chunk_index);
assert(sam_out == "");
}
}
Loading

0 comments on commit 8d3ef7e

Please sign in to comment.