Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/gossipsub: Inconsistency in mesh peer tracking #2189

Merged
merged 8 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 0.33.0 [unreleased]

- Improve internal peer tracking.
[PR 2175](https://github.com/libp2p/rust-libp2p/pull/2175)

- Update dependencies.

- Allow `message_id_fn`s to accept closures that capture variables.
Expand Down
84 changes: 44 additions & 40 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,15 @@ where

let mut do_px = self.config.do_px();

// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
for topic in &topics {
self.peer_topics
.entry(*peer_id)
.or_default()
.insert(topic.clone());
}

// we don't GRAFT to/from explicit peers; complain loudly if this happens
if self.explicit_peers.contains(peer_id) {
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
Expand Down Expand Up @@ -1283,7 +1292,7 @@ where
peer_score.add_penalty(peer_id, 1);
}
}
//no PX
// no PX
do_px = false;

to_prune_topics.insert(topic_hash.clone());
Expand Down Expand Up @@ -2808,34 +2817,33 @@ where
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
return;
}

debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
} else {
debug!("New peer connected: {}", peer_id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}

if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
*peer_id,
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
}

Expand All @@ -2854,9 +2862,10 @@ where
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
if !self.blacklisted_peers.contains(peer_id) {
debug!("Disconnected node, not in connected nodes");
}
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};
Expand Down Expand Up @@ -2890,12 +2899,12 @@ where
.get_mut(&topic)
.map(|peers| peers.remove(peer_id));
}

//forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
}

// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);

// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
Expand All @@ -2913,11 +2922,6 @@ where
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(peer_id) {
return;
}

// Check if the peer is an outbound peer
if let ConnectedPoint::Dialer { .. } = endpoint {
// Diverging from the go implementation we only want to consider a peer as outbound peer
Expand Down
32 changes: 32 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5228,4 +5228,36 @@ mod tests {
//nobody got penalized
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
}

#[test]
/// Test nodes that send grafts without subscriptions.
fn test_graft_without_subscribe() {
// The node should:
// - Create an empty vector in mesh[topic]
// - Send subscription request to all peers
// - run JOIN(topic)

let topic = String::from("test_subscribe");
let subscribe_topic = vec![topic.clone()];
let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()];
let (mut gs, peers, topic_hashes) = inject_nodes1()
.peer_no(1)
.topics(subscribe_topic)
.to_subscribe(false)
.create_network();

assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);

// The node sends a graft for the subscribe topic.
gs.handle_graft(&peers[0], subscribe_topic_hash);

// The node disconnects
gs.inject_disconnected(&peers[0]);

// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
}
}