Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix splice_impl and tee_impl: Fix readiness and make returned fut Send #28

Merged
merged 13 commits into from
Jun 26, 2022
Merged
145 changes: 123 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,56 @@ unsafe fn set_nonblocking_checked(fd: RawFd, status_flags: libc::c_int) -> Resul
Ok(())
}

/// Return whether (reader is ready, writer is ready).
///
/// Readiness for reader/writer does not just mean readable/writable,
/// they are also considered as ready if they aqre disconnected or an
/// exceptional condition has occured (`libc::POLLERR`).
#[cfg(target_os = "linux")]
unsafe fn test_read_write_readiness(reader: RawFd, writer: RawFd) -> io::Result<(bool, bool)> {
use libc::{poll, pollfd, POLLERR, POLLHUP, POLLIN, POLLNVAL, POLLOUT};

let mut fds = [
pollfd {
fd: reader,
events: POLLIN,
revents: 0,
},
pollfd {
fd: writer,
events: POLLOUT,
revents: 0,
},
];

// Specify timeout to 0 so that it returns immediately.
try_libc!(poll(&mut fds[0], 2, 0));

let is_read_ready = match fds[0].revents {
POLLERR | POLLHUP | POLLIN => true,
POLLNVAL => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"fd of reader is invalid",
))
}
_ => false,
};

let is_writer_ready = match fds[1].revents {
POLLERR | POLLHUP | POLLOUT => true,
POLLNVAL => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"fd of writer is invalid",
))
}
_ => false,
};

Ok((is_read_ready, is_writer_ready))
}

fn check_pipe(fd: RawFd) -> Result<(), io::Error> {
let mut stat = mem::MaybeUninit::<libc::stat>::uninit();

Expand Down Expand Up @@ -206,17 +256,43 @@ impl<'a, 'b> AtomicWriteIoSlices<'a, 'b> {

#[cfg(target_os = "linux")]
async fn tee_impl(pipe_in: &PipeRead, pipe_out: &PipeWrite, len: usize) -> io::Result<usize> {
let fd_in = pipe_in.0.as_raw_fd();
let fd_out = pipe_out.0.as_raw_fd();

// There is only one reader and one writer, so it only needs to polled once.
let _read_ready = pipe_in.0.readable().await?;
let _write_ready = pipe_out.0.writable().await?;
let mut read_ready = pipe_in.0.readable().await?;
let mut write_ready = pipe_out.0.writable().await?;

loop {
let ret = unsafe { libc::tee(fd_in, fd_out, len, libc::SPLICE_F_NONBLOCK) };
let ret = unsafe {
libc::tee(
pipe_in.as_raw_fd(),
pipe_out.as_raw_fd(),
len,
libc::SPLICE_F_NONBLOCK,
)
};
match cvt!(ret) {
Err(e) if is_wouldblock(&e) => (),
Err(e) if is_wouldblock(&e) => {
// Since tokio might use epoll's edge-triggered mode, we cannot blindly
// clear the readiness, otherwise it would block forever.
//
// So what we do instead is to use test_read_write_readiness, which
// uses poll to test for readiness.
//
// Poll always uses level-triggered mode and it does not require
// any registration at all.
let (read_readiness, write_readiness) = unsafe {
test_read_write_readiness(pipe_in.as_raw_fd(), pipe_out.as_raw_fd())?
};

if !read_readiness {
read_ready.clear_ready();
read_ready = pipe_in.0.readable().await?;
}

if !write_readiness {
write_ready.clear_ready();
write_ready = pipe_out.0.writable().await?;
}
}
Err(e) => break Err(e),
Ok(ret) => break Ok(ret as usize),
}
Expand Down Expand Up @@ -246,34 +322,59 @@ fn as_ptr<T>(option: Option<&mut T>) -> *mut T {

#[cfg(target_os = "linux")]
async fn splice_impl(
asyncfd_in: &mut AsyncFd<impl AsRawFd>,
off_in: Option<&mut off64_t>,
asyncfd_out: &AsyncFd<impl AsRawFd>,
off_out: Option<&mut off64_t>,
fd_in: &mut AsyncFd<impl AsRawFd>,
mut off_in: Option<&mut off64_t>,
fd_out: &AsyncFd<impl AsRawFd>,
mut off_out: Option<&mut off64_t>,
len: usize,
has_more_data: bool,
) -> io::Result<usize> {
let fd_in = asyncfd_in.as_raw_fd();
let fd_out = asyncfd_out.as_raw_fd();

let off_in = as_ptr(off_in);
let off_out = as_ptr(off_out);
// There is only one reader and one writer, so it only needs to polled once.
let mut read_ready = fd_in.readable().await?;
let mut write_ready = fd_out.writable().await?;

// Prepare args for the syscall
let flags = libc::SPLICE_F_NONBLOCK
| if has_more_data {
libc::SPLICE_F_MORE
} else {
0
};

// There is only one reader and one writer, so it only needs to polled once.
let _read_ready = asyncfd_in.readable().await?;
let _write_ready = asyncfd_out.writable().await?;

loop {
let ret = unsafe { libc::splice(fd_in, off_in, fd_out, off_out, len, flags) };
let ret = unsafe {
libc::splice(
fd_in.as_raw_fd(),
as_ptr(off_in.as_deref_mut()),
fd_out.as_raw_fd(),
as_ptr(off_out.as_deref_mut()),
len,
flags,
)
};
match cvt!(ret) {
Err(e) if is_wouldblock(&e) => (),
Err(e) if is_wouldblock(&e) => {
// Since tokio might use epoll's edge-triggered mode, we cannot blindly
// clear the readiness, otherwise it would block forever.
//
// So what we do instead is to use test_read_write_readiness, which
// uses poll to test for readiness.
//
// Poll always uses level-triggered mode and it does not require
// any registration at all.
let (read_readiness, write_readiness) =
unsafe { test_read_write_readiness(fd_in.as_raw_fd(), fd_out.as_raw_fd())? };

if !read_readiness {
read_ready.clear_ready();
read_ready = fd_in.readable().await?;
}

if !write_readiness {
write_ready.clear_ready();
write_ready = fd_out.writable().await?;
}
}
Err(e) => break Err(e),
Ok(ret) => break Ok(ret as usize),
}
Expand Down