Skip to content

Commit

Permalink
refactor(iroh-net): replace flume in iroh-net with async_channel (#2539)
Browse files Browse the repository at this point in the history
## Description

refactor(iroh-net): replace flume in iroh-net with async_channel

Rationale: see #2536

This is the first in a series of PRs that will replace flume with
async_channel. In this case it is very close to a drop in replacement
without any need for changes.

Only noteable changes:

send_async becomes send
send becomes send_blocking
Receiver implements Stream, so no need for .into_stream()
Receiver is not Unpin, so to make it unpin you need to do
`Box::pin(recv)` or use `.boxed()`

## Breaking Changes

None

## Notes & open questions

- Anything specific from flume we were relying on here?

## Change checklist

- [x] Self-review.
- [x] ~~Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.~~
- [x] ~~Tests if relevant.~~
- [x] ~~All breaking changes documented.~~
  • Loading branch information
rklaehn authored Jul 24, 2024
1 parent 9052905 commit 22314a1
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
async-channel = "2.3.1"
base64 = "0.22.1"
backoff = "0.4.0"
bytes = "1"
netdev = "0.30.0"
der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-concurrency = "7.6.0"
futures-lite = "2.3"
Expand Down
22 changes: 11 additions & 11 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use derive_more::FromStr;
use futures_lite::{stream::Boxed as BoxStream, StreamExt};
use tracing::{debug, error, trace, warn};

use flume::Sender;
use async_channel::Sender;
use iroh_base::key::PublicKey;
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl LocalSwarmDiscovery {
/// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime.
pub fn new(node_id: NodeId) -> Result<Self> {
debug!("Creating new LocalSwarmDiscovery service");
let (send, recv) = flume::bounded(64);
let (send, recv) = async_channel::bounded(64);
let task_sender = send.clone();
let rt = tokio::runtime::Handle::current();
let mut guard = Some(LocalSwarmDiscovery::spawn_discoverer(
Expand All @@ -80,7 +80,7 @@ impl LocalSwarmDiscovery {
let mut timeouts = JoinSet::new();
loop {
trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick");
let msg = match recv.recv_async().await {
let msg = match recv.recv().await {
Err(err) => {
error!("LocalSwarmDiscovery service error: {err:?}");
error!("closing LocalSwarmDiscovery");
Expand Down Expand Up @@ -124,7 +124,7 @@ impl LocalSwarmDiscovery {
for sender in senders.values() {
let item: DiscoveryItem = (&peer_info).into();
trace!(?item, "sending DiscoveryItem");
sender.send_async(Ok(item)).await.ok();
sender.send(Ok(item)).await.ok();
}
}
trace!(
Expand All @@ -141,7 +141,7 @@ impl LocalSwarmDiscovery {
if let Some(peer_info) = node_addrs.get(&node_id) {
let item: DiscoveryItem = peer_info.into();
debug!(?item, "sending DiscoveryItem");
sender.send_async(Ok(item)).await.ok();
sender.send(Ok(item)).await.ok();
}
if let Some(senders_for_node_id) = senders.get_mut(&node_id) {
senders_for_node_id.insert(id, sender);
Expand All @@ -155,7 +155,7 @@ impl LocalSwarmDiscovery {
tokio::time::sleep(DISCOVERY_DURATION).await;
trace!(?node_id, "discovery timeout");
timeout_sender
.send_async(Message::Timeout(node_id, id))
.send(Message::Timeout(node_id, id))
.await
.ok();
});
Expand Down Expand Up @@ -210,7 +210,7 @@ impl LocalSwarmDiscovery {
);

sender
.send(Message::Discovery(node_id.to_string(), peer.clone()))
.send_blocking(Message::Discovery(node_id.to_string(), peer.clone()))
.ok();
};
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
Expand Down Expand Up @@ -267,23 +267,23 @@ impl From<&Peer> for DiscoveryItem {

impl Discovery for LocalSwarmDiscovery {
fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option<BoxStream<Result<DiscoveryItem>>> {
let (send, recv) = flume::bounded(20);
let (send, recv) = async_channel::bounded(20);
let discovery_sender = self.sender.clone();
tokio::spawn(async move {
discovery_sender
.send_async(Message::SendAddrs(node_id, send))
.send(Message::SendAddrs(node_id, send))
.await
.ok();
});
Some(recv.into_stream().boxed())
Some(recv.boxed())
}

fn publish(&self, info: &AddrInfo) {
let discovery_sender = self.sender.clone();
let info = info.clone();
tokio::spawn(async move {
discovery_sender
.send_async(Message::ChangeLocalAddrs(info))
.send(Message::ChangeLocalAddrs(info))
.await
.ok();
});
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub(crate) struct MagicSock {
proxy_url: Option<Url>,

/// Used for receiving relay messages.
relay_recv_receiver: flume::Receiver<RelayRecvResult>,
relay_recv_receiver: async_channel::Receiver<RelayRecvResult>,
/// Stores wakers, to be called when relay_recv_ch receives new data.
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
network_send_wakers: parking_lot::Mutex<Option<Waker>>,
Expand Down Expand Up @@ -786,11 +786,11 @@ impl MagicSock {
break;
}
match self.relay_recv_receiver.try_recv() {
Err(flume::TryRecvError::Empty) => {
Err(async_channel::TryRecvError::Empty) => {
self.network_recv_wakers.lock().replace(cx.waker().clone());
break;
}
Err(flume::TryRecvError::Disconnected) => {
Err(async_channel::TryRecvError::Closed) => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
Expand Down Expand Up @@ -1375,7 +1375,7 @@ impl Handle {
insecure_skip_relay_cert_verify,
} = opts;

let (relay_recv_sender, relay_recv_receiver) = flume::bounded(128);
let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128);

let (pconn4, pconn6) = bind(port)?;
let port = pconn4.port();
Expand Down Expand Up @@ -1701,7 +1701,7 @@ struct Actor {
relay_actor_sender: mpsc::Sender<RelayActorMessage>,
relay_actor_cancel_token: CancellationToken,
/// Channel to send received relay messages on, for processing.
relay_recv_sender: flume::Sender<RelayRecvResult>,
relay_recv_sender: async_channel::Sender<RelayRecvResult>,
/// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun.
periodic_re_stun_timer: time::Interval,
/// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc.
Expand Down Expand Up @@ -1855,7 +1855,7 @@ impl Actor {
let passthroughs = self.process_relay_read_result(read_result);
for passthrough in passthroughs {
self.relay_recv_sender
.send_async(passthrough)
.send(passthrough)
.await
.expect("missing recv sender");
let mut wakers = self.msock.network_recv_wakers.lock();
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/magicsock/udp_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ mod tests {
let (m2, _m2_key) = wrap_socket(m2)?;

let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port());
let (m1_send, m1_recv) = flume::bounded(8);
let (m1_send, m1_recv) = async_channel::bounded(8);

let m1_task = tokio::task::spawn(async move {
if let Some(conn) = m1.accept().await {
let conn = conn.await?;
let (mut send_bi, mut recv_bi) = conn.accept_bi().await?;

let val = recv_bi.read_to_end(usize::MAX).await?;
m1_send.send_async(val).await?;
m1_send.send(val).await?;
send_bi.finish().await?;
}

Expand All @@ -220,7 +220,7 @@ mod tests {
drop(send_bi);

// make sure the right values arrived
let val = m1_recv.recv_async().await?;
let val = m1_recv.recv().await?;
assert_eq!(val, b"hello");

m1_task.await??;
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/net/netmon/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(super) struct Actor {
/// OS specific monitor.
#[allow(dead_code)]
route_monitor: RouteMonitor,
mon_receiver: flume::Receiver<NetworkMessage>,
mon_receiver: async_channel::Receiver<NetworkMessage>,
actor_receiver: mpsc::Receiver<ActorMessage>,
actor_sender: mpsc::Sender<ActorMessage>,
/// Callback registry.
Expand All @@ -84,7 +84,7 @@ impl Actor {
let wall_time = Instant::now();

// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
let (mon_sender, mon_receiver) = flume::bounded(MON_CHAN_CAPACITY);
let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY);
let route_monitor = RouteMonitor::new(mon_sender)?;
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);

Expand Down Expand Up @@ -129,7 +129,7 @@ impl Actor {
debounce_interval.reset_immediately();
}
}
Ok(_event) = self.mon_receiver.recv_async() => {
Ok(_event) = self.mon_receiver.recv() => {
trace!("network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/net/netmon/android.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::actor::NetworkMessage;
pub(super) struct RouteMonitor {}

impl RouteMonitor {
pub(super) fn new(_sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(_sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
// Very sad monitor. Android doesn't allow us to do this

Ok(RouteMonitor {})
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/src/net/netmon/bsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Drop for RouteMonitor {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
socket.set_nonblocking(true)?;
let socket_std: std::os::unix::net::UnixStream = socket.into();
Expand All @@ -44,7 +44,7 @@ impl RouteMonitor {
) {
Ok(msgs) => {
if contains_interesting_message(&msgs) {
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
}
Err(err) => {
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/net/netmon/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ macro_rules! get_nla {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
let (mut conn, mut _handle, mut messages) = new_connection()?;

// Specify flags to listen on.
Expand Down Expand Up @@ -87,7 +87,7 @@ impl RouteMonitor {
continue;
} else {
addrs.insert(addr.clone());
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
}
}
Expand All @@ -97,7 +97,7 @@ impl RouteMonitor {
if let Some(addr) = get_nla!(msg, address::Nla::Address) {
addrs.remove(addr);
}
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewRoute(msg) | RtnlMessage::DelRoute(msg) => {
trace!("ROUTE:: {:?}", msg);
Expand All @@ -124,15 +124,15 @@ impl RouteMonitor {
}
}
}
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewRule(msg) => {
trace!("NEWRULE: {:?}", msg);
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::DelRule(msg) => {
trace!("DELRULE: {:?}", msg);
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewLink(msg) => {
trace!("NEWLINK: {:?}", msg);
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/net/netmon/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ pub(super) struct RouteMonitor {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
// Register two callbacks with the windows api
let mut cb_handler = CallbackHandler::default();

// 1. Unicast Address Changes
let s = sender.clone();
cb_handler.register_unicast_address_change_callback(Box::new(move || {
if let Err(err) = s.send(NetworkMessage::Change) {
if let Err(err) = s.send_blocking(NetworkMessage::Change) {
warn!("unable to send: unicast change notification: {:?}", err);
}
}))?;

// 2. Route Changes
cb_handler.register_route_change_callback(Box::new(move || {
if let Err(err) = sender.send(NetworkMessage::Change) {
if let Err(err) = sender.send_blocking(NetworkMessage::Change) {
warn!("unable to send: route change notification: {:?}", err);
}
}))?;
Expand Down

0 comments on commit 22314a1

Please sign in to comment.