From c5bc272057b473337191de50e7f86d07cf93c5a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Tue, 31 Jan 2023 20:05:13 +0100 Subject: [PATCH] fix: report reputation changes correctly (#1086) Co-authored-by: Matthias Seitz --- crates/interfaces/src/p2p/error.rs | 28 ++++++++++++++-- crates/net/network-api/src/reputation.rs | 2 +- crates/net/network/src/fetch/mod.rs | 42 ++++++++++++++++++++---- crates/net/network/src/message.rs | 2 +- 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 7275593a853a..573e60efe637 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -1,5 +1,6 @@ use super::headers::client::HeadersRequest; use crate::{consensus, db}; +use reth_network_api::ReputationChangeKind; use reth_primitives::{BlockHashOrNumber, BlockNumber, Header, WithPeerId, H256}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; @@ -14,6 +15,9 @@ pub type PeerRequestResult = RequestResult>; pub trait EthResponseValidator { /// Determine whether the response matches what we requested in [HeadersRequest] fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool; + + /// Return the response reputation impact if any + fn reputation_change_err(&self) -> Option; } impl EthResponseValidator for RequestResult> { @@ -37,6 +41,28 @@ impl EthResponseValidator for RequestResult> { Err(_) => true, } } + + /// [RequestError::ChannelClosed] is not possible here since these errors are mapped to + /// `ConnectionDropped`, which will be handled when the dropped connection is cleaned up. + /// + /// [RequestError::ConnectionDropped] should be ignored here because this is already handled + /// when the dropped connection is handled. + /// + /// [RequestError::UnsupportedCapability] is not used yet because we only support active session + /// for eth protocol. + fn reputation_change_err(&self) -> Option { + if let Err(err) = self { + match err { + RequestError::ChannelClosed => None, + RequestError::ConnectionDropped => None, + RequestError::UnsupportedCapability => None, + RequestError::Timeout => Some(ReputationChangeKind::Timeout), + RequestError::BadResponse => None, + } + } else { + None + } + } } /// Error variants that can happen when sending requests to a session. @@ -45,8 +71,6 @@ impl EthResponseValidator for RequestResult> { pub enum RequestError { #[error("Closed channel to the peer.")] ChannelClosed, - #[error("Not connected to the peer.")] - NotConnected, #[error("Connection to a peer dropped while handling the request.")] ConnectionDropped, #[error("Capability Message is not supported by remote peer.")] diff --git a/crates/net/network-api/src/reputation.rs b/crates/net/network-api/src/reputation.rs index a36135561456..30fce6716981 100644 --- a/crates/net/network-api/src/reputation.rs +++ b/crates/net/network-api/src/reputation.rs @@ -2,7 +2,7 @@ pub type Reputation = i32; /// Various kinds of reputation changes. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ReputationChangeKind { /// Received an unspecific bad message from the peer BadMessage, diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index bc14730bf90c..017208dc3b79 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -226,6 +226,7 @@ impl StateFetcher { res: RequestResult>, ) -> Option { let is_error = res.is_err(); + let reputation_change = res.reputation_change_err(); let resp = self.inflight_headers_requests.remove(&peer_id); let is_likely_bad_response = resp @@ -239,10 +240,9 @@ impl StateFetcher { if is_error { // if the response was erroneous we want to report the peer. - return Some(BlockResponseOutcome::BadResponse( - peer_id, - ReputationChangeKind::BadMessage, - )) + return reputation_change.map(|reputation_change| { + BlockResponseOutcome::BadResponse(peer_id, reputation_change) + }) } if let Some(peer) = self.peers.get_mut(&peer_id) { @@ -401,11 +401,11 @@ pub(crate) enum FetchAction { /// Outcome of a processed response. /// /// Returned after processing a response. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub(crate) enum BlockResponseOutcome { /// Continue with another request to the peer. Request(PeerId, BlockRequest), - /// How to handle a bad response and the reputation change to apply. + /// How to handle a bad response and the reputation change to apply, if any. BadResponse(PeerId, ReputationChangeKind), } @@ -483,4 +483,34 @@ mod tests { assert_eq!(fetcher.next_peer(), Some(peer2)); assert_eq!(fetcher.next_peer(), Some(peer2)); } + + #[tokio::test] + async fn test_on_block_headers_response() { + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let peer_id = H512::random(); + + assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None); + + assert_eq!( + fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)), + Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout)) + ); + assert_eq!( + fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)), + None + ); + assert_eq!( + fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)), + None + ); + assert_eq!( + fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)), + None + ); + assert_eq!( + fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)), + None + ); + } } diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 95934f75a6f4..f2027c0740ef 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -59,7 +59,7 @@ pub enum PeerMessage { } /// Request Variants that only target block related data. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] #[allow(missing_docs)] #[allow(clippy::enum_variant_names)] pub enum BlockRequest {