Skip to content

Commit

Permalink
feat(net): enforce trusted_nodes_only setting (paradigmxyz#660)
Browse files Browse the repository at this point in the history
* Add PeerKind and enforce trusted_only setting

* Don't remove trusted peer on graceful close

* Don't remove trusted nodes on PeerCommand::Remove

* Rename PeerKind::NonTrusted to Basic

* Move PeerKind::is_trusted to Peer impl

* Add trusted peer prioritization
  • Loading branch information
lambdaclass-user committed Jan 1, 2023
1 parent 4efb7b9 commit 1f6a943
Showing 1 changed file with 64 additions and 24 deletions.
88 changes: 64 additions & 24 deletions crates/net/network/src/peers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub(crate) struct PeersManager {
/// How long peers to which we could not connect for non-fatal reasons, e.g.
/// [`DisconnectReason::TooManyPeers`], are put in time out.
backoff_duration: Duration,
/// If non-trusted peers should be connected to
connect_trusted_nodes_only: bool,
}

impl PeersManager {
Expand All @@ -106,6 +108,7 @@ impl PeersManager {
ban_duration,
backoff_duration,
trusted_nodes,
connect_trusted_nodes_only,
..
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
Expand All @@ -117,7 +120,7 @@ impl PeersManager {
let mut peers = HashMap::with_capacity(trusted_nodes.len());

for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes {
peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port))));
peers.entry(id).or_insert_with(|| Peer::trusted(SocketAddr::from((address, tcp_port))));
}

Self {
Expand All @@ -135,6 +138,7 @@ impl PeersManager {
ban_list,
ban_duration,
backoff_duration,
connect_trusted_nodes_only,
}
}

Expand Down Expand Up @@ -296,7 +300,7 @@ impl PeersManager {
Entry::Occupied(mut entry) => {
self.connection_info.decr_state(entry.get().state);

if entry.get().remove_after_disconnect {
if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
// this peer should be removed from the set
entry.remove();
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
Expand Down Expand Up @@ -424,24 +428,28 @@ impl PeersManager {

/// Removes the tracked node from the set.
pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) {
if let Some(mut peer) = self.peers.remove(&peer_id) {
trace!(target : "net::peers", ?peer_id, "remove discovered node");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));

if peer.state.is_connected() {
debug!(target : "net::peers", ?peer_id, "disconnecting on remove from discovery");
// we terminate the active session here, but only remove the peer after the session
// was disconnected, this prevents the case where the session is scheduled for
// disconnect but the node is immediately rediscovered, See also
// [`Self::on_disconnected()`]
peer.remove_after_disconnect = true;
peer.state.disconnect();
self.peers.insert(peer_id, peer);
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::DisconnectRequested),
})
}
let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
if entry.get().is_trusted() {
return
}
let mut peer = entry.remove();

trace!(target : "net::peers", ?peer_id, "remove discovered node");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));

if peer.state.is_connected() {
debug!(target : "net::peers", ?peer_id, "disconnecting on remove from discovery");
// we terminate the active session here, but only remove the peer after the session
// was disconnected, this prevents the case where the session is scheduled for
// disconnect but the node is immediately rediscovered, See also
// [`Self::on_disconnected()`]
peer.remove_after_disconnect = true;
peer.state.disconnect();
self.peers.insert(peer_id, peer);
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::DisconnectRequested),
})
}
}

Expand All @@ -451,22 +459,31 @@ impl PeersManager {
///
/// Returns `None` if no peer is available.
fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| peer.state.is_unconnected());
let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
peer.state.is_unconnected() &&
!peer.is_banned() &&
(!self.connect_trusted_nodes_only || peer.is_trusted())
});

// keep track of the best peer, if there's one
let mut best_peer = unconnected.next()?;

if best_peer.1.is_trusted() {
return Some((*best_peer.0, best_peer.1))
}

for maybe_better in unconnected {
if maybe_better.1.is_trusted() {
return Some((*maybe_better.0, maybe_better.1))
}
match (maybe_better.1.fork_id.as_ref(), best_peer.1.fork_id.as_ref()) {
(Some(_), Some(_)) | (None, None) => {
if maybe_better.1.reputation > best_peer.1.reputation {
best_peer = maybe_better;
}
}
(Some(_), None) => {
if !maybe_better.1.is_banned() {
best_peer = maybe_better;
}
best_peer = maybe_better;
}
_ => {}
}
Expand Down Expand Up @@ -630,6 +647,8 @@ pub struct Peer {
fork_id: Option<ForkId>,
/// Whether the entry should be removed after an existing session was terminated.
remove_after_disconnect: bool,
/// The kind of peer
kind: PeerKind,
}

// === impl Peer ===
Expand All @@ -639,13 +658,18 @@ impl Peer {
Self::with_state(addr, Default::default())
}

fn trusted(addr: SocketAddr) -> Self {
Self { kind: PeerKind::Trusted, ..Self::new(addr) }
}

fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self {
Self {
addr,
state,
reputation: DEFAULT_REPUTATION,
fork_id: None,
remove_after_disconnect: false,
kind: Default::default(),
}
}

Expand Down Expand Up @@ -684,6 +708,12 @@ impl Peer {
fn unban(&mut self) {
self.reputation = DEFAULT_REPUTATION
}

/// Returns whether this peer is trusted
#[inline]
fn is_trusted(&self) -> bool {
matches!(self.kind, PeerKind::Trusted)
}
}

/// Outcomes when a reputation change is applied to a peer
Expand Down Expand Up @@ -740,6 +770,16 @@ impl PeerConnectionState {
}
}

/// Represents the kind of peer
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum PeerKind {
/// Basic peer kind.
#[default]
Basic,
/// Trusted peer.
Trusted,
}

/// Commands the [`PeersManager`] listens for.
pub(crate) enum PeerCommand {
/// Command for manually add
Expand Down

0 comments on commit 1f6a943

Please sign in to comment.