Skip to content

Commit

Permalink
bgzf/multithreaded_reader: Recycle buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Aug 28, 2023
1 parent 974c523 commit 1c802ea
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 34 deletions.
78 changes: 49 additions & 29 deletions noodles-bgzf/src/multithreaded_reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
io::{self, BufRead, Read},
mem,
num::NonZeroUsize,
thread::{self, JoinHandle},
};
Expand All @@ -8,12 +9,20 @@ use crossbeam_channel::{Receiver, Sender};

use crate::Block;

type BufferedTx = Sender<io::Result<Block>>;
type BufferedRx = Receiver<io::Result<Block>>;
type InflateTx = Sender<(Vec<u8>, BufferedTx)>;
type InflateRx = Receiver<(Vec<u8>, BufferedTx)>;
type BufferedTx = Sender<io::Result<Buffer>>;
type BufferedRx = Receiver<io::Result<Buffer>>;
type InflateTx = Sender<(Buffer, BufferedTx)>;
type InflateRx = Receiver<(Buffer, BufferedTx)>;
type ReadTx = Sender<BufferedRx>;
type ReadRx = Receiver<BufferedRx>;
type RecycleTx = Sender<Buffer>;
type RecycleRx = Receiver<Buffer>;

#[derive(Debug, Default)]
struct Buffer {
buf: Vec<u8>,
block: Block,
}

/// A multithreaded BGZF reader.
///
Expand All @@ -25,8 +34,9 @@ pub struct MultithreadedReader {
reader_handle: Option<JoinHandle<io::Result<()>>>,
inflater_handles: Vec<JoinHandle<()>>,
read_rx: ReadRx,
recycle_tx: Option<RecycleTx>,
position: u64,
block: Block,
buffer: Buffer,
}

impl MultithreadedReader {
Expand All @@ -37,21 +47,29 @@ impl MultithreadedReader {
{
let (inflate_tx, inflate_rx) = crossbeam_channel::bounded(worker_count.get());
let (read_tx, read_rx) = crossbeam_channel::bounded(worker_count.get());
let (recycle_tx, recycle_rx) = crossbeam_channel::bounded(worker_count.get());

let reader_handle = spawn_reader(inner, inflate_tx, read_tx);
for _ in 0..worker_count.get() {
recycle_tx.send(Buffer::default()).unwrap();
}

let reader_handle = spawn_reader(inner, inflate_tx, read_tx, recycle_rx);
let inflater_handles = spawn_inflaters(worker_count, inflate_rx);

Self {
reader_handle: Some(reader_handle),
inflater_handles,
read_rx,
recycle_tx: Some(recycle_tx),
position: 0,
block: Block::default(),
buffer: Buffer::default(),
}
}

/// Shuts down the reader and inflate workers.
pub fn finish(&mut self) -> io::Result<()> {
self.recycle_tx.take();

for handle in self.inflater_handles.drain(..) {
handle.join().unwrap();
}
Expand All @@ -63,23 +81,25 @@ impl MultithreadedReader {
Ok(())
}

fn next_block(&mut self) -> io::Result<Option<Block>> {
fn recv_buffer(&mut self) -> io::Result<Option<Buffer>> {
if let Ok(buffered_rx) = self.read_rx.recv() {
if let Ok(block) = buffered_rx.recv() {
return block.map(Some);
if let Ok(buffer) = buffered_rx.recv() {
return buffer.map(Some);
}
}

Ok(None)
}

fn read_block(&mut self) -> io::Result<()> {
while let Some(mut block) = self.next_block()? {
block.set_position(self.position);
self.position += block.size();
self.block = block;
while let Some(mut buffer) = self.recv_buffer()? {
buffer.block.set_position(self.position);
self.position += buffer.block.size();

if self.block.data().len() > 0 {
let prev_buffer = mem::replace(&mut self.buffer, buffer);
self.recycle_tx.as_ref().unwrap().send(prev_buffer).ok();

if self.buffer.block.data().len() > 0 {
break;
}
}
Expand All @@ -105,55 +125,55 @@ impl Read for MultithreadedReader {

impl BufRead for MultithreadedReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if !self.block.data().has_remaining() {
if !self.buffer.block.data().has_remaining() {
self.read_block()?;
}

Ok(self.block.data().as_ref())
Ok(self.buffer.block.data().as_ref())
}

fn consume(&mut self, amt: usize) {
self.block.data_mut().consume(amt);
self.buffer.block.data_mut().consume(amt);
}
}

fn spawn_reader<R>(
mut reader: R,
inflate_tx: InflateTx,
read_tx: ReadTx,
recycle_rx: RecycleRx,
) -> JoinHandle<io::Result<()>>
where
R: Read + Send + 'static,
{
use super::reader::block::read_frame;
use super::reader::block::read_frame_into;

thread::spawn(move || {
while let Some(buf) = read_frame(&mut reader)? {
let (buffered_tx, buffered_rx) = crossbeam_channel::bounded(1);

if inflate_tx.send((buf, buffered_tx)).is_err() {
while let Ok(mut buffer) = recycle_rx.recv() {
if read_frame_into(&mut reader, &mut buffer.buf)?.is_none() {
break;
}

if read_tx.send(buffered_rx).is_err() {
break;
}
let (buffered_tx, buffered_rx) = crossbeam_channel::bounded(1);

inflate_tx.send((buffer, buffered_tx)).unwrap();
read_tx.send(buffered_rx).unwrap();
}

Ok(())
})
}

fn spawn_inflaters(worker_count: NonZeroUsize, inflate_rx: InflateRx) -> Vec<JoinHandle<()>> {
use super::reader::block::parse_frame;
use super::reader::block::parse_frame_into;

(0..worker_count.get())
.map(|_| {
let inflate_rx = inflate_rx.clone();

thread::spawn(move || {
while let Ok((src, buffered_tx)) = inflate_rx.recv() {
let result = parse_frame(&src);
while let Ok((mut buffer, buffered_tx)) = inflate_rx.recv() {
let result = parse_frame_into(&buffer.buf, &mut buffer.block).map(|_| buffer);
buffered_tx.send(result).unwrap();
}
})
Expand Down
14 changes: 9 additions & 5 deletions noodles-bgzf/src/reader/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
}
}

fn read_frame_into<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>>
pub(crate) fn read_frame_into<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>>
where
R: Read,
{
Expand Down Expand Up @@ -160,14 +160,18 @@ where
Ok((crc32, r#isize))
}

pub(crate) fn parse_frame(src: &[u8]) -> io::Result<Block> {
pub fn parse_frame(src: &[u8]) -> io::Result<Block> {
let mut block = Block::default();
parse_frame_into(src, &mut block)?;
Ok(block)
}

pub(crate) fn parse_frame_into(src: &[u8], block: &mut Block) -> io::Result<()> {
let (header, cdata, trailer) = split_frame(src);

parse_header(header)?;
let (crc32, r#isize) = parse_trailer(trailer)?;

let mut block = Block::default();

let block_size =
u64::try_from(src.len()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
block.set_size(block_size);
Expand All @@ -178,7 +182,7 @@ pub(crate) fn parse_frame(src: &[u8]) -> io::Result<Block> {

inflate(cdata, crc32, data.as_mut())?;

Ok(block)
Ok(())
}

fn inflate(src: &[u8], crc32: u32, dst: &mut [u8]) -> io::Result<()> {
Expand Down

0 comments on commit 1c802ea

Please sign in to comment.