Skip to content

Commit

Permalink
update to use futures::channel and connected_peers
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Dec 9, 2023
1 parent b08a4eb commit da9008b
Show file tree
Hide file tree
Showing 5 changed files with 723 additions and 575 deletions.
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async-channel = "1.9.0"
prometheus-client = { workspace = true }

[dev-dependencies]
async-std = { version = "1.6.3", features = ["unstable"] }
async-std = { version = "1.6.3", features = ["unstable", "attributes"] }
hex = "0.4.2"
libp2p-core = { workspace = true }
libp2p-yamux = { workspace = true }
Expand Down
154 changes: 88 additions & 66 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
time::Duration,
};

use futures::channel::mpsc::channel;
use futures::StreamExt;
use futures_ticker::Ticker;
use prometheus_client::registry::Registry;
Expand All @@ -45,7 +46,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::config::{Config, ValidationMode};
use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
Expand All @@ -62,6 +62,10 @@ use crate::types::{
};
use crate::types::{PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{
config::{Config, ValidationMode},
types::RpcReceiver,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -541,7 +545,7 @@ where
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let sender = self
.handler_send_queues
.connected_peers
.get_mut(peer)
.expect("Peerid should exist");

Expand Down Expand Up @@ -572,7 +576,7 @@ where
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let sender = self
.handler_send_queues
.connected_peers
.get_mut(peer)
.expect("Peerid should exist");

Expand Down Expand Up @@ -723,22 +727,19 @@ where
}

// Send to peers we know are subscribed to the topic.
let mut errors = 0;
let mut publish_failed = true;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
let sender = self
.handler_send_queues
.connected_peers
.get_mut(peer_id)
.expect("Peerid should exist");

if sender
publish_failed &= sender
.publish(raw_message.clone(), self.metrics.as_mut())
.is_err()
{
errors += 1;
}
.is_err();
}
if errors == recipient_peers.len() {
if publish_failed {
return Err(PublishError::InsufficientPeers);
}

Expand Down Expand Up @@ -1338,7 +1339,7 @@ where
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
let sender = self
.handler_send_queues
.connected_peers
.get_mut(peer_id)
.expect("Peerid should exist");

Expand Down Expand Up @@ -1496,17 +1497,17 @@ where
// build the prune messages to send
let on_unsubscribe = false;
let mut sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist")
.clone();
.connected_peers
.remove(peer_id)
.expect("Peerid should exist");

for action in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(action);
}
self.connected_peers.insert(*peer_id, sender);
// Send the prune messages to the peer
tracing::debug!(
peer=%peer_id,
Expand Down Expand Up @@ -2001,7 +2002,7 @@ where
// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
let sender = self
.handler_send_queues
.connected_peers
.get_mut(propagation_source)
.expect("Peerid should exist");

Expand Down Expand Up @@ -2546,7 +2547,7 @@ where

// send the control messages
let mut sender = self
.handler_send_queues
.connected_peers
.get_mut(&peer)
.expect("Peerid should exist")
.clone();
Expand Down Expand Up @@ -2581,7 +2582,7 @@ where
false,
);
let mut sender = self
.handler_send_queues
.connected_peers
.get_mut(peer)
.expect("Peerid should exist")
.clone();
Expand Down Expand Up @@ -2657,7 +2658,7 @@ where
for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
let sender = self
.handler_send_queues
.connected_peers
.get_mut(peer)
.expect("Peerid should exist");
sender.forward(message.clone(), self.metrics.as_mut());
Expand Down Expand Up @@ -2775,7 +2776,7 @@ where
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
let sender = self
.handler_send_queues
.connected_peers
.get_mut(&peer)
.expect("Peerid should exist");

Expand All @@ -2791,7 +2792,7 @@ where
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
connection_id: _,
endpoint,
other_established,
..
Expand Down Expand Up @@ -2819,20 +2820,6 @@ where
}
}

// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
self.connected_peers
.entry(peer_id)
.or_insert(PeerConnections {
kind: PeerKind::Floodsub,
connections: vec![],
})
.connections
.push(connection_id);

if other_established > 0 {
return; // Not our first connection to this peer, hence nothing to do.
}
Expand All @@ -2853,7 +2840,7 @@ where
tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let mut sender = self
.handler_send_queues
.connected_peers
.get_mut(&peer_id)
.expect("Peerid should exist")
.clone();
Expand Down Expand Up @@ -2889,24 +2876,19 @@ where
if remaining_established != 0 {
// Remove the connection from the list
if let Some(connections) = self.connected_peers.get_mut(&peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == &connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
connections.connections.remove(&connection_id);

// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(alternative_connection_id) = connections.connections.keys().next() {
if let Some(topics) = self.peer_topics.get(&peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(&peer_id) {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connections.connections[0]),
handler: NotifyHandler::One(*alternative_connection_id),
});
break;
}
Expand Down Expand Up @@ -3046,36 +3028,72 @@ where

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
connection_id: ConnectionId,
peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
let (priority_sender, priority_receiver) =
channel(self.config.connection_handler_queue_len());
let (non_priority_sender, non_priority_receiver) =
channel(self.config.connection_handler_queue_len());
let sender = RpcSender {
priority: priority_sender,
non_priority: non_priority_sender,
};
let receiver = RpcReceiver {
priority: priority_receiver.peekable(),
non_priority: non_priority_receiver.peekable(),
};
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
let peer_info =
self.connected_peers
.entry(peer_id)
.or_insert_with_key(|_| PeerConnections {
kind: PeerKind::Floodsub,
connections: Default::default(),
});
peer_info.connections.insert(connection_id, sender);
Ok(Handler::new(self.config.protocol_config(), receiver))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
connection_id: ConnectionId,
peer_id: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
let (priority_sender, priority_receiver) =
channel(self.config.connection_handler_queue_len());
let (non_priority_sender, non_priority_receiver) =
channel(self.config.connection_handler_queue_len());
let sender = RpcSender {
priority: priority_sender,
non_priority: non_priority_sender,
};
let receiver = RpcReceiver {
priority: priority_receiver.peekable(),
non_priority: non_priority_receiver.peekable(),
};
// By default we assume a peer is only a floodsub peer.
//
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
let peer_info =
self.connected_peers
.entry(peer_id)
.or_insert_with_key(|_| PeerConnections {
kind: PeerKind::Floodsub,
connections: Default::default(),
});
peer_info.connections.insert(connection_id, sender);
Ok(Handler::new(self.config.protocol_config(), receiver))
}

fn on_connection_handler_event(
Expand Down Expand Up @@ -3260,7 +3278,10 @@ fn peer_added_to_mesh(
!conn.connections.is_empty(),
"Must have at least one connection"
);
conn.connections[0]
conn.connections
.keys()
.next()
.expect("To be connected to peer")
};

if let Some(topics) = known_topics {
Expand All @@ -3279,7 +3300,7 @@ fn peer_added_to_mesh(
events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id),
handler: NotifyHandler::One(*connection_id),
});
}

Expand All @@ -3299,7 +3320,8 @@ fn peer_removed_from_mesh(
.get(&peer_id)
.expect("To be connected to peer.")
.connections
.first()
.keys()
.next()
.expect("There should be at least one connection to a peer.");

if let Some(topics) = known_topics {
Expand Down
Loading

0 comments on commit da9008b

Please sign in to comment.