Splice between a pipe and a socket #26

Closed
pkolaczk opened this issue Jun 20, 2022 · 43 comments · Fixed by #28

Comments

@pkolaczk

I can see you can zero-copy data between pipes.
Is there a way to move data from a socket to a pipe and from a pipe to a socket using splice/tee?

@NobodyXu
Contributor

According to the man page of splice(2):

It transfers up to len bytes of data from the file descriptor fd_in to the file descriptor fd_out, where one of the file descriptors must refer to a pipe.

You can use splice to move data from a pipe into a socket, or vice versa.
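For context, a single splice call at the syscall level looks roughly like this (a sketch using the libc crate; blocking, with minimal error handling):

use std::io;
use std::os::unix::io::RawFd;

// Move up to `len` bytes from a socket into the write end of a pipe.
// One side of every splice call must be a pipe.
fn splice_socket_to_pipe(sock: RawFd, pipe_w: RawFd, len: usize) -> io::Result<usize> {
    let n = unsafe {
        libc::splice(
            sock,
            std::ptr::null_mut(), // offsets must be NULL for sockets and pipes
            pipe_w,
            std::ptr::null_mut(),
            len,
            libc::SPLICE_F_MOVE,
        )
    };
    if n < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(n as usize)
    }
}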

@NobodyXu
Contributor

There are two functions for this: PipeWrite::splice_from and PipeRead::splice_to.

@pkolaczk
Author

Ok, but how do I make them work with the OwnedReadHalf and OwnedWriteHalf of a TcpStream?
Looks like the stream halves don't implement AsRawFd :(

@NobodyXu
Contributor

Check out impl AsRef<TcpStream> for OwnedReadHalf and impl AsRef<TcpStream> for OwnedWriteHalf, which let you get a &TcpStream from OwnedReadHalf and OwnedWriteHalf respectively.

Then, you can just use impl AsRawFd for TcpStream.

@pkolaczk
Author

Ok, makes sense. Thank you!

@pkolaczk pkolaczk reopened this Jun 20, 2022
@pkolaczk
Author

Tried that, but I'm hitting a problem:

async fn forward(src: OwnedReadHalf, dest: OwnedWriteHalf) -> io::Result<u64> {
    use tokio_pipe::{pipe, PipeRead, PipeWrite};
    let (mut pipe_read, mut pipe_write) = pipe()?;
    let mut in_fd = src.as_ref().as_raw_fd();
    let mut out_fd = dest.as_ref().as_raw_fd();
    let mut total_count = 0;
    loop {
        let count = pipe_write.splice_from(&mut AsyncFd::new(in_fd)?, None, 4096).await?;
        if count == 0 {
            break;
        }
        pipe_read.splice_to(&mut AsyncFd::new(out_fd)?, None, count, true).await?;
    }
    Ok(total_count)
}

error: generator cannot be sent between threads safely
   --> src/main.rs:187:9
    |
187 |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = Result<(), ()>>`, the trait `Send` is not implemented for `*mut i64`
note: required by a bound in `tokio::spawn`
   --> /home/pkolaczk/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

If I remove the inner body of the loop (splice_to and splice_from calls), it compiles fine.
I guess this has something to do with those AsyncFd references, although I'm quite stumped, because the docs say AsyncFd is Sync and Send.

@NobodyXu
Contributor

Aha, I got it.

That's because splice_impl contains a raw pointer, which is not Send.
I will submit a PR to fix this.

@NobodyXu
Contributor

@pkolaczk Can you try the PR linked with this issue to see if it fixes the issue for you?

@pkolaczk
Author

Yeah, checking....

@pkolaczk
Author

Looks good! Thank you!

@pkolaczk
Author

pkolaczk commented Jun 20, 2022

Huh, but the idea still doesn't work:

let mut in_fd = AsyncFd::new(src.as_ref().as_raw_fd()).expect("in_fd");

This fails with:

thread 'tokio-runtime-worker' panicked at 'in_fd: Os { code: 17, kind: AlreadyExists, message: "File exists" }', src/main.rs:102:60
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I also tried without getting the raw_fd:

let mut in_fd = AsyncFd::new(src.as_ref()).expect("in_fd");

But that doesn't even compile:

the trait `AsRawFd` is not implemented for `&tokio::net::TcpStream`

How do I get an AsRawFd from a &tokio::net::TcpStream? I cannot obtain an owned tokio::net::TcpStream here (which does implement AsRawFd), because I only have one half of it.

@NobodyXu
Contributor

Ok, I got it.

The problem is that you cannot register the same fd twice: tokio::net::TcpStream already registers it internally with the reactor in the same way AsyncFd does.

@NobodyXu
Contributor

The fundamental problem is that we don't have any safe way to express a reference to a valid fd right now, but we will in Rust 1.63.0 (rust-lang/rust#95118).

@pkolaczk
Author

Is there any workaround for this, without waiting for 1.63?

@NobodyXu
Contributor

For now, you can convert the tokio::net::TcpStream to a std::net::TcpStream, then use IntoRawFd to convert it to a raw fd.
Then using AsyncFd::new on it should be safe, but you can only register it once, so I would recommend using an Arc<AsyncFd>.
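A minimal sketch of that workaround, assuming tokio's AsyncFd and the standard IntoRawFd trait (the helper name register_once is just for illustration):

use std::os::unix::io::{IntoRawFd, RawFd};
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;

fn register_once(stream: TcpStream) -> std::io::Result<Arc<AsyncFd<RawFd>>> {
    // Leave tokio's own registration behind, then take ownership of the raw fd.
    let fd = stream.into_std()?.into_raw_fd();
    // Register the fd with the reactor exactly once, and share that registration.
    Ok(Arc::new(AsyncFd::new(fd)?))
}

Note that nothing ever closes the raw fd in this sketch when the AsyncFd is dropped; an OwnedFd, as suggested below, would handle that.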

@pkolaczk
Author

That sounds like a plan, thank you. Will try that and get back to you.

@pkolaczk
Author

pkolaczk commented Jun 20, 2022

Arc<AsyncFd> cannot be passed to splice_to, because it only gives an immutable reference and splice_to needs a mutable one. I guess I need to wrap it in a Mutex and use try_lock.

@NobodyXu
Contributor

Yeah.

The API for splice and tee is indeed a little bit hard to use, given that most of the time you are dealing with TcpStream, but I haven't found an alternative yet.

@pkolaczk
Author

Aaaargh, I cannot use a Mutex, because the API takes a borrowed reference, so the reference has to live across the .await. And obviously I can't hold a MutexGuard across an await point.

        let mut src = src.try_lock().unwrap();
        let mut count = pipe_write.splice_from(&mut src, None, 4096);
        let count = count.await?;

21  |         let mut src = src.try_lock().unwrap();
    |             ------- has type `MutexGuard<'_, AsyncFd<i32>>` which is not `Send`
22  |         let mut count = pipe_write.splice_from(&mut src, None, 4096);
23  |         let mut count = count.await?;
    |                              ^^^^^^ await occurs here, with `mut src` maybe used later
...
32  |     }
    |     - `mut src` is later dropped here

But trying to drop the guard earlier also doesn't work:

        let mut src = src.try_lock().unwrap();
        let mut count = pipe_write.splice_from(&mut src, None, 4096);
        drop(src);   // move out of `src` occurs here
        let count = count.await?;

@NobodyXu
Contributor

You need to use tokio::sync::Mutex; its guard is Send and is designed to be held across .await points.

@pkolaczk
Author

Ah, good catch. Ok, mutex problem solved. It compiles, but still doesn't work.

pub fn split(stream: TcpStream) -> (In, Out) {
  let fd = stream.into_std().unwrap().as_raw_fd();
  let async_fd = Arc::new(Mutex::new(AsyncFd::new(fd).unwrap()));
  ...

I'm getting a panic here: "Bad file descriptor"

@NobodyXu
Contributor

NobodyXu commented Jun 20, 2022

let fd = stream.into_std().unwrap().as_raw_fd();

You need to use IntoRawFd::into_raw_fd here.

AsRawFd::as_raw_fd takes &self and returns a RawFd that is still owned by the std::net::TcpStream, so in your one-liner the temporary std::net::TcpStream is dropped at the end of the statement and closes the fd.
IntoRawFd::into_raw_fd takes self and returns a RawFd whose ownership is transferred out of the std::net::TcpStream.
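To illustrate the difference (a hypothetical fragment, not from your code):

use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};

fn take_fd(stream: tokio::net::TcpStream) -> std::io::Result<RawFd> {
    let std_stream = stream.into_std()?;

    // Borrow: std_stream still owns the fd and will close it when dropped.
    let _borrowed: RawFd = std_stream.as_raw_fd();

    // Transfer: std_stream gives up ownership; nothing will close this fd automatically.
    Ok(std_stream.into_raw_fd())
}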

Though I strongly recommend doing this instead:

use io_lifetimes::OwnedFd; // Replacement for OwnedFd before 1.63.0 comes out

pub fn split(stream: TcpStream) -> (In, Out) {
  let fd: OwnedFd = stream.into_std().unwrap().into();
  let async_fd = Arc::new(Mutex::new(AsyncFd::new(fd).unwrap()));

OwnedFd closes the fd when it is dropped.

It also has an impl From<std::net::TcpStream> for OwnedFd that you can use.

@pkolaczk
Author

pkolaczk commented Jun 20, 2022

Ok, thanks, that fixed it, but apparently this idea with locking is still wrong.

With a lock on the shared descriptor I cannot read and write simultaneously, and that leads to a deadlock.
I establish the connection, and as soon as I wait to read from the stream, the lock blocks sending; and if I never send anything, I will never read anything from the remote either.

I need to find a better way without locking. And locking doesn't seem right here anyway: AsyncFd is Sync and Send, so why would I need a lock?

Crazy idea: what if I just unsafely transmute the reference behind Arc to &mut?

@NobodyXu
Contributor

@pkolaczk You don't have to resort to anything unsafe.

You can use OwnedFd::try_clone to duplicate the fd and get two OwnedFds.

Then you won't need the Arc or Mutex.

@pkolaczk
Author

Aaaah, and now you tell me this :D

@NobodyXu
Contributor

NobodyXu commented Jun 20, 2022

Sorry, I didn't realize earlier that it could be done this way.

@pkolaczk
Author

Yeah, but aren't we running into the problem of registering the same fd twice again?
I need two AsyncFd<OwnedFd>s.

@NobodyXu
Contributor

NobodyXu commented Jun 20, 2022

OwnedFd::try_clone duplicates the file descriptor at the OS level, so the kernel sees two separate descriptors referring to the same socket, and each one can be registered on its own.

@pkolaczk
Author

Indeed, the registration worked. But it still deadlocks, so I guess there must still be something else I'm doing wrong. Debugging...

@pkolaczk
Author

It is not a deadlock but a livelock. It kind of works, but only one way. I'm basically trying to create a bidirectional proxy between two sockets.

Here is the code:

type In = AsyncFd<OwnedFd>;
type Out = AsyncFd<OwnedFd>;

pub fn split(stream: TcpStream) -> (In, Out) {
    let fd: OwnedFd = stream.into_std().unwrap().into();
    let in_fd = AsyncFd::new(fd.try_clone().unwrap()).unwrap();
    let out_fd = AsyncFd::new(fd).unwrap();
    (in_fd, out_fd)
}

pub async fn forward(mut src: In, mut dest: Out) -> io::Result<u64> {
    let (mut pipe_read, mut pipe_write) = tokio_pipe::pipe()?;
    let mut total_count = 0;
    loop {
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        if count == 0 {
            break;
        }
        total_count += count as u64;
        while count > 0 {
            let sent_count = pipe_read.splice_to(&mut dest, None, count, true).await?;
            count -= sent_count;
        }
    }
    Ok(total_count)
}

pub async fn handle_connection(...) {

 // ... 

    let (iread, iwrite) = split(client);
    let (fread, fwrite) = split(forwardee);
  
    tokio::select! {
        result = forward(iread, fwrite) => { if let Err(e) = result {
            eprintln!("Transfer failed: {}", e);
        }},
        result = forward(fread, iwrite) => { if let Err(e) = result {
            eprintln!("Transfer failed: {}", e);
        }},
    };

}

Testing it with netcat, I'm able to get text from the client to the forwardee. But there are two problems:

  • responses are not coming back
  • after receiving the first chunk of data, the program starts spinning on a CPU core (top says 100%); perf says it is inside the tokio runtime, as if it were busy-waiting for more data.

How can I debug this further?

@pkolaczk
Author

Just calling splice_from is enough to fall into that busy loop.

pub async fn forward(mut src: In, mut dest: Out) -> io::Result<u64> {
    let (mut pipe_read, mut pipe_write) = tokio_pipe::pipe()?;
    let mut total_count = 0;
    loop {
        eprintln!("Waiting for more data...");
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        eprintln!("Received {} bytes", count);
        if count == 0 {
            break;
        }
        total_count += count as u64;
    }
    Ok(total_count)
}

Logs:

Incoming connection from 127.0.0.1:59216
Waiting for more data...                                               // connected with netcat, truly waiting, CPU idle
Received 4 bytes                                                         // written 'foo' to the connection
Waiting for more data...                                               // CPU spinning, but not in my code 

@pkolaczk
Author

src.readable seems to block only the first time:

...
        eprintln!("Waiting for readability...");
        src.readable().await?;
        eprintln!("Reading data...");
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        eprintln!("Received {} bytes", count);
...

results in:

Incoming connection from 127.0.0.1:59220
Waiting for readability...       // initially waits here until I send some data to the socket
Reading data...                
Received 4 bytes
Waiting for readability...       // should stop here, but doesn't
Reading data...                  // no more data in the socket, but it somehow got past the "waiting for readability" await and stops in splice

@pkolaczk
Author

pkolaczk commented Jun 20, 2022

https://zephyr.moe/2021/07/23/rust-zero-copy/

They do this hack:

 // read until the socket buffer is empty
 // or the pipe is filled
 // clear readiness (EPOLLIN)
 r.read(&mut [0u8; 0]).await?;

Can it be related?

@pkolaczk
Author

This is obviously incorrect in general, but manually clearing the readiness flag immediately after reading helps:

        eprintln!("Reading data...");
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        eprintln!("Received {} bytes", count);
        let mut ready = src.readable().await?;
        ready.clear_ready();

@NobodyXu
Contributor

I see where the errors come from, will update the PR.

@NobodyXu
Contributor

@pkolaczk I updated the PR, can you remove the workaround and try again please?

@NobodyXu
Contributor

Also, I recommend replacing the following lines:

    tokio::select! {
        result = forward(iread, fwrite) => { if let Err(e) = result {
            eprintln!("Transfer failed: {}", e);
        }},
        result = forward(fread, iwrite) => { if let Err(e) = result {
            eprintln!("Transfer failed: {}", e);
        }},
    };

with:

let read_task = tokio::spawn(async move {
    // forward() already loops internally until EOF or an error, so no outer loop is needed.
    if let Err(e) = forward(iread, fwrite).await {
        eprintln!("Transfer failed: {}", e);
        return Err(e);
    }
    Ok::<_, io::Error>(())
});

let write_task = tokio::spawn(async move {
    if let Err(e) = forward(fread, iwrite).await {
        eprintln!("Transfer failed: {}", e);
        return Err(e);
    }
    Ok::<_, io::Error>(())
});

This is much easier to reason about than tokio::select!, and it can provide better performance: the two futures run not just concurrently but in parallel, because they can be scheduled on two threads.

Another alternative would be tokio::try_join!; it provides concurrency but not parallelism (all the futures are executed on the same task, and therefore the same thread).
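For example, inside handle_connection the try_join! variant could look roughly like this (just a sketch; the variable names are only for illustration):

// Both directions run concurrently on the current task; the first error ends the join.
match tokio::try_join!(forward(iread, fwrite), forward(fread, iwrite)) {
    Ok((client_to_fwd, fwd_to_client)) => {
        eprintln!("forwarded {} bytes one way, {} the other", client_to_fwd, fwd_to_client);
    }
    Err(e) => eprintln!("Transfer failed: {}", e),
}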

@pkolaczk
Author

Thanks for the fixes, I'll test them soon.

Select also allows running both futures concurrently, and it automatically closes one half when the other half gets closed. A single thread is enough here: even without zero copy I was able to get over 700 MB/s, and netcat/dd was the bottleneck.

@NobodyXu
Contributor

Select also allows running both futures concurrently, and it automatically closes one half when the other half gets closed. A single thread is enough here: even without zero copy I was able to get over 700 MB/s, and netcat/dd was the bottleneck.

That's good to hear.

Note that tokio::select! drops the other future as soon as one of them completes; that's why I said tokio::select! is hard to reason about, and why the code is much easier to understand with tokio::try_join! or tokio::spawn.

@pkolaczk
Author

pkolaczk commented Jun 21, 2022

I tried the fix and it still doesn't work. It just swapped one problem for another.

Spinning does not happen anymore.
However, there now seems to be a problem on the write side.

Incoming connection from 127.0.0.1:59230
Reading data...
// now writing 'write 1' to the incoming TCP stream
Received 8 bytes              
Writing 8 bytes
Writes done
// netcat printed 'write 1', so the receiving end got the data
Reading data...
// now writing 'write 2' to the stream
Received 8 bytes
Writing 8 bytes
// blocked here, the receiver didn't get those 8 bytes

Code:

    loop {
        eprintln!("Reading data...");
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        eprintln!("Received {} bytes", count);
        if count == 0 {
            break;
        }
        total_count += count as u64;
        while count > 0 {
            eprintln!("Writing {} bytes", count);
            let sent_count = pipe_read.splice_to(&mut dest, None, count, true).await?;
            count -= sent_count;
        }
        eprintln!("Writes done");
    }

Also, looking at your patch, it looks like you're clearing the flags unconditionally.
According to the tokio docs, that is incorrect.
You should clear the readability / writability status only if you observe EWOULDBLOCK from the underlying I/O call.
In this case, I guess you just cleared the writability status of the output stream, even though the underlying socket was still ready to accept data after the previous write. So tokio waits for a wakeup that never comes.

You need to check the 'would block' status on both the read and write sides and clear the flags accordingly.

pub fn clear_ready(&mut self)

Indicates to tokio that the file descriptor is no longer ready. The internal readiness flag will be cleared, and tokio will wait for the next edge-triggered readiness notification from the OS.

It is critical that this function not be called unless your code actually observes that the file descriptor is not ready. Do not call it simply because, for example, a read succeeded; it should be called when a read is observed to block.
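For reference, here is a minimal sketch of the pattern those docs describe, using AsyncFd's try_io helper, which clears readiness only when the inner call reports WouldBlock (the do_splice closure is a placeholder, not your crate's API):

use std::io;
use std::os::unix::io::RawFd;
use tokio::io::unix::AsyncFd;

// Wait until the fd is readable, run one non-blocking splice attempt, and retry
// (after readiness has been cleared) only if the attempt would block.
async fn splice_once(
    fd: &AsyncFd<RawFd>,
    mut do_splice: impl FnMut(RawFd) -> io::Result<usize>,
) -> io::Result<usize> {
    loop {
        let mut guard = fd.readable().await?;
        match guard.try_io(|inner| do_splice(*inner.get_ref())) {
            Ok(result) => return result,   // success or a real error
            Err(_would_block) => continue, // readiness was cleared; wait again
        }
    }
}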

@pkolaczk
Author

pkolaczk commented Jun 21, 2022

Also, one more suggestion (unrelated to splicing) - maybe you could make the API accept the OwnedReadHalf / OwnedWriteHalf directly by using the idea from that zero-copy blog post I linked earlier:

pub async fn zero_copy<X, Y, R, W>(mut r: R, mut w: W) -> io::Result<()>
where
    X: AsRawFd,
    Y: AsRawFd,
    R: AsyncRead + AsRef<X> + Unpin,
    W: AsyncWrite + AsRef<Y> + Unpin,

This works because OwnedReadHalf is AsRef<TcpStream> and TcpStream is AsRawFd.
My only concern (I haven't checked this) is whether AsyncRead/AsyncWrite are a sufficient replacement for AsyncFd in your case.

@NobodyXu
Contributor

That's a good idea.

I can simply use:

reader.read(&mut []).await

and

writer.write(&[]).await

@NobodyXu
Contributor

@pkolaczk I am trying to implement splice2, which supports any AsyncRead/AsyncWrite, but then I realized that there is no way to clear the readiness of the reader/writer, so I'm not sure how to implement it.
