Skip to content

Commit

Permalink
fix: report reputation changes correctly (paradigmxyz#1086)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
leruaa and mattsse committed Jan 31, 2023
1 parent 5c32ad0 commit c5bc272
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 10 deletions.
28 changes: 26 additions & 2 deletions crates/interfaces/src/p2p/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::headers::client::HeadersRequest;
use crate::{consensus, db};
use reth_network_api::ReputationChangeKind;
use reth_primitives::{BlockHashOrNumber, BlockNumber, Header, WithPeerId, H256};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -14,6 +15,9 @@ pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
pub trait EthResponseValidator {
/// Determine whether the response matches what we requested in [HeadersRequest]
fn is_likely_bad_headers_response(&self, request: &HeadersRequest) -> bool;

/// Return the response reputation impact if any
fn reputation_change_err(&self) -> Option<ReputationChangeKind>;
}

impl EthResponseValidator for RequestResult<Vec<Header>> {
Expand All @@ -37,6 +41,28 @@ impl EthResponseValidator for RequestResult<Vec<Header>> {
Err(_) => true,
}
}

/// [RequestError::ChannelClosed] is not possible here since these errors are mapped to
/// `ConnectionDropped`, which will be handled when the dropped connection is cleaned up.
///
/// [RequestError::ConnectionDropped] should be ignored here because this is already handled
/// when the dropped connection is handled.
///
/// [RequestError::UnsupportedCapability] is not used yet because we only support active session
/// for eth protocol.
fn reputation_change_err(&self) -> Option<ReputationChangeKind> {
if let Err(err) = self {
match err {
RequestError::ChannelClosed => None,
RequestError::ConnectionDropped => None,
RequestError::UnsupportedCapability => None,
RequestError::Timeout => Some(ReputationChangeKind::Timeout),
RequestError::BadResponse => None,
}
} else {
None
}
}
}

/// Error variants that can happen when sending requests to a session.
Expand All @@ -45,8 +71,6 @@ impl EthResponseValidator for RequestResult<Vec<Header>> {
pub enum RequestError {
#[error("Closed channel to the peer.")]
ChannelClosed,
#[error("Not connected to the peer.")]
NotConnected,
#[error("Connection to a peer dropped while handling the request.")]
ConnectionDropped,
#[error("Capability Message is not supported by remote peer.")]
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network-api/src/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
pub type Reputation = i32;

/// Various kinds of reputation changes.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ReputationChangeKind {
/// Received an unspecific bad message from the peer
BadMessage,
Expand Down
42 changes: 36 additions & 6 deletions crates/net/network/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl StateFetcher {
res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> {
let is_error = res.is_err();
let reputation_change = res.reputation_change_err();

let resp = self.inflight_headers_requests.remove(&peer_id);
let is_likely_bad_response = resp
Expand All @@ -239,10 +240,9 @@ impl StateFetcher {

if is_error {
// if the response was erroneous we want to report the peer.
return Some(BlockResponseOutcome::BadResponse(
peer_id,
ReputationChangeKind::BadMessage,
))
return reputation_change.map(|reputation_change| {
BlockResponseOutcome::BadResponse(peer_id, reputation_change)
})
}

if let Some(peer) = self.peers.get_mut(&peer_id) {
Expand Down Expand Up @@ -401,11 +401,11 @@ pub(crate) enum FetchAction {
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
/// Continue with another request to the peer.
Request(PeerId, BlockRequest),
/// How to handle a bad response and the reputation change to apply.
/// How to handle a bad response and the reputation change to apply, if any.
BadResponse(PeerId, ReputationChangeKind),
}

Expand Down Expand Up @@ -483,4 +483,34 @@ mod tests {
assert_eq!(fetcher.next_peer(), Some(peer2));
assert_eq!(fetcher.next_peer(), Some(peer2));
}

#[tokio::test]
async fn test_on_block_headers_response() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle(), Default::default());
let peer_id = H512::random();

assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);

assert_eq!(
fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
);
assert_eq!(
fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
None
);
assert_eq!(
fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
None
);
assert_eq!(
fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
None
);
assert_eq!(
fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
None
);
}
}
2 changes: 1 addition & 1 deletion crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub enum PeerMessage {
}

/// Request Variants that only target block related data.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(missing_docs)]
#[allow(clippy::enum_variant_names)]
pub enum BlockRequest {
Expand Down

0 comments on commit c5bc272

Please sign in to comment.