[Gossipsub] Inconsistency in mesh peer tracking #2175

Closed · wants to merge 7 commits
Changes from 3 commits
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
@@ -1,7 +1,11 @@
# 0.33.0 [unreleased]

- Improve internal peer tracking (see [PR 2175]).

- Update dependencies.

[PR 2175]: https://github.com/libp2p/rust-libp2p/pull/2175

# 0.32.0 [2021-07-12]

- Update dependencies.
89 changes: 48 additions & 41 deletions protocols/gossipsub/src/behaviour.rs
@@ -1241,6 +1241,15 @@ where

let mut do_px = self.config.do_px();

// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
for topic in &topics {
self.peer_topics
.entry(*peer_id)
.or_default()
.insert(topic.clone());
}

// we don't GRAFT to/from explicit peers; complain loudly if this happens
if self.explicit_peers.contains(peer_id) {
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
@@ -1283,7 +1292,7 @@ where
peer_score.add_penalty(peer_id, 1);
}
}
//no PX
// no PX
do_px = false;

to_prune_topics.insert(topic_hash.clone());
@@ -2808,34 +2817,33 @@ where
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
return;
}

debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}

if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}

@@ -2855,7 +2863,7 @@ where
Some(topics) => (topics),
None => {
if !self.blacklisted_peers.contains(peer_id) {
debug!("Disconnected node, not in connected nodes");
error!("Disconnected node, not in connected nodes");
}
return;
}
@@ -2890,12 +2898,12 @@ where
.get_mut(&topic)
.map(|peers| peers.remove(peer_id));
}

//forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
}

// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);

// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
@@ -2913,11 +2921,6 @@ where
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
        // Ignore connections from blacklisted peers.

Member: Can you expand on why we no longer ignore blacklisted peers?

Contributor Author: I found that in many cases we assume that any connected peer exists in connected_peers and that their topic subscriptions exist in peer_topics. When peers disconnect, we use these mappings to correctly remove them from the mesh and the other mappings in the behaviour. If there are any inconsistencies, gossipsub panics (this PR fixes one such instance).

Although I haven't seen a panic caused by blacklisted peers (I'm not using them at the moment), I'm concerned about inconsistencies arising when users blacklist and unblacklist peers at arbitrary times via the public functions.

This change takes a safer approach: we register the connections of all peers and add them to the mappings (so the entries exist regardless of when the user ad-hoc adds or removes peers), but we still block all messages from whichever peers are blacklisted at the time we receive them. On disconnect, because every peer's connections are registered, the peer is removed from all mappings correctly, regardless of the current blacklist state.

There is probably a way to do this without registering the connection, but I think there are quite a few edge cases, and this makes it simpler to reason about.

Member: Thanks @AgeManning for the details!

if self.blacklisted_peers.contains(peer_id) {
return;
}
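The invariant described in the reply above can be sketched in isolation. This is a hypothetical, simplified model (the names `PeerTracker`, `on_connect`, `on_message`, and `on_disconnect` are illustrative, not the actual rust-libp2p API): connections are always registered, even for blacklisted peers, so disconnect bookkeeping never finds inconsistent state; blacklisting is enforced only when a message arrives.

```rust
use std::collections::{HashMap, HashSet};

/// Minimal sketch of the peer-tracking invariant: every connected peer
/// is present in `connected_peers` and `peer_topics`, and the blacklist
/// filters messages without affecting connection bookkeeping.
pub struct PeerTracker {
    pub connected_peers: HashSet<String>,
    pub peer_topics: HashMap<String, HashSet<String>>,
    pub blacklisted_peers: HashSet<String>,
}

impl PeerTracker {
    pub fn on_connect(&mut self, peer: &str) {
        // Register the connection even if the peer is currently
        // blacklisted, so a later disconnect always finds the entry.
        self.connected_peers.insert(peer.to_string());
        self.peer_topics.entry(peer.to_string()).or_default();
    }

    pub fn on_message(&self, peer: &str) -> bool {
        // Blacklisting is checked only at message time.
        !self.blacklisted_peers.contains(peer)
    }

    pub fn on_disconnect(&mut self, peer: &str) {
        // Removal succeeds regardless of blacklist state, because the
        // connection was registered unconditionally in `on_connect`.
        self.connected_peers.remove(peer);
        self.peer_topics.remove(peer);
    }
}
```

With this shape, blacklisting or unblacklisting a peer at any time between connect and disconnect cannot leave a dangling entry in either map.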

// Check if the peer is an outbound peer
if let ConnectedPoint::Dialer { .. } = endpoint {
// Diverging from the go implementation we only want to consider a peer as outbound peer
@@ -3198,9 +3201,13 @@ where
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score }
}
NetworkBehaviourAction::CloseConnection { peer_id, connection } => {
NetworkBehaviourAction::CloseConnection { peer_id, connection }
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
},
});
}

32 changes: 32 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
@@ -5228,4 +5228,36 @@ mod tests {
//nobody got penalized
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
}

#[test]
/// Test nodes that send grafts without subscriptions.
fn test_graft_without_subscribe() {
// The node should:
// - Create an empty vector in mesh[topic]
// - Send subscription request to all peers
// - run JOIN(topic)

let topic = String::from("test_subscribe");
let subscribe_topic = vec![topic.clone()];
let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()];
let (mut gs, peers, topic_hashes) = inject_nodes1()
.peer_no(1)
.topics(subscribe_topic)
.to_subscribe(false)
.create_network();

assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);

// The node sends a graft for the subscribe topic.
gs.handle_graft(&peers[0], subscribe_topic_hash);

// The node disconnects
gs.inject_disconnected(&peers[0]);

// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
}
}
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/protocol.rs
@@ -27,12 +27,12 @@ use crate::types::{
GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo, PeerKind, RawGossipsubMessage,
};
use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use bytes::BytesMut;
use futures::future;
use futures::prelude::*;
use asynchronous_codec::{Decoder, Encoder, Framed};
use libp2p_core::{
identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
};
12 changes: 9 additions & 3 deletions protocols/gossipsub/tests/smoke.rs
@@ -51,11 +51,13 @@ impl Future for Graph {
for (addr, node) in &mut self.nodes {
loop {
match node.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::Behaviour(event))) => return Poll::Ready((addr.clone(), event)),
Poll::Ready(Some(SwarmEvent::Behaviour(event))) => {
return Poll::Ready((addr.clone(), event))
}
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => panic!("unexpected None when polling nodes"),
Poll::Pending => break,
}
}
}
}

@@ -226,7 +228,11 @@ fn multi_hop_propagation() {
graph = graph.drain_poll();

// Publish a single message.
graph.nodes[0].1.behaviour_mut().publish(topic, vec![1, 2, 3]).unwrap();
graph.nodes[0]
.1
.behaviour_mut()
.publish(topic, vec![1, 2, 3])
.unwrap();

// Wait for all nodes to receive the published message.
let mut received_msgs = 0;