Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-net): replace flume in iroh-net with async_channel #2539

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
async-channel = "2.3.1"
base64 = "0.22.1"
backoff = "0.4.0"
bytes = "1"
netdev = "0.30.0"
der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-concurrency = "7.6.0"
futures-lite = "2.3"
Expand Down
22 changes: 11 additions & 11 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use derive_more::FromStr;
use futures_lite::{stream::Boxed as BoxStream, StreamExt};
use tracing::{debug, error, trace, warn};

use flume::Sender;
use async_channel::Sender;
use iroh_base::key::PublicKey;
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl LocalSwarmDiscovery {
/// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime.
pub fn new(node_id: NodeId) -> Result<Self> {
debug!("Creating new LocalSwarmDiscovery service");
let (send, recv) = flume::bounded(64);
let (send, recv) = async_channel::bounded(64);
let task_sender = send.clone();
let rt = tokio::runtime::Handle::current();
let mut guard = Some(LocalSwarmDiscovery::spawn_discoverer(
Expand All @@ -80,7 +80,7 @@ impl LocalSwarmDiscovery {
let mut timeouts = JoinSet::new();
loop {
trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick");
let msg = match recv.recv_async().await {
let msg = match recv.recv().await {
Err(err) => {
error!("LocalSwarmDiscovery service error: {err:?}");
error!("closing LocalSwarmDiscovery");
Expand Down Expand Up @@ -124,7 +124,7 @@ impl LocalSwarmDiscovery {
for sender in senders.values() {
let item: DiscoveryItem = (&peer_info).into();
trace!(?item, "sending DiscoveryItem");
sender.send_async(Ok(item)).await.ok();
sender.send(Ok(item)).await.ok();
}
}
trace!(
Expand All @@ -141,7 +141,7 @@ impl LocalSwarmDiscovery {
if let Some(peer_info) = node_addrs.get(&node_id) {
let item: DiscoveryItem = peer_info.into();
debug!(?item, "sending DiscoveryItem");
sender.send_async(Ok(item)).await.ok();
sender.send(Ok(item)).await.ok();
}
if let Some(senders_for_node_id) = senders.get_mut(&node_id) {
senders_for_node_id.insert(id, sender);
Expand All @@ -155,7 +155,7 @@ impl LocalSwarmDiscovery {
tokio::time::sleep(DISCOVERY_DURATION).await;
trace!(?node_id, "discovery timeout");
timeout_sender
.send_async(Message::Timeout(node_id, id))
.send(Message::Timeout(node_id, id))
.await
.ok();
});
Expand Down Expand Up @@ -210,7 +210,7 @@ impl LocalSwarmDiscovery {
);

sender
.send(Message::Discovery(node_id.to_string(), peer.clone()))
.send_blocking(Message::Discovery(node_id.to_string(), peer.clone()))
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
.ok();
};
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
Expand Down Expand Up @@ -267,23 +267,23 @@ impl From<&Peer> for DiscoveryItem {

impl Discovery for LocalSwarmDiscovery {
fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option<BoxStream<Result<DiscoveryItem>>> {
let (send, recv) = flume::bounded(20);
let (send, recv) = async_channel::bounded(20);
let discovery_sender = self.sender.clone();
tokio::spawn(async move {
discovery_sender
.send_async(Message::SendAddrs(node_id, send))
.send(Message::SendAddrs(node_id, send))
.await
.ok();
});
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
Some(recv.into_stream().boxed())
Some(recv.boxed())
}

fn publish(&self, info: &AddrInfo) {
let discovery_sender = self.sender.clone();
let info = info.clone();
tokio::spawn(async move {
discovery_sender
.send_async(Message::ChangeLocalAddrs(info))
.send(Message::ChangeLocalAddrs(info))
.await
.ok();
});
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub(crate) struct MagicSock {
proxy_url: Option<Url>,

/// Used for receiving relay messages.
relay_recv_receiver: flume::Receiver<RelayRecvResult>,
relay_recv_receiver: async_channel::Receiver<RelayRecvResult>,
/// Stores wakers, to be called when relay_recv_ch receives new data.
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
network_send_wakers: parking_lot::Mutex<Option<Waker>>,
Expand Down Expand Up @@ -786,11 +786,11 @@ impl MagicSock {
break;
}
match self.relay_recv_receiver.try_recv() {
Err(flume::TryRecvError::Empty) => {
Err(async_channel::TryRecvError::Empty) => {
self.network_recv_wakers.lock().replace(cx.waker().clone());
break;
}
Err(flume::TryRecvError::Disconnected) => {
Err(async_channel::TryRecvError::Closed) => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
Expand Down Expand Up @@ -1375,7 +1375,7 @@ impl Handle {
insecure_skip_relay_cert_verify,
} = opts;

let (relay_recv_sender, relay_recv_receiver) = flume::bounded(128);
let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128);

let (pconn4, pconn6) = bind(port)?;
let port = pconn4.port();
Expand Down Expand Up @@ -1701,7 +1701,7 @@ struct Actor {
relay_actor_sender: mpsc::Sender<RelayActorMessage>,
relay_actor_cancel_token: CancellationToken,
/// Channel to send received relay messages on, for processing.
relay_recv_sender: flume::Sender<RelayRecvResult>,
relay_recv_sender: async_channel::Sender<RelayRecvResult>,
/// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun.
periodic_re_stun_timer: time::Interval,
/// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc.
Expand Down Expand Up @@ -1855,7 +1855,7 @@ impl Actor {
let passthroughs = self.process_relay_read_result(read_result);
for passthrough in passthroughs {
self.relay_recv_sender
.send_async(passthrough)
.send(passthrough)
.await
.expect("missing recv sender");
let mut wakers = self.msock.network_recv_wakers.lock();
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/magicsock/udp_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ mod tests {
let (m2, _m2_key) = wrap_socket(m2)?;

let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port());
let (m1_send, m1_recv) = flume::bounded(8);
let (m1_send, m1_recv) = async_channel::bounded(8);

let m1_task = tokio::task::spawn(async move {
if let Some(conn) = m1.accept().await {
let conn = conn.await?;
let (mut send_bi, mut recv_bi) = conn.accept_bi().await?;

let val = recv_bi.read_to_end(usize::MAX).await?;
m1_send.send_async(val).await?;
m1_send.send(val).await?;
send_bi.finish().await?;
}

Expand All @@ -220,7 +220,7 @@ mod tests {
drop(send_bi);

// make sure the right values arrived
let val = m1_recv.recv_async().await?;
let val = m1_recv.recv().await?;
assert_eq!(val, b"hello");

m1_task.await??;
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/net/netmon/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(super) struct Actor {
/// OS specific monitor.
#[allow(dead_code)]
route_monitor: RouteMonitor,
mon_receiver: flume::Receiver<NetworkMessage>,
mon_receiver: async_channel::Receiver<NetworkMessage>,
actor_receiver: mpsc::Receiver<ActorMessage>,
actor_sender: mpsc::Sender<ActorMessage>,
/// Callback registry.
Expand All @@ -84,7 +84,7 @@ impl Actor {
let wall_time = Instant::now();

// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
let (mon_sender, mon_receiver) = flume::bounded(MON_CHAN_CAPACITY);
let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY);
let route_monitor = RouteMonitor::new(mon_sender)?;
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);

Expand Down Expand Up @@ -129,7 +129,7 @@ impl Actor {
debounce_interval.reset_immediately();
}
}
Ok(_event) = self.mon_receiver.recv_async() => {
Ok(_event) = self.mon_receiver.recv() => {
trace!("network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/net/netmon/android.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::actor::NetworkMessage;
pub(super) struct RouteMonitor {}

impl RouteMonitor {
pub(super) fn new(_sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(_sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
// Very sad monitor. Android doesn't allow us to do this

Ok(RouteMonitor {})
Expand Down
4 changes: 2 additions & 2 deletions iroh-net/src/net/netmon/bsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Drop for RouteMonitor {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
socket.set_nonblocking(true)?;
let socket_std: std::os::unix::net::UnixStream = socket.into();
Expand All @@ -44,7 +44,7 @@ impl RouteMonitor {
) {
Ok(msgs) => {
if contains_interesting_message(&msgs) {
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
}
Err(err) => {
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/net/netmon/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ macro_rules! get_nla {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
let (mut conn, mut _handle, mut messages) = new_connection()?;

// Specify flags to listen on.
Expand Down Expand Up @@ -87,7 +87,7 @@ impl RouteMonitor {
continue;
} else {
addrs.insert(addr.clone());
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
}
}
Expand All @@ -97,7 +97,7 @@ impl RouteMonitor {
if let Some(addr) = get_nla!(msg, address::Nla::Address) {
addrs.remove(addr);
}
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewRoute(msg) | RtnlMessage::DelRoute(msg) => {
trace!("ROUTE:: {:?}", msg);
Expand All @@ -124,15 +124,15 @@ impl RouteMonitor {
}
}
}
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewRule(msg) => {
trace!("NEWRULE: {:?}", msg);
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::DelRule(msg) => {
trace!("DELRULE: {:?}", msg);
sender.send_async(NetworkMessage::Change).await.ok();
sender.send(NetworkMessage::Change).await.ok();
}
RtnlMessage::NewLink(msg) => {
trace!("NEWLINK: {:?}", msg);
Expand Down
6 changes: 3 additions & 3 deletions iroh-net/src/net/netmon/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ pub(super) struct RouteMonitor {
}

impl RouteMonitor {
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
// Register two callbacks with the windows api
let mut cb_handler = CallbackHandler::default();

// 1. Unicast Address Changes
let s = sender.clone();
cb_handler.register_unicast_address_change_callback(Box::new(move || {
if let Err(err) = s.send(NetworkMessage::Change) {
if let Err(err) = s.send_blocking(NetworkMessage::Change) {
warn!("unable to send: unicast change notification: {:?}", err);
}
}))?;

// 2. Route Changes
cb_handler.register_route_change_callback(Box::new(move || {
if let Err(err) = sender.send(NetworkMessage::Change) {
if let Err(err) = sender.send_blocking(NetworkMessage::Change) {
warn!("unable to send: route change notification: {:?}", err);
}
}))?;
Expand Down
Loading