Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Stop sending messages on legacy substream altogether #6975

Merged
17 commits merged into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use assert_matches::assert_matches;
use environment::HasVoted;
use sc_network_test::{
Block, BlockImportAdapter, Hash, PassThroughVerifier, Peer, PeersClient, PeersFullClient,
TestClient, TestNetFactory,
TestClient, TestNetFactory, FullPeerConfig,
};
use sc_network::config::{ProtocolConfig, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
Expand Down Expand Up @@ -94,6 +94,15 @@ impl TestNetFactory for GrandpaTestNet {
ProtocolConfig::default()
}

fn add_full_peer(&mut self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For any other reviewers, this function is called here and shadows add_full_peer here.

self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
(communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into())
],
..Default::default()
})
}

fn make_verifier(
&self,
_client: PeersClient,
Expand Down
176 changes: 16 additions & 160 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ use sp_consensus::{
use codec::{Decode, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, Roles};
use prometheus_endpoint::{
Registry, Gauge, Counter, CounterVec, GaugeVec,
Registry, Gauge, Counter, GaugeVec,
PrometheusError, Opts, register, U64
};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;

Expand Down Expand Up @@ -86,11 +86,6 @@ pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;

// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Maximum total bytes allowed for block bodies in `BlockResponse`
const MAX_BODIES_BYTES: usize = 8 * 1024 * 1024;

/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful
/// and disconnect to free connection slot.
Expand Down Expand Up @@ -119,8 +114,6 @@ mod rep {
pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected transaction packet.
pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// We received an unexpected light node request.
pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet");
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer is on unsupported protocol version.
Expand All @@ -139,7 +132,6 @@ struct Metrics {
finality_proofs: GaugeVec<U64>,
justifications: GaugeVec<U64>,
propagated_transactions: Counter<U64>,
legacy_requests_received: CounterVec<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -185,13 +177,6 @@ impl Metrics {
"sync_propagated_transactions",
"Number of transactions propagated to at least one peer",
)?, r)?,
legacy_requests_received: register(CounterVec::new(
Opts::new(
"sync_legacy_requests_received",
"Number of block/finality/light-client requests received on the legacy substream",
),
&["kind"]
)?, r)?,
})
}
}
Expand Down Expand Up @@ -604,19 +589,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
match message {
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
let outcome = self.on_block_response(who.clone(), r);
self.update_peer_info(&who);
return outcome
},
GenericMessage::BlockAnnounce(announce) => {
let outcome = self.on_block_announce(who.clone(), announce);
self.update_peer_info(&who);
return outcome;
},
GenericMessage::Transactions(m) =>
self.on_transactions(who, m),
GenericMessage::BlockResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected BlockResponse"),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadResponse(_) =>
Expand All @@ -627,6 +608,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::FinalityProofResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected FinalityProofResponse"),
GenericMessage::BlockRequest(_) |
GenericMessage::FinalityProofRequest(_) |
GenericMessage::RemoteReadChildRequest(_) |
GenericMessage::RemoteCallRequest(_) |
Expand Down Expand Up @@ -678,21 +660,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
CustomMessageOutcome::None
}

fn send_message(
&mut self,
who: &PeerId,
message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy: Message<B>,
) {
send_message::<B>(
&mut self.behaviour,
&mut self.context_data.stats,
who,
message,
legacy,
);
}

fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
}
Expand All @@ -718,92 +685,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["block-request"]).inc();
}

trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
request.id,
peer,
request.from,
request.to,
request.max,
request.fields,
);

// sending block requests to the node that is unable to serve it is considered a bad behavior
if !self.config.roles.is_full() {
trace!(target: "sync", "Peer {} is trying to sync from the light node", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_REQUEST);
return;
}

let mut blocks = Vec::new();
let mut id = match request.from {
message::FromBlock::Hash(h) => BlockId::Hash(h),
message::FromBlock::Number(n) => BlockId::Number(n),
};
let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
let get_header = request.fields.contains(message::BlockAttributes::HEADER);
let get_body = request.fields.contains(message::BlockAttributes::BODY);
let get_justification = request
.fields
.contains(message::BlockAttributes::JUSTIFICATION);
let mut total_size = 0;
while let Some(header) = self.context_data.chain.header(id).unwrap_or(None) {
if blocks.len() >= max || (blocks.len() >= 1 && total_size > MAX_BODIES_BYTES) {
break;
}
let number = *header.number();
let hash = header.hash();
let parent_hash = *header.parent_hash();
let justification = if get_justification {
self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None)
} else {
None
};
let block_data = message::generic::BlockData {
hash,
header: if get_header { Some(header) } else { None },
body: if get_body {
self.context_data
.chain
.block_body(&BlockId::Hash(hash))
.unwrap_or(None)
} else {
None
},
receipt: None,
message_queue: None,
justification,
};
// Stop if we don't have requested block body
if get_body && block_data.body.is_none() {
trace!(target: "sync", "Missing data for block request.");
break;
}
total_size += block_data.body.as_ref().map_or(0, |b| b.len());
blocks.push(block_data);
match request.direction {
message::Direction::Ascending => id = BlockId::Number(number + One::one()),
message::Direction::Descending => {
if number.is_zero() {
break;
}
id = BlockId::Hash(parent_hash)
}
}
}
let response = message::generic::BlockResponse {
id: request.id,
blocks,
};
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
self.send_message(&peer, None, GenericMessage::BlockResponse(response))
}

/// Adjusts the reputation of a node.
pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) {
self.peerset_handle.report_peer(who, reputation)
Expand Down Expand Up @@ -1207,14 +1088,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
let encoded = to_send.encode();
send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
Some((self.transactions_protocol.clone(), encoded)),
GenericMessage::Transactions(to_send)
)
self.behaviour.write_notification(
who,
self.transactions_protocol.clone(),
to_send.encode()
);
}
}

Expand Down Expand Up @@ -1289,15 +1167,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
};

let encoded = message.encode();

send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
Some((self.block_announces_protocol.clone(), encoded)),
Message::<B>::BlockAnnounce(message),
)
self.behaviour.write_notification(
who,
self.block_announces_protocol.clone(),
message.encode()
);
}
}
}
Expand Down Expand Up @@ -1605,24 +1479,6 @@ fn update_peer_request<B: BlockT, H: ExHashT>(
}
}

fn send_message<B: BlockT>(
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId,
message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy_message: Message<B>,
) {
let encoded = legacy_message.encode();
let mut stats = stats.entry(legacy_message.id()).or_default();
stats.bytes_out += encoded.len() as u64;
stats.count_out += 1;
if let Some((proto, msg)) = message {
behaviour.write_notification(who, proto, msg, encoded);
} else {
behaviour.send_packet(who, encoded);
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;
Expand Down
24 changes: 0 additions & 24 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ impl GenericProto {
target: &PeerId,
protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>,
) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
Expand All @@ -574,33 +573,10 @@ impl GenericProto {
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_sync_notification(
protocol_name,
encoded_fallback_message,
message
);
}

/// Sends a message to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent packet to {:?} without an open channel.",
target);
return
}
Some(sink) => sink
};

trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_legacy(message);
}

/// Returns the state of the peerset manager, for debugging purposes.
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
self.peerset.debug_info()
Expand Down
Loading