From 6f60bd8a0e171967c05300e13da27845d010ef51 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 24 Jul 2024 12:44:35 +0300 Subject: [PATCH] replace flume in iroh-net with async_channel Rationale: see https://github.com/n0-computer/iroh/pull/2536 --- Cargo.lock | 2 +- iroh-net/Cargo.toml | 2 +- .../src/discovery/local_swarm_discovery.rs | 22 +++++++++---------- iroh-net/src/magicsock.rs | 12 +++++----- iroh-net/src/magicsock/udp_conn.rs | 6 ++--- iroh-net/src/net/netmon/actor.rs | 6 ++--- iroh-net/src/net/netmon/android.rs | 2 +- iroh-net/src/net/netmon/bsd.rs | 4 ++-- iroh-net/src/net/netmon/linux.rs | 12 +++++----- iroh-net/src/net/netmon/windows.rs | 6 ++--- 10 files changed, 37 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 469c0501b5..54b81fff6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2836,6 +2836,7 @@ name = "iroh-net" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "axum", "backoff", "base64 0.22.1", @@ -2846,7 +2847,6 @@ dependencies = [ "der", "derive_more", "duct", - "flume", "futures-buffered", "futures-concurrency", "futures-lite 2.3.0", diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 87562cd8fb..8b677fbfb4 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -17,13 +17,13 @@ workspace = true [dependencies] anyhow = { version = "1" } +async-channel = "2.3.1" base64 = "0.22.1" backoff = "0.4.0" bytes = "1" netdev = "0.30.0" der = { version = "0.7", features = ["alloc", "derive"] } derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] } -flume = "0.11" futures-buffered = "0.2.4" futures-concurrency = "7.6.0" futures-lite = "2.3" diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index b63ad41567..b1e9bfffc7 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -14,7 +14,7 @@ use derive_more::FromStr; use futures_lite::{stream::Boxed as BoxStream, StreamExt}; use tracing::{debug, error, trace, warn}; -use flume::Sender; +use async_channel::Sender; use iroh_base::key::PublicKey; use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; use tokio::task::JoinSet; @@ -62,7 +62,7 @@ impl LocalSwarmDiscovery { /// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime. pub fn new(node_id: NodeId) -> Result { debug!("Creating new LocalSwarmDiscovery service"); - let (send, recv) = flume::bounded(64); + let (send, recv) = async_channel::bounded(64); let task_sender = send.clone(); let rt = tokio::runtime::Handle::current(); let mut guard = Some(LocalSwarmDiscovery::spawn_discoverer( @@ -80,7 +80,7 @@ impl LocalSwarmDiscovery { let mut timeouts = JoinSet::new(); loop { trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick"); - let msg = match recv.recv_async().await { + let msg = match recv.recv().await { Err(err) => { error!("LocalSwarmDiscovery service error: {err:?}"); error!("closing LocalSwarmDiscovery"); @@ -124,7 +124,7 @@ impl LocalSwarmDiscovery { for sender in senders.values() { let item: DiscoveryItem = (&peer_info).into(); trace!(?item, "sending DiscoveryItem"); - sender.send_async(Ok(item)).await.ok(); + sender.send(Ok(item)).await.ok(); } } trace!( @@ -141,7 +141,7 @@ impl LocalSwarmDiscovery { if let Some(peer_info) = node_addrs.get(&node_id) { let item: DiscoveryItem = peer_info.into(); debug!(?item, "sending DiscoveryItem"); - sender.send_async(Ok(item)).await.ok(); + sender.send(Ok(item)).await.ok(); } if let Some(senders_for_node_id) = senders.get_mut(&node_id) { senders_for_node_id.insert(id, sender); @@ -155,7 +155,7 @@ impl LocalSwarmDiscovery { tokio::time::sleep(DISCOVERY_DURATION).await; trace!(?node_id, "discovery timeout"); timeout_sender - .send_async(Message::Timeout(node_id, id)) + .send(Message::Timeout(node_id, id)) .await .ok(); }); @@ -210,7 +210,7 @@ impl LocalSwarmDiscovery { ); sender - .send(Message::Discovery(node_id.to_string(), peer.clone())) + .send_blocking(Message::Discovery(node_id.to_string(), peer.clone())) .ok(); }; let mut addrs: HashMap> = HashMap::default(); @@ -267,15 +267,15 @@ impl From<&Peer> for DiscoveryItem { impl Discovery for LocalSwarmDiscovery { fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option>> { - let (send, recv) = flume::bounded(20); + let (send, recv) = async_channel::bounded(20); let discovery_sender = self.sender.clone(); tokio::spawn(async move { discovery_sender - .send_async(Message::SendAddrs(node_id, send)) + .send(Message::SendAddrs(node_id, send)) .await .ok(); }); - Some(recv.into_stream().boxed()) + Some(recv.boxed()) } fn publish(&self, info: &AddrInfo) { @@ -283,7 +283,7 @@ impl Discovery for LocalSwarmDiscovery { let info = info.clone(); tokio::spawn(async move { discovery_sender - .send_async(Message::ChangeLocalAddrs(info)) + .send(Message::ChangeLocalAddrs(info)) .await .ok(); }); diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 724732c8ea..9db0c3fec7 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -177,7 +177,7 @@ pub(crate) struct MagicSock { proxy_url: Option, /// Used for receiving relay messages. - relay_recv_receiver: flume::Receiver, + relay_recv_receiver: async_channel::Receiver, /// Stores wakers, to be called when relay_recv_ch receives new data. network_recv_wakers: parking_lot::Mutex>, network_send_wakers: parking_lot::Mutex>, @@ -786,11 +786,11 @@ impl MagicSock { break; } match self.relay_recv_receiver.try_recv() { - Err(flume::TryRecvError::Empty) => { + Err(async_channel::TryRecvError::Empty) => { self.network_recv_wakers.lock().replace(cx.waker().clone()); break; } - Err(flume::TryRecvError::Disconnected) => { + Err(async_channel::TryRecvError::Closed) => { return Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "connection closed", @@ -1375,7 +1375,7 @@ impl Handle { insecure_skip_relay_cert_verify, } = opts; - let (relay_recv_sender, relay_recv_receiver) = flume::bounded(128); + let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128); let (pconn4, pconn6) = bind(port)?; let port = pconn4.port(); @@ -1701,7 +1701,7 @@ struct Actor { relay_actor_sender: mpsc::Sender, relay_actor_cancel_token: CancellationToken, /// Channel to send received relay messages on, for processing. - relay_recv_sender: flume::Sender, + relay_recv_sender: async_channel::Sender, /// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun. periodic_re_stun_timer: time::Interval, /// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc. @@ -1855,7 +1855,7 @@ impl Actor { let passthroughs = self.process_relay_read_result(read_result); for passthrough in passthroughs { self.relay_recv_sender - .send_async(passthrough) + .send(passthrough) .await .expect("missing recv sender"); let mut wakers = self.msock.network_recv_wakers.lock(); diff --git a/iroh-net/src/magicsock/udp_conn.rs b/iroh-net/src/magicsock/udp_conn.rs index f28ccef80d..f4d641db26 100644 --- a/iroh-net/src/magicsock/udp_conn.rs +++ b/iroh-net/src/magicsock/udp_conn.rs @@ -192,7 +192,7 @@ mod tests { let (m2, _m2_key) = wrap_socket(m2)?; let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port()); - let (m1_send, m1_recv) = flume::bounded(8); + let (m1_send, m1_recv) = async_channel::bounded(8); let m1_task = tokio::task::spawn(async move { if let Some(conn) = m1.accept().await { @@ -200,7 +200,7 @@ mod tests { let (mut send_bi, mut recv_bi) = conn.accept_bi().await?; let val = recv_bi.read_to_end(usize::MAX).await?; - m1_send.send_async(val).await?; + m1_send.send(val).await?; send_bi.finish().await?; } @@ -220,7 +220,7 @@ mod tests { drop(send_bi); // make sure the right values arrived - let val = m1_recv.recv_async().await?; + let val = m1_recv.recv().await?; assert_eq!(val, b"hello"); m1_task.await??; diff --git a/iroh-net/src/net/netmon/actor.rs b/iroh-net/src/net/netmon/actor.rs index 6838817d73..083e482caa 100644 --- a/iroh-net/src/net/netmon/actor.rs +++ b/iroh-net/src/net/netmon/actor.rs @@ -57,7 +57,7 @@ pub(super) struct Actor { /// OS specific monitor. #[allow(dead_code)] route_monitor: RouteMonitor, - mon_receiver: flume::Receiver, + mon_receiver: async_channel::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, /// Callback registry. @@ -84,7 +84,7 @@ impl Actor { let wall_time = Instant::now(); // Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries. - let (mon_sender, mon_receiver) = flume::bounded(MON_CHAN_CAPACITY); + let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY); let route_monitor = RouteMonitor::new(mon_sender)?; let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); @@ -129,7 +129,7 @@ impl Actor { debounce_interval.reset_immediately(); } } - Ok(_event) = self.mon_receiver.recv_async() => { + Ok(_event) = self.mon_receiver.recv() => { trace!("network activity detected"); last_event.replace(false); debounce_interval.reset_immediately(); diff --git a/iroh-net/src/net/netmon/android.rs b/iroh-net/src/net/netmon/android.rs index 2587258884..f92eb721f0 100644 --- a/iroh-net/src/net/netmon/android.rs +++ b/iroh-net/src/net/netmon/android.rs @@ -6,7 +6,7 @@ use super::actor::NetworkMessage; pub(super) struct RouteMonitor {} impl RouteMonitor { - pub(super) fn new(_sender: flume::Sender) -> Result { + pub(super) fn new(_sender: async_channel::Sender) -> Result { // Very sad monitor. Android doesn't allow us to do this Ok(RouteMonitor {}) diff --git a/iroh-net/src/net/netmon/bsd.rs b/iroh-net/src/net/netmon/bsd.rs index aa3bc7c47a..20daef64ba 100644 --- a/iroh-net/src/net/netmon/bsd.rs +++ b/iroh-net/src/net/netmon/bsd.rs @@ -23,7 +23,7 @@ impl Drop for RouteMonitor { } impl RouteMonitor { - pub(super) fn new(sender: flume::Sender) -> Result { + pub(super) fn new(sender: async_channel::Sender) -> Result { let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?; socket.set_nonblocking(true)?; let socket_std: std::os::unix::net::UnixStream = socket.into(); @@ -44,7 +44,7 @@ impl RouteMonitor { ) { Ok(msgs) => { if contains_interesting_message(&msgs) { - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } } Err(err) => { diff --git a/iroh-net/src/net/netmon/linux.rs b/iroh-net/src/net/netmon/linux.rs index f1b98dec11..12976b37e8 100644 --- a/iroh-net/src/net/netmon/linux.rs +++ b/iroh-net/src/net/netmon/linux.rs @@ -49,7 +49,7 @@ macro_rules! get_nla { } impl RouteMonitor { - pub(super) fn new(sender: flume::Sender) -> Result { + pub(super) fn new(sender: async_channel::Sender) -> Result { let (mut conn, mut _handle, mut messages) = new_connection()?; // Specify flags to listen on. @@ -87,7 +87,7 @@ impl RouteMonitor { continue; } else { addrs.insert(addr.clone()); - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } } } @@ -97,7 +97,7 @@ impl RouteMonitor { if let Some(addr) = get_nla!(msg, address::Nla::Address) { addrs.remove(addr); } - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } RtnlMessage::NewRoute(msg) | RtnlMessage::DelRoute(msg) => { trace!("ROUTE:: {:?}", msg); @@ -124,15 +124,15 @@ impl RouteMonitor { } } } - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } RtnlMessage::NewRule(msg) => { trace!("NEWRULE: {:?}", msg); - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } RtnlMessage::DelRule(msg) => { trace!("DELRULE: {:?}", msg); - sender.send_async(NetworkMessage::Change).await.ok(); + sender.send(NetworkMessage::Change).await.ok(); } RtnlMessage::NewLink(msg) => { trace!("NEWLINK: {:?}", msg); diff --git a/iroh-net/src/net/netmon/windows.rs b/iroh-net/src/net/netmon/windows.rs index 284f6c76c4..da77899d0f 100644 --- a/iroh-net/src/net/netmon/windows.rs +++ b/iroh-net/src/net/netmon/windows.rs @@ -19,21 +19,21 @@ pub(super) struct RouteMonitor { } impl RouteMonitor { - pub(super) fn new(sender: flume::Sender) -> Result { + pub(super) fn new(sender: async_channel::Sender) -> Result { // Register two callbacks with the windows api let mut cb_handler = CallbackHandler::default(); // 1. Unicast Address Changes let s = sender.clone(); cb_handler.register_unicast_address_change_callback(Box::new(move || { - if let Err(err) = s.send(NetworkMessage::Change) { + if let Err(err) = s.send_blocking(NetworkMessage::Change) { warn!("unable to send: unicast change notification: {:?}", err); } }))?; // 2. Route Changes cb_handler.register_route_change_callback(Box::new(move || { - if let Err(err) = sender.send(NetworkMessage::Change) { + if let Err(err) = sender.send_blocking(NetworkMessage::Change) { warn!("unable to send: route change notification: {:?}", err); } }))?;