From 2d04306a518f060bdaa7adfb42630ae01e04e093 Mon Sep 17 00:00:00 2001 From: Kasey Date: Mon, 7 Oct 2024 11:12:01 -0400 Subject: [PATCH] fix(iroh-net): use `try_send` rather than `send` so we dont block the local swarm discovery service (#2794) ## Description Using `subscriber.send().await` could block the entire discovery service if one of the subscribers polls too slowly. Change to `try_send`, which drops the discovery item for a subscriber whose channel is full and removes any subscriber whose channel is closed. ## Notes & open questions Added a line in the documentation to mention that if you do not poll enough, you may miss messages. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. --------- Co-authored-by: Diva M --- iroh-net/src/discovery.rs | 2 ++ .../src/discovery/local_swarm_discovery.rs | 22 ++++++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index e3848a7db5..0cbfa285d0 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync { /// until the stream is actually polled. To avoid missing discovered nodes, /// poll the stream as soon as possible. /// + /// If you do not regularly poll the stream, you may miss discovered nodes. + /// /// Any discovery systems that only discover when explicitly resolving a /// specific [`NodeId`] do not need to implement this method. 
Any nodes or /// addresses that are discovered by calling `resolve` should NOT be added diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index 655b82ab26..a0ce2e941d 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -47,7 +47,10 @@ use watchable::Watchable; use iroh_base::key::PublicKey; use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; -use tokio::{sync::mpsc, task::JoinSet}; +use tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinSet, +}; use tokio_util::task::AbortOnDropHandle; use crate::{ @@ -103,12 +106,21 @@ impl Subscribers { /// Sends the `node_id` and `item` to each subscriber. /// /// Cleans up any subscribers that have been dropped. - async fn send(&mut self, item: DiscoveryItem) { + fn send(&mut self, item: DiscoveryItem) { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped - if (subscriber.send(item.clone()).await).is_err() { - clean_up.push(i); + if let Err(err) = subscriber.try_send(item.clone()) { + match err { + TrySendError::Full(_) => { + warn!( + ?item, + idx = i, + "local swarm discovery subscriber is blocked, dropping item" + ) + } + TrySendError::Closed(_) => clean_up.push(i), + } } } for i in clean_up.into_iter().rev() { @@ -236,7 +248,7 @@ impl LocalSwarmDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(item).await; + subscribers.send(item); } } Message::Resolve(node_id, sender) => {