Skip to content

Commit

Permalink
Merge pull request #28 from NobodyXu/fix/splice_impl
Browse files Browse the repository at this point in the history
Fix `splice_impl` and `tee_impl`: Fix readiness and make returned fut `Send`
  • Loading branch information
yskszk63 authored Jun 26, 2022
2 parents ec7414e + 54c92a4 commit 73ffe44
Showing 1 changed file with 123 additions and 22 deletions.
145 changes: 123 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,56 @@ unsafe fn set_nonblocking_checked(fd: RawFd, status_flags: libc::c_int) -> Resul
Ok(())
}

/// Return whether (reader is ready, writer is ready).
///
/// Readiness for reader/writer does not just mean readable/writable,
/// they are also considered as ready if they aqre disconnected or an
/// exceptional condition has occured (`libc::POLLERR`).
#[cfg(target_os = "linux")]
unsafe fn test_read_write_readiness(reader: RawFd, writer: RawFd) -> io::Result<(bool, bool)> {
use libc::{poll, pollfd, POLLERR, POLLHUP, POLLIN, POLLNVAL, POLLOUT};

let mut fds = [
pollfd {
fd: reader,
events: POLLIN,
revents: 0,
},
pollfd {
fd: writer,
events: POLLOUT,
revents: 0,
},
];

// Specify timeout to 0 so that it returns immediately.
try_libc!(poll(&mut fds[0], 2, 0));

let is_read_ready = match fds[0].revents {
POLLERR | POLLHUP | POLLIN => true,
POLLNVAL => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"fd of reader is invalid",
))
}
_ => false,
};

let is_writer_ready = match fds[1].revents {
POLLERR | POLLHUP | POLLOUT => true,
POLLNVAL => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"fd of writer is invalid",
))
}
_ => false,
};

Ok((is_read_ready, is_writer_ready))
}

fn check_pipe(fd: RawFd) -> Result<(), io::Error> {
let mut stat = mem::MaybeUninit::<libc::stat>::uninit();

Expand Down Expand Up @@ -206,17 +256,43 @@ impl<'a, 'b> AtomicWriteIoSlices<'a, 'b> {

#[cfg(target_os = "linux")]
async fn tee_impl(pipe_in: &PipeRead, pipe_out: &PipeWrite, len: usize) -> io::Result<usize> {
let fd_in = pipe_in.0.as_raw_fd();
let fd_out = pipe_out.0.as_raw_fd();

// There is only one reader and one writer, so it only needs to polled once.
let _read_ready = pipe_in.0.readable().await?;
let _write_ready = pipe_out.0.writable().await?;
let mut read_ready = pipe_in.0.readable().await?;
let mut write_ready = pipe_out.0.writable().await?;

loop {
let ret = unsafe { libc::tee(fd_in, fd_out, len, libc::SPLICE_F_NONBLOCK) };
let ret = unsafe {
libc::tee(
pipe_in.as_raw_fd(),
pipe_out.as_raw_fd(),
len,
libc::SPLICE_F_NONBLOCK,
)
};
match cvt!(ret) {
Err(e) if is_wouldblock(&e) => (),
Err(e) if is_wouldblock(&e) => {
// Since tokio might use epoll's edge-triggered mode, we cannot blindly
// clear the readiness, otherwise it would block forever.
//
// So what we do instead is to use test_read_write_readiness, which
// uses poll to test for readiness.
//
// Poll always uses level-triggered mode and it does not require
// any registration at all.
let (read_readiness, write_readiness) = unsafe {
test_read_write_readiness(pipe_in.as_raw_fd(), pipe_out.as_raw_fd())?
};

if !read_readiness {
read_ready.clear_ready();
read_ready = pipe_in.0.readable().await?;
}

if !write_readiness {
write_ready.clear_ready();
write_ready = pipe_out.0.writable().await?;
}
}
Err(e) => break Err(e),
Ok(ret) => break Ok(ret as usize),
}
Expand Down Expand Up @@ -246,34 +322,59 @@ fn as_ptr<T>(option: Option<&mut T>) -> *mut T {

#[cfg(target_os = "linux")]
async fn splice_impl(
asyncfd_in: &mut AsyncFd<impl AsRawFd>,
off_in: Option<&mut off64_t>,
asyncfd_out: &AsyncFd<impl AsRawFd>,
off_out: Option<&mut off64_t>,
fd_in: &mut AsyncFd<impl AsRawFd>,
mut off_in: Option<&mut off64_t>,
fd_out: &AsyncFd<impl AsRawFd>,
mut off_out: Option<&mut off64_t>,
len: usize,
has_more_data: bool,
) -> io::Result<usize> {
let fd_in = asyncfd_in.as_raw_fd();
let fd_out = asyncfd_out.as_raw_fd();

let off_in = as_ptr(off_in);
let off_out = as_ptr(off_out);
// There is only one reader and one writer, so it only needs to polled once.
let mut read_ready = fd_in.readable().await?;
let mut write_ready = fd_out.writable().await?;

// Prepare args for the syscall
let flags = libc::SPLICE_F_NONBLOCK
| if has_more_data {
libc::SPLICE_F_MORE
} else {
0
};

// There is only one reader and one writer, so it only needs to polled once.
let _read_ready = asyncfd_in.readable().await?;
let _write_ready = asyncfd_out.writable().await?;

loop {
let ret = unsafe { libc::splice(fd_in, off_in, fd_out, off_out, len, flags) };
let ret = unsafe {
libc::splice(
fd_in.as_raw_fd(),
as_ptr(off_in.as_deref_mut()),
fd_out.as_raw_fd(),
as_ptr(off_out.as_deref_mut()),
len,
flags,
)
};
match cvt!(ret) {
Err(e) if is_wouldblock(&e) => (),
Err(e) if is_wouldblock(&e) => {
// Since tokio might use epoll's edge-triggered mode, we cannot blindly
// clear the readiness, otherwise it would block forever.
//
// So what we do instead is to use test_read_write_readiness, which
// uses poll to test for readiness.
//
// Poll always uses level-triggered mode and it does not require
// any registration at all.
let (read_readiness, write_readiness) =
unsafe { test_read_write_readiness(fd_in.as_raw_fd(), fd_out.as_raw_fd())? };

if !read_readiness {
read_ready.clear_ready();
read_ready = fd_in.readable().await?;
}

if !write_readiness {
write_ready.clear_ready();
write_ready = fd_out.writable().await?;
}
}
Err(e) => break Err(e),
Ok(ret) => break Ok(ret as usize),
}
Expand Down

0 comments on commit 73ffe44

Please sign in to comment.