Skip to content

Commit

Permalink
Fix test errors and make futures !Unpin
Browse files Browse the repository at this point in the history
This is a breaking change. However, it comes with the ability to avoid
heap allocations in many cases, which is a significant boon for users
for async-channel.

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Sep 16, 2023
1 parent 51ab127 commit 6d99af0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 102 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ concurrent-queue = "2"
event-listener = "2.4.0"
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" }
futures-core = "0.3.5"
pin-project-lite = "0.2.11"

[dev-dependencies]
easy-parallel = "3"
Expand Down
185 changes: 88 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use std::usize;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
use futures_core::stream::Stream;

struct Channel<T> {
Expand Down Expand Up @@ -129,8 +130,8 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
channel,
listener: None,
};
(s, r)
}
Expand Down Expand Up @@ -169,8 +170,8 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
channel,
listener: None,
};
(s, r)
}
Expand Down Expand Up @@ -243,8 +244,8 @@ impl<T> Sender<T> {
pub fn send(&self, msg: T) -> Send<'_, T> {
Send::_new(SendInner {
sender: self,
listener: None,
msg: Some(msg),
listener: EventListener::new(&self.channel.send_ops),
})
}

Expand Down Expand Up @@ -473,24 +474,34 @@ impl<T> Clone for Sender<T> {
}
}

/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,

/// Listens for a send or close event to unblock this stream.
pin_project_lite::pin_project! {
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// TODO: This is pinned and boxed because `Receiver<T>` is `Unpin` and the newest version
/// of `event_listener::EventListener` is not. At the next major release, we can remove the
/// `Pin<Box<>>` and make `Receiver<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,

// Listens for a send or close event to unblock this stream.
#[pin]
listener: EventListener,
}

impl<T> PinnedDrop for Receiver<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();

// Decrement the receiver count and close the channel if it drops down to zero.
if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
this.channel.close();
}
}
}
}

impl<T> Receiver<T> {
Expand Down Expand Up @@ -553,7 +564,7 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: None,
listener: EventListener::new(&self.channel.recv_ops),
})
}

Expand Down Expand Up @@ -755,15 +766,6 @@ impl<T> Receiver<T> {
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and close the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Receiver {{ .. }}")
Expand All @@ -781,7 +783,7 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
listener: None,
listener: EventListener::new(&self.channel.stream_ops),
}
}
}
Expand All @@ -792,37 +794,42 @@ impl<T> Stream for Receiver<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
futures_core::ready!(Pin::new(listener).poll(cx));
self.listener = None;
{
let this = self.as_mut().project();
if this.listener.is_listening() {
ready!(this.listener.poll(cx));
}
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Create a listener and try sending the message again.
self.listener = Some(self.channel.stream_ops.listen());
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
let mut this = self.as_mut().project();
if this.listener.is_listening() {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen();
}
}
}
Expand Down Expand Up @@ -907,7 +914,7 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: None,
listener: EventListener::new(&self.channel.stream_ops),
}),
}
}
Expand Down Expand Up @@ -1072,50 +1079,42 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
/// TODO: This is pinned and boxed because `Send<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Send<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
msg: Option<T>,
pin_project_lite::pin_project! {
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
msg: Option<T>,
#[pin]
listener: EventListener,
}
}

impl<'a, T> Unpin for SendInner<'a, T> {}

impl<'a, T> EventListenerFuture for SendInner<'a, T> {
type Output = Result<(), SendError<T>>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let mut this = self.project();

loop {
let msg = self.msg.take().unwrap();
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match self.sender.try_send(msg) {
match this.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => self.msg = Some(m),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try sending again.
self.listener = Some(self.sender.channel.send_ops.listen());
}
Some(l) => {
// Poll using the given strategy
if let Poll::Pending = S::poll(strategy, l.as_mut(), context) {
return Poll::Pending;
} else {
self.listener = None;
}
}
if this.listener.is_listening() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
} else {
this.listener.as_mut().listen();
}
}
}
Expand All @@ -1129,48 +1128,40 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
/// TODO: This is pinned and boxed because `Recv<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Recv<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
pin_project_lite::pin_project! {
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
#[pin]
listener: EventListener,
}
}

impl<'a, T> Unpin for RecvInner<'a, T> {}

impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
self: Pin<&'x mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let mut this = self.project();

loop {
// Attempt to receive a message.
match self.receiver.try_recv() {
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = Some(self.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Poll using the given strategy.
if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) {
return Poll::Pending;
} else {
self.listener = None;
}
}
if this.listener.is_listening() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
} else {
this.listener.as_mut().listen();
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,10 @@ fn forget_blocked_sender() {
.add(move || {
assert!(future::block_on(s1.send(3)).is_ok());
assert!(future::block_on(s1.send(7)).is_ok());
let mut s1_fut = s1.send(13);
let s1_fut = s1.send(13);
futures_lite::pin!(s1_fut);
// Poll but keep the future alive.
assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None);
assert_eq!(future::block_on(future::poll_once(s1_fut)), None);
sleep(ms(500));
})
.add(move || {
Expand All @@ -358,8 +359,9 @@ fn forget_blocked_receiver() {

Parallel::new()
.add(move || {
let mut r1_fut = r1.recv();
let r1_fut = r1.recv();
// Poll but keep the future alive.
futures_lite::pin!(r1_fut);
assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None);
sleep(ms(500));
})
Expand Down Expand Up @@ -436,8 +438,9 @@ fn mpmc_stream() {

Parallel::new()
.each(0..THREADS, {
let mut r = r;
let r = r;
move |_| {
futures_lite::pin!(r);
for _ in 0..COUNT {
let n = future::block_on(r.next()).unwrap();
v[n].fetch_add(1, Ordering::SeqCst);
Expand Down
3 changes: 2 additions & 1 deletion tests/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ fn mpmc_stream() {

Parallel::new()
.each(0..THREADS, {
let mut r = r.clone();
let r = r.clone();
move |_| {
futures_lite::pin!(r);
for _ in 0..COUNT {
let n = future::block_on(r.next()).unwrap();
v[n].fetch_add(1, Ordering::SeqCst);
Expand Down

0 comments on commit 6d99af0

Please sign in to comment.