From ecf010553859799b295d62d73414548b102b03aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 29 Aug 2024 21:46:23 +0100 Subject: [PATCH 1/5] simplify logic by moving subscribed topics to PeerConnections. --- protocols/gossipsub/src/behaviour.rs | 367 ++++++++------------- protocols/gossipsub/src/behaviour/tests.rs | 97 +++--- protocols/gossipsub/src/metrics.rs | 15 +- protocols/gossipsub/src/types.rs | 4 +- 4 files changed, 202 insertions(+), 281 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0d1af1ada0c..bab967123b0 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -259,12 +259,6 @@ pub struct Behaviour { /// the set of [`ConnectionId`]s. connected_peers: HashMap, - /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. - topic_peers: HashMap>, - - /// A map of all connected peers to their subscribed topics. - peer_topics: HashMap>, - /// A set of all explicit peers. These are peers that remain connected and we unconditionally /// forward messages to, outside of the scoring system. explicit_peers: HashSet, @@ -443,8 +437,6 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - topic_peers: HashMap::new(), - peer_topics: HashMap::new(), explicit_peers: HashSet::new(), blacklisted_peers: HashSet::new(), mesh: HashMap::new(), @@ -501,9 +493,9 @@ where /// Lists all known peers and their associated subscribed topics. pub fn all_peers(&self) -> impl Iterator)> { - self.peer_topics + self.connected_peers .iter() - .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect())) + .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect())) } /// Lists all known peers and their associated protocol. @@ -535,7 +527,7 @@ where } // send subscription request to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.connected_peers.keys().copied().collect::>() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); let event = RpcOut::Subscribe(topic_hash.clone()); self.send_message(peer, event); @@ -563,7 +555,7 @@ where } // announce to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.connected_peers.keys().copied().collect::>() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); let event = RpcOut::Unsubscribe(topic_hash.clone()); self.send_message(peer, event); @@ -621,84 +613,79 @@ where let topic_hash = raw_message.topic.clone(); + let mut peers_on_topic = self + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(&topic_hash)) + .map(|(peer_id, _)| peer_id) + .peekable(); + + if peers_on_topic.peek().is_none() { + return Err(PublishError::InsufficientPeers); + } + let mut recipient_peers = HashSet::new(); - if let Some(set) = self.topic_peers.get(&topic_hash) { - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend(set.iter().filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 - })); - } else { - match self.mesh.get(&raw_message.topic) { - // Mesh peers - Some(mesh_peers) => { - recipient_peers.extend(mesh_peers); - } - // Gossipsub peers - None => { - tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); - // If we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { - recipient_peers.insert(*peer); - } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - &topic_hash, - mesh_n, - { - |p| { - !self.explicit_peers.contains(p) - && !self - .score_below_threshold(p, |pst| { - pst.publish_threshold - }) - .0 - } - }, - ); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); - recipient_peers.insert(peer); - } + if self.config.flood_publish() { + // Forward to all peers above score and all explicit peers + recipient_peers.extend(peers_on_topic.filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + })); + } else { + match self.mesh.get(&topic_hash) { + // Mesh peers + Some(mesh_peers) => { + recipient_peers.extend(mesh_peers); + } + // Gossipsub peers + None => { + tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); + // If we have fanout peers add them to the map. + if self.fanout.contains_key(&topic_hash) { + for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + recipient_peers.insert(*peer); + } + } else { + // We have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = + get_random_peers(&self.connected_peers, &topic_hash, mesh_n, { + |p| { + !self.explicit_peers.contains(p) + && !self + .score_below_threshold(p, |pst| pst.publish_threshold) + .0 + } + }); + // Add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + tracing::debug!(%peer, "Peer added to fanout"); + recipient_peers.insert(peer); } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); } + // We are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); } + } - // Explicit peers - for peer in &self.explicit_peers { - if set.contains(peer) { - recipient_peers.insert(*peer); - } - } + // Explicit peers that are part of the topic + recipient_peers + .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id))); - // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && !self - .score_below_threshold(peer, |ts| ts.publish_threshold) - .0 - { - recipient_peers.insert(*peer); - } + // Floodsub peers + for (peer, connections) in &self.connected_peers { + if connections.kind == PeerKind::Floodsub + && !self + .score_below_threshold(peer, |ts| ts.publish_threshold) + .0 + { + recipient_peers.insert(*peer); } } } - if recipient_peers.is_empty() { - return Err(PublishError::InsufficientPeers); - } - // If the message isn't a duplicate and we have sent it to some peers add it to the // duplicate cache and memcache. self.duplicate_cache.insert(msg_id.clone()); @@ -964,7 +951,6 @@ where if added_peers.len() < self.config.mesh_n() { // get the peers let new_peers = get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.mesh_n() - added_peers.len(), @@ -1009,7 +995,6 @@ where peer_id, vec![topic_hash], &self.mesh, - self.peer_topics.get(&peer_id), &mut self.events, &self.connected_peers, ); @@ -1056,7 +1041,6 @@ where // Select peers for peer exchange let peers = if do_px { get_random_peers( - &self.topic_peers, &self.connected_peers, topic_hash, self.config.prune_peers(), @@ -1107,7 +1091,6 @@ where peer, topic_hash, &self.mesh, - self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); @@ -1118,7 +1101,7 @@ where /// Checks if the given peer is still connected and if not dials the peer again. fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) { - if !self.peer_topics.contains_key(peer_id) { + if !self.connected_peers.contains_key(peer_id) { // Connect to peer tracing::debug!(peer=%peer_id, "Connecting to explicit peer"); self.events.push_back(ToSwarm::Dial { @@ -1329,17 +1312,18 @@ where let mut do_px = self.config.do_px(); + let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft"); + return; + }; + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { - self.peer_topics - .entry(*peer_id) - .or_default() - .insert(topic.clone()); - self.topic_peers - .entry(topic.clone()) - .or_default() - .insert(*peer_id); + connected_peer.topics.insert(topic.clone()); + if let Some(m) = self.metrics.as_mut() { + m.inc_topic_peers(topic); + } } // we don't GRAFT to/from explicit peers; complain loudly if this happens @@ -1441,7 +1425,6 @@ where *peer_id, vec![&topic_hash], &self.mesh, - self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); @@ -1514,7 +1497,6 @@ where *peer_id, topic_hash, &self.mesh, - self.peer_topics.get(peer_id), &mut self.events, &self.connected_peers, ); @@ -1825,7 +1807,7 @@ where let mut unsubscribed_peers = Vec::new(); - let Some(subscribed_topics) = self.peer_topics.get_mut(propagation_source) else { + let Some(peer) = self.connected_peers.get_mut(propagation_source) else { tracing::error!( peer=%propagation_source, "Subscription by unknown peer" @@ -1841,7 +1823,7 @@ where let filtered_topics = match self .subscription_filter - .filter_incoming_subscriptions(subscriptions, subscribed_topics) + .filter_incoming_subscriptions(subscriptions, &peer.topics) { Ok(topics) => topics, Err(s) => { @@ -1857,11 +1839,10 @@ where for subscription in filtered_topics { // get the peers from the mapping, or insert empty lists if the topic doesn't exist let topic_hash = &subscription.topic_hash; - let peer_list = self.topic_peers.entry(topic_hash.clone()).or_default(); match subscription.action { SubscriptionAction::Subscribe => { - if peer_list.insert(*propagation_source) { + if peer.topics.insert(topic_hash.clone()) { tracing::debug!( peer=%propagation_source, topic=%topic_hash, @@ -1869,17 +1850,9 @@ where ); } - // add to the peer_topics mapping - subscribed_topics.insert(topic_hash.clone()); - // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) - && matches!( - self.connected_peers - .get(propagation_source) - .map(|v| &v.kind), - Some(PeerKind::Gossipsubv1_1) | Some(PeerKind::Gossipsub) - ) + && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub) && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, @@ -1922,16 +1895,18 @@ where })); } SubscriptionAction::Unsubscribe => { - if peer_list.remove(propagation_source) { + if peer.topics.remove(topic_hash) { tracing::debug!( peer=%propagation_source, topic=%topic_hash, "SUBSCRIPTION: Removing gossip peer from topic" ); + + if let Some(m) = self.metrics.as_mut() { + m.dec_topic_peers(topic_hash); + } } - // remove topic from the peer_topics mapping - subscribed_topics.remove(topic_hash); unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { @@ -1940,10 +1915,6 @@ where })); } } - - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic_hash, peer_list.len()); - } } // remove unsubscribed peers from the mesh if it exists @@ -1958,7 +1929,6 @@ where *propagation_source, topics_joined, &self.mesh, - self.peer_topics.get(propagation_source), &mut self.events, &self.connected_peers, ); @@ -2039,7 +2009,6 @@ where for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; - let topic_peers = &self.topic_peers; let outbound_peers = &self.outbound_peers; // drop all peers with negative score, without PX @@ -2087,18 +2056,13 @@ where ); // not enough peers - get mesh_n - current_length more let desired_peers = self.config.mesh_n() - peers.len(); - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - desired_peers, - |peer| { + let peer_list = + get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 - }, - ); + }); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2180,19 +2144,14 @@ where // if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; - let peer_list = get_random_peers( - topic_peers, - &self.connected_peers, - topic_hash, - needed, - |peer| { + let peer_list = + get_random_peers(&self.connected_peers, topic_hash, needed, |peer| { !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) && *scores.get(peer).unwrap_or(&0.0) >= 0.0 && outbound_peers.contains(peer) - }, - ); + }); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2249,7 +2208,6 @@ where // GRAFT if median < thresholds.opportunistic_graft_threshold { let peer_list = get_random_peers( - topic_peers, &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), @@ -2308,22 +2266,22 @@ where Some((_, thresholds, _, _)) => thresholds.publish_threshold, _ => 0.0, }; - for peer in peers.iter() { + for peer_id in peers.iter() { // is the peer still subscribed to the topic? - let peer_score = *scores.get(peer).unwrap_or(&0.0); - match self.peer_topics.get(peer) { - Some(topics) => { - if !topics.contains(topic_hash) || peer_score < publish_threshold { + let peer_score = *scores.get(peer_id).unwrap_or(&0.0); + match self.connected_peers.get(peer_id) { + Some(peer) => { + if !peer.topics.contains(topic_hash) || peer_score < publish_threshold { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Peer removed from fanout for topic" ); - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } None => { // remove if the peer has disconnected - to_remove_peers.push(*peer); + to_remove_peers.push(*peer_id); } } } @@ -2340,17 +2298,12 @@ where ); let needed_peers = self.config.mesh_n() - peers.len(); let explicit_peers = &self.explicit_peers; - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - topic_hash, - needed_peers, - |peer_id| { + let new_peers = + get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold - }, - ); + }); peers.extend(new_peers); } } @@ -2432,17 +2385,12 @@ where ) }; // get gossip_lazy random peers - let to_msg_peers = get_random_peers_dynamic( - &self.topic_peers, - &self.connected_peers, - topic_hash, - n_map, - |peer| { + let to_msg_peers = + get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| { !peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0 - }, - ); + }); tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len()); @@ -2492,7 +2440,6 @@ where peer, vec![topic], &self.mesh, - self.peer_topics.get(&peer), &mut self.events, &self.connected_peers, ); @@ -2543,7 +2490,6 @@ where *peer, topic_hash, &self.mesh, - self.peer_topics.get(peer), &mut self.events, &self.connected_peers, ); @@ -2577,11 +2523,11 @@ where // Add explicit peers for peer_id in &self.explicit_peers { - if let Some(topics) = self.peer_topics.get(peer_id) { + if let Some(peer) = self.connected_peers.get(peer_id) { if Some(peer_id) != propagation_source && !originating_peers.contains(peer_id) && Some(peer_id) != message.source.as_ref() - && topics.contains(&message.topic) + && peer.topics.contains(&message.topic) { recipient_peers.insert(*peer_id); } @@ -2790,6 +2736,7 @@ where .or_insert(PeerConnections { kind: PeerKind::Floodsub, connections: vec![], + topics: Default::default(), }) .connections .push(connection_id); @@ -2798,9 +2745,6 @@ where return; // Not our first connection to this peer, hence nothing to do. } - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(peer_id, Default::default()); - if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_peer(peer_id); } @@ -2843,28 +2787,26 @@ where if remaining_established != 0 { // Remove the connection from the list - if let Some(connections) = self.connected_peers.get_mut(&peer_id) { - let index = connections + if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + let index = peer .connections .iter() .position(|v| v == &connection_id) .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); + peer.connections.remove(index); // If there are more connections and this peer is in a mesh, inform the first connection // handler. - if !connections.connections.is_empty() { - if let Some(topics) = self.peer_topics.get(&peer_id) { - for topic in topics { - if let Some(mesh_peers) = self.mesh.get(topic) { - if mesh_peers.contains(&peer_id) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connections.connections[0]), - }); - break; - } + if !peer.connections.is_empty() { + for topic in &peer.topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(&peer_id) { + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::JoinedMesh, + handler: NotifyHandler::One(peer.connections[0]), + }); + break; } } } @@ -2874,7 +2816,7 @@ where // remove from mesh, topic_peers, peer_topic and the fanout tracing::debug!(peer=%peer_id, "Peer disconnected"); { - let Some(topics) = self.peer_topics.get(&peer_id) else { + let Some(peer) = self.connected_peers.get(&peer_id) else { debug_assert!( self.blacklisted_peers.contains(&peer_id), "Disconnected node not in connected list" @@ -2883,7 +2825,7 @@ where }; // remove peer from all mappings - for topic in topics { + for topic in peer.topics.iter() { // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it @@ -2895,24 +2837,8 @@ where }; } - // remove from topic_peers - if let Some(peer_list) = self.topic_peers.get_mut(topic) { - if !peer_list.remove(&peer_id) { - // debugging purposes - tracing::warn!( - peer=%peer_id, - "Disconnected node: peer not in topic_peers" - ); - } - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic, peer_list.len()) - } - } else { - tracing::warn!( - peer=%peer_id, - topic=%topic, - "Disconnected node: peer with topic not in topic_peers" - ); + if let Some(m) = self.metrics.as_mut() { + m.dec_topic_peers(topic); } // remove from fanout @@ -2926,11 +2852,6 @@ where self.px_peers.remove(&peer_id); self.outbound_peers.remove(&peer_id); - // Remove peer from peer_topics and connected_peers - // NOTE: It is possible the peer has already been removed from all mappings if it does not - // support the protocol. - self.peer_topics.remove(&peer_id); - // If metrics are enabled, register the disconnection of a peer based on its protocol. if let Some(metrics) = self.metrics.as_mut() { let peer_kind = &self @@ -3190,7 +3111,6 @@ fn peer_added_to_mesh( peer_id: PeerId, new_topics: Vec<&TopicHash>, mesh: &HashMap>, - known_topics: Option<&BTreeSet>, events: &mut VecDeque>, connections: &HashMap, ) { @@ -3204,8 +3124,8 @@ fn peer_added_to_mesh( conn.connections[0] }; - if let Some(topics) = known_topics { - for topic in topics { + if let Some(peer) = connections.get(&peer_id) { + for topic in &peer.topics { if !new_topics.contains(&topic) { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { @@ -3231,7 +3151,6 @@ fn peer_removed_from_mesh( peer_id: PeerId, old_topic: &TopicHash, mesh: &HashMap>, - known_topics: Option<&BTreeSet>, events: &mut VecDeque>, connections: &HashMap, ) { @@ -3243,8 +3162,8 @@ fn peer_removed_from_mesh( .first() .expect("There should be at least one connection to a peer."); - if let Some(topics) = known_topics { - for topic in topics { + if let Some(peer) = connections.get(&peer_id) { + for topic in &peer.topics { if topic != old_topic { if let Some(mesh_peers) = mesh.get(topic) { if mesh_peers.contains(&peer_id) { @@ -3267,28 +3186,19 @@ fn peer_removed_from_mesh( /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. fn get_random_peers_dynamic( - topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, mut f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - let mut gossip_peers = match topic_peers.get(topic_hash) { - // if they exist, filter the peers by `f` - Some(peer_list) => peer_list - .iter() - .copied() - .filter(|p| { - f(p) && match connected_peers.get(p) { - Some(connections) if connections.kind == PeerKind::Gossipsub => true, - Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true, - _ => false, - } - }) - .collect(), - None => Vec::new(), - }; + let mut gossip_peers = connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(topic_hash)) + .filter(|(peer_id, _)| f(peer_id)) + .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1) + .map(|(peer_id, _)| *peer_id) + .collect::>(); // if we have less than needed, return them let n = n_map(gossip_peers.len()); @@ -3309,13 +3219,12 @@ fn get_random_peers_dynamic( /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. fn get_random_peers( - topic_peers: &HashMap>, connected_peers: &HashMap, topic_hash: &TopicHash, n: usize, f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f) + get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the @@ -3355,8 +3264,6 @@ impl fmt::Debug for Behaviour>(), "First peer should be subscribed to three topics" ); - let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); + let peer1 = gs.connected_peers.get(&peers[1]).unwrap(); assert!( - peer_topics == topic_hashes.iter().take(3).cloned().collect(), + peer1.topics + == topic_hashes + .iter() + .take(3) + .cloned() + .collect::>(), "Second peer should be subscribed to three topics" ); assert!( - !gs.peer_topics.contains_key(&unknown_peer), + !gs.connected_peers.contains_key(&unknown_peer), "Unknown peer should not have been added" ); for topic_hash in topic_hashes[..3].iter() { - let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); + let topic_peers = gs + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(topic_hash)) + .map(|(peer_id, _)| *peer_id) + .collect::>(); assert!( topic_peers == peers[..2].iter().cloned().collect(), "Two peers should be added to the first three topics" @@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() { &peers[0], ); - let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); - assert!( - peer_topics == topic_hashes[1..3].iter().cloned().collect(), + let peer = gs.connected_peers.get(&peers[0]).unwrap().clone(); + assert_eq!( + peer.topics, + topic_hashes[1..3].iter().cloned().collect::>(), "Peer should be subscribed to two topics" ); - let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment + // only gossipsub at the moment + let topic_peers = gs + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(&topic_hashes[0])) + .map(|(peer_id, _)| *peer_id) + .collect::>(); + assert!( topic_peers == peers[1..2].iter().cloned().collect(), "Only the second peers should be in the first topic" @@ -924,9 +952,8 @@ fn test_get_random_peers() { for _ in 0..20 { peers.push(PeerId::random()) } - - gs.topic_peers - .insert(topic_hash.clone(), peers.iter().cloned().collect()); + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); gs.connected_peers = peers .iter() @@ -936,52 +963,32 @@ fn test_get_random_peers() { PeerConnections { kind: PeerKind::Gossipsubv1_1, connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), }, ) }) .collect(); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 30, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = get_random_peers( - &gs.topic_peers, - &gs.connected_peers, - &topic_hash, - 20, - |_| true, - ); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| { - true - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); // test the filter - let random_peers = - get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { - false - }); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); - let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, { + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, { |peer| peers.contains(peer) }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index e044ca67e71..7d4acada3c7 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -355,12 +355,17 @@ impl Metrics { } } - /// Register how many peers do we known are subscribed to this topic. - pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) { + /// Increase the number of peers that are subscribed to this topic. + pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { - self.topic_peers_count - .get_or_create(topic) - .set(count as i64); + self.topic_peers_count.get_or_create(topic).inc(); + } + } + + /// Decrease the number of peers that are subscribed to this topic. + pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.topic_peers_count.get_or_create(topic).dec(); } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d1b92ff0ba8..a88f4822ac2 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -24,8 +24,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; -use std::fmt; use std::fmt::Debug; +use std::{collections::BTreeSet, fmt}; use crate::rpc_proto::proto; #[cfg(feature = "serde")] @@ -77,6 +77,8 @@ pub(crate) struct PeerConnections { pub(crate) kind: PeerKind, /// Its current connections. pub(crate) connections: Vec, + /// Subscribed topics. + pub(crate) topics: BTreeSet, } /// Describes the types of peers that can exist in the gossipsub context. From c1a925e5312e422f345fd20ee00cfa4420d84159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 29 Aug 2024 21:58:19 +0100 Subject: [PATCH 2/5] publish to at least mesh_n peers --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/gossipsub/CHANGELOG.md | 5 ++++ protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/behaviour.rs | 29 +++++++++++++++++++--- protocols/gossipsub/src/behaviour/tests.rs | 4 +-- 6 files changed, 36 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70744c1bf40..563e5a23bfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2838,7 +2838,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.47.0" +version = "0.47.1" dependencies = [ "async-std", "asynchronous-codec", diff --git a/Cargo.toml b/Cargo.toml index 257c07c7c98..b9ec192d8e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" } libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.42.0", path = "transports/dns" } libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" } -libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" } +libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.45.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } libp2p-kad = { version = "0.46.0", path = "protocols/kad" } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 8e115052d31..96175813b24 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,10 @@ ## 0.47.0 +- Attempt to publish to at least mesh_n peers when flood publish is disabled. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). + +## 0.47.0 + - Add ConnectionError to FromSwarm::ConnectionClosed. See [PR 5485](https://github.com/libp2p/rust-libp2p/pull/5485). diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 4cb590bed0c..665f757fcb3 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = { workspace = true } description = "Gossipsub protocol for libp2p" -version = "0.47.0" +version = "0.47.1" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index bab967123b0..be6abf31724 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -635,6 +635,30 @@ where match self.mesh.get(&topic_hash) { // Mesh peers Some(mesh_peers) => { + // We have a mesh set. We want to make sure to publish to at least `mesh_n` + // peers (if possible). + let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len()); + + if needed_extra_peers > 0 { + // We don't have `mesh_n` peers in our mesh, we will randomly select extras + // and publish to them. + + // Get a random set of peers that are appropriate to send messages too. + let peer_list = get_random_peers( + &self.connected_peers, + &topic_hash, + needed_extra_peers, + |peer| { + !mesh_peers.contains(peer) + && !self.explicit_peers.contains(peer) + && !self + .score_below_threshold(peer, |pst| pst.publish_threshold) + .0 + }, + ); + recipient_peers.extend(peer_list); + } + recipient_peers.extend(mesh_peers); } // Gossipsub peers @@ -2118,10 +2142,9 @@ where if outbound <= self.config.mesh_outbound_min() { // do not remove anymore outbound peers continue; - } else { - // an outbound peer gets removed - outbound -= 1; } + // an outbound peer gets removed + outbound -= 1; } // remove the peer diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 3c5e2ede586..a74566a1308 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -674,8 +674,8 @@ fn test_publish_without_flood_publishing() { let config: Config = Config::default(); assert_eq!( publishes.len(), - config.mesh_n_low(), - "Should send a publish message to all known peers" + config.mesh_n(), + "Should send a publish message to at least mesh_n peers" ); assert!( From c373e010bd5ffd78c16aa9d9bc3d8f3450b38b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 2 Sep 2024 15:43:56 +0100 Subject: [PATCH 3/5] address Akihito's review --- protocols/gossipsub/src/behaviour.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index be6abf31724..16adb555a44 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1344,9 +1344,10 @@ where // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. for topic in &topics { - connected_peer.topics.insert(topic.clone()); - if let Some(m) = self.metrics.as_mut() { - m.inc_topic_peers(topic); + if connected_peer.topics.insert(topic.clone()) { + if let Some(m) = self.metrics.as_mut() { + m.inc_topic_peers(topic); + } } } @@ -1872,6 +1873,10 @@ where topic=%topic_hash, "SUBSCRIPTION: Adding gossip peer to topic" ); + + if let Some(m) = self.metrics.as_mut() { + m.inc_topic_peers(topic_hash); + } } // if the mesh needs peers add the peer to the mesh From d022f1e684aa3f073f72d954f35eac498a5d561d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 3 Sep 2024 17:38:49 +0100 Subject: [PATCH 4/5] Update protocols/gossipsub/CHANGELOG.md Co-authored-by: Darius Clark --- protocols/gossipsub/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 96175813b24..f485c000c52 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.47.0 +## 0.47.1 - Attempt to publish to at least mesh_n peers when flood publish is disabled. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). From 648161f96123489cec15794e5a50939ce7d333d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 3 Sep 2024 17:38:55 +0100 Subject: [PATCH 5/5] Update protocols/gossipsub/CHANGELOG.md Co-authored-by: Darius Clark --- protocols/gossipsub/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index f485c000c52..c47a9f40f66 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.47.1 - Attempt to publish to at least mesh_n peers when flood publish is disabled. - See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). + See [PR 5578](https://github.com/libp2p/rust-libp2p/pull/5578). ## 0.47.0