From 448b5ff81068290e1e57d4b1c57a17c613932830 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 2 Aug 2021 17:11:56 +1000 Subject: [PATCH 1/5] Handle peer mapping inconsistencies --- protocols/gossipsub/src/behaviour.rs | 84 ++++++++++++---------- protocols/gossipsub/src/behaviour/tests.rs | 32 +++++++++ protocols/gossipsub/src/protocol.rs | 2 +- protocols/gossipsub/tests/smoke.rs | 12 +++- 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8a9c1b9efe0..a0b06ae2263 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1241,6 +1241,15 @@ where let mut do_px = self.config.do_px(); + // For each topic, if a peer has grafted us, then we necessarily must be in their mesh + // and they must be subscribed to the topic. Ensure we have recorded the mapping. + for topic in &topics { + self.peer_topics + .entry(*peer_id) + .or_default() + .insert(topic.clone()); + } + // we don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { warn!("GRAFT: ignoring request from direct peer {}", peer_id); @@ -1283,7 +1292,7 @@ where peer_score.add_penalty(peer_id, 1); } } - //no PX + // no PX do_px = false; to_prune_topics.insert(topic_hash.clone()); @@ -2808,34 +2817,33 @@ where // Ignore connections from blacklisted peers. if self.blacklisted_peers.contains(peer_id) { debug!("Ignoring connection from blacklisted peer: {}", peer_id); - return; - } - - debug!("New peer connected: {}", peer_id); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }); - } + } else { + debug!("New peer connected: {}", peer_id); + // We need to send our subscriptions to the newly-connected node. + let mut subscriptions = vec![]; + for topic_hash in self.mesh.keys() { + subscriptions.push(GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }); + } - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - *peer_id, - GossipsubRpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - error!("Failed to send subscriptions, message too large"); + if !subscriptions.is_empty() { + // send our subscriptions to the peer + if self + .send_message( + *peer_id, + GossipsubRpc { + messages: Vec::new(), + subscriptions, + control_msgs: Vec::new(), + } + .into_protobuf(), + ) + .is_err() + { + error!("Failed to send subscriptions, message too large"); + } } } @@ -2855,7 +2863,7 @@ where Some(topics) => (topics), None => { if !self.blacklisted_peers.contains(peer_id) { - debug!("Disconnected node, not in connected nodes"); + error!("Disconnected node, not in connected nodes"); } return; } @@ -2890,12 +2898,12 @@ where .get_mut(&topic) .map(|peers| peers.remove(peer_id)); } - - //forget px and outbound status for this peer - self.px_peers.remove(peer_id); - self.outbound_peers.remove(peer_id); } + // Forget px and outbound status for this peer + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); + // Remove peer from peer_topics and connected_peers // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. @@ -3198,9 +3206,13 @@ where NetworkBehaviourAction::ReportObservedAddr { address, score } => { NetworkBehaviourAction::ReportObservedAddr { address, score } } - NetworkBehaviourAction::CloseConnection { peer_id, connection } => { - NetworkBehaviourAction::CloseConnection { peer_id, connection } - } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }, }); } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 81b2267fdbb..463452b0cb6 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -5228,4 +5228,36 @@ mod tests { //nobody got penalized assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); } + + #[test] + /// Test nodes that send grafts without subscriptions. + fn test_graft_without_subscribe() { + // The node should: + // - Create an empty vector in mesh[topic] + // - Send subscription request to all peers + // - run JOIN(topic) + + let topic = String::from("test_subscribe"); + let subscribe_topic = vec![topic.clone()]; + let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; + let (mut gs, peers, topic_hashes) = inject_nodes1() + .peer_no(1) + .topics(subscribe_topic) + .to_subscribe(false) + .create_network(); + + assert!( + gs.mesh.get(&topic_hashes[0]).is_some(), + "Subscribe should add a new entry to the mesh[topic] hashmap" + ); + + // The node sends a graft for the subscribe topic. + gs.handle_graft(&peers[0], subscribe_topic_hash); + + // The node disconnects + gs.inject_disconnected(&peers[0]); + + // We unsubscribe from the topic. + let _ = gs.unsubscribe(&Topic::new(topic)); + } } diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 19293f58d7e..199d210452a 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -27,12 +27,12 @@ use crate::types::{ GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction, MessageId, PeerInfo, PeerKind, RawGossipsubMessage, }; +use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; use bytes::BytesMut; use futures::future; use futures::prelude::*; -use asynchronous_codec::{Decoder, Encoder, Framed}; use libp2p_core::{ identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, }; diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 841929a7f0c..3cf3f882427 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -51,11 +51,13 @@ impl Future for Graph { for (addr, node) in &mut self.nodes { loop { match node.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::Behaviour(event))) => return Poll::Ready((addr.clone(), event)), + Poll::Ready(Some(SwarmEvent::Behaviour(event))) => { + return Poll::Ready((addr.clone(), event)) + } Poll::Ready(Some(_)) => {} Poll::Ready(None) => panic!("unexpected None when polling nodes"), Poll::Pending => break, - } + } } } @@ -226,7 +228,11 @@ fn multi_hop_propagation() { graph = graph.drain_poll(); // Publish a single message. - graph.nodes[0].1.behaviour_mut().publish(topic, vec![1, 2, 3]).unwrap(); + graph.nodes[0] + .1 + .behaviour_mut() + .publish(topic, vec![1, 2, 3]) + .unwrap(); // Wait for all nodes to receive the published message. let mut received_msgs = 0; From 75565eaae175f7e3f8ebd21b9aaa31ca2f6a55e3 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 2 Aug 2021 17:21:11 +1000 Subject: [PATCH 2/5] Improve blacklisted peer handling --- protocols/gossipsub/src/behaviour.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index a0b06ae2263..8e0b62a1f31 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2921,11 +2921,6 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, ) { - // Ignore connections from blacklisted peers. - if self.blacklisted_peers.contains(peer_id) { - return; - } - // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { // Diverging from the go implementation we only want to consider a peer as outbound peer From 7de6a85365b3a984755f0134f049b62eeb3172e0 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 2 Aug 2021 17:36:10 +1000 Subject: [PATCH 3/5] Update changelog --- protocols/gossipsub/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index f0d1d7a5e51..9227a8ebeb6 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,7 +1,11 @@ # 0.33.0 [unreleased] +- Improve internal peer tracking (see [PR 2175]). + - Update dependencies. +[PR 2175]: https://github.com/libp2p/rust-libp2p/pull/2175 + # 0.32.0 [2021-07-12] - Update dependencies. From b9aee746d68a9d181d49cd6720922ab2be9fc183 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 5 Aug 2021 08:44:34 +1000 Subject: [PATCH 4/5] Add review suggestion --- protocols/gossipsub/src/behaviour.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8e0b62a1f31..e8b49264711 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2864,6 +2864,7 @@ where None => { if !self.blacklisted_peers.contains(peer_id) { error!("Disconnected node, not in connected nodes"); + debug_assert!("All peers should exist in peer_topics"); } return; } From 35eaa10eeacabea3239bfd7216a2f0c836efb719 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 5 Aug 2021 13:20:32 +1000 Subject: [PATCH 5/5] Correct debug assert --- protocols/gossipsub/src/behaviour.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index e8b49264711..803f79c924b 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2862,10 +2862,10 @@ where let topics = match self.peer_topics.get(peer_id) { Some(topics) => (topics), None => { - if !self.blacklisted_peers.contains(peer_id) { - error!("Disconnected node, not in connected nodes"); - debug_assert!("All peers should exist in peer_topics"); - } + debug_assert!( + self.blacklisted_peers.contains(peer_id), + "Disconnected node not in connected list" + ); return; } };