Skip to content

Commit

Permalink
Auto merge of #73761 - rijenkii:master, r=KodrAus
Browse files Browse the repository at this point in the history
Add `peek` and `peek_from` to `UnixStream` and `UnixDatagram`

This is my first PR, so I'm sure I've done some things wrong.

This PR:
  * adds `peek` function to `UnixStream`;
  * adds `peek` and `peek_from` to `UnixDatagram`;
  * moves `UnixDatagram::recv_from` implementation to a private function `recv_from_flags`, as `peek_from` uses the same code, just with different flags.

I've taken the documentation from `TcpStream` and `UdpStream`, so it may or may not make sense (I'm bad with english words).
Also, I'm not sure what I should write in the `unstable` attribute, so I've made up the name and set the issue to "none".

Closes #68565.
  • Loading branch information
bors committed Sep 11, 2020
2 parents bc57bd8 + 64b8fd7 commit 9911160
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 20 deletions.
132 changes: 112 additions & 20 deletions library/std/src/sys/unix/ext/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,32 @@ impl UnixStream {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.0.shutdown(how)
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixStream;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixStream::connect("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let len = socket.peek(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.peek(buf)
}
}

#[stable(feature = "unix_socket", since = "1.10.0")]
Expand Down Expand Up @@ -1291,6 +1317,33 @@ impl UnixDatagram {
SocketAddr::new(|addr, len| unsafe { libc::getpeername(*self.0.as_inner(), addr, len) })
}

fn recv_from_flags(
&self,
buf: &mut [u8],
flags: libc::c_int,
) -> io::Result<(usize, SocketAddr)> {
let mut count = 0;
let addr = SocketAddr::new(|addr, len| unsafe {
count = libc::recvfrom(
*self.0.as_inner(),
buf.as_mut_ptr() as *mut _,
buf.len(),
flags,
addr,
len,
);
if count > 0 {
1
} else if count == 0 {
0
} else {
-1
}
})?;

Ok((count as usize, addr))
}

/// Receives data from the socket.
///
/// On success, returns the number of bytes read and the address from
Expand All @@ -1311,26 +1364,7 @@ impl UnixDatagram {
/// ```
#[stable(feature = "unix_socket", since = "1.10.0")]
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let mut count = 0;
let addr = SocketAddr::new(|addr, len| unsafe {
count = libc::recvfrom(
*self.0.as_inner(),
buf.as_mut_ptr() as *mut _,
buf.len(),
0,
addr,
len,
);
if count > 0 {
1
} else if count == 0 {
0
} else {
-1
}
})?;

Ok((count as usize, addr))
self.recv_from_flags(buf, 0)
}

/// Receives data from the socket.
Expand Down Expand Up @@ -1601,6 +1635,64 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.0.shutdown(how)
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixDatagram;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixDatagram::bind("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let len = socket.peek(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.peek(buf)
}

/// Receives a single datagram message on the socket, without removing it from the
/// queue. On success, returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array `buf` of sufficient size to
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
/// excess bytes may be discarded.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying `recvfrom` system call.
///
/// Do not use this function to implement busy waiting, instead use `libc::poll` to
/// synchronize IO events on one or more sockets.
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_peek)]
///
/// use std::os::unix::net::UnixDatagram;
///
/// fn main() -> std::io::Result<()> {
/// let socket = UnixDatagram::bind("/tmp/sock")?;
/// let mut buf = [0; 10];
/// let (len, addr) = socket.peek_from(&mut buf).expect("peek failed");
/// Ok(())
/// }
/// ```
#[unstable(feature = "unix_socket_peek", issue = "none")]
pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.recv_from_flags(buf, libc::MSG_PEEK)
}
}

#[stable(feature = "unix_socket", since = "1.10.0")]
Expand Down
80 changes: 80 additions & 0 deletions library/std/src/sys/unix/ext/net/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,83 @@ fn test_unix_datagram_timeout_zero_duration() {
fn abstract_namespace_not_allowed() {
assert!(UnixStream::connect("\0asdf").is_err());
}

#[test]
fn test_unix_stream_peek() {
let (txdone, rxdone) = crate::sync::mpsc::channel();

let dir = tmpdir();
let path = dir.path().join("sock");

let listener = or_panic!(UnixListener::bind(&path));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(&[1, 3, 3, 7]));
or_panic!(rxdone.recv());
});

let mut stream = or_panic!(UnixStream::connect(&path));
let mut buf = [0; 10];
for _ in 0..2 {
assert_eq!(or_panic!(stream.peek(&mut buf)), 4);
}
assert_eq!(or_panic!(stream.read(&mut buf)), 4);

or_panic!(stream.set_nonblocking(true));
match stream.peek(&mut buf) {
Ok(_) => panic!("expected error"),
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
Err(e) => panic!("unexpected error: {}", e),
}

or_panic!(txdone.send(()));
thread.join().unwrap();
}

#[test]
fn test_unix_datagram_peek() {
let dir = tmpdir();
let path1 = dir.path().join("sock");

let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
or_panic!(sock2.connect(&path1));

let msg = b"hello world";
or_panic!(sock2.send(msg));
for _ in 0..2 {
let mut buf = [0; 11];
let size = or_panic!(sock1.peek(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}

let mut buf = [0; 11];
let size = or_panic!(sock1.recv(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}

#[test]
fn test_unix_datagram_peek_from() {
let dir = tmpdir();
let path1 = dir.path().join("sock");

let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
or_panic!(sock2.connect(&path1));

let msg = b"hello world";
or_panic!(sock2.send(msg));
for _ in 0..2 {
let mut buf = [0; 11];
let (size, _) = or_panic!(sock1.peek_from(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}

let mut buf = [0; 11];
let size = or_panic!(sock1.recv(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}

0 comments on commit 9911160

Please sign in to comment.