Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add a back-pressure-friendly alternative to NetworkService::write_notifications 🎉 #6692

Merged
28 commits merged into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4c8181c
Add NetworkService::send_notifications
tomaka Jul 7, 2020
432ee3c
Doc
tomaka Jul 7, 2020
73bf377
Doc
tomaka Jul 7, 2020
8921896
API adjustment
tomaka Jul 7, 2020
c90159c
Address concerns
tomaka Jul 7, 2020
24e3d31
Make it compile
tomaka Jul 13, 2020
d5c53b4
Merge remote-tracking branch 'upstream/master' into send-notifications
tomaka Jul 15, 2020
dc82672
Start implementation
tomaka Jul 15, 2020
7cf8526
Progress in the implementation
tomaka Jul 17, 2020
5a6236a
Change implementation strategy again
tomaka Jul 17, 2020
c90fe31
More work before weekend
tomaka Jul 17, 2020
0b1cffb
Finish changes
tomaka Jul 20, 2020
36ba174
Minor doc fix
tomaka Jul 20, 2020
a8a588c
Revert some minor changes
tomaka Jul 20, 2020
59b58b8
Merge remote-tracking branch 'upstream/master' into tka-send-notifica…
tomaka Jul 20, 2020
8ed7948
Apply suggestions from code review
tomaka Jul 27, 2020
2c1565d
Merge remote-tracking branch 'upstream/master' into tka-send-notifica…
tomaka Jul 27, 2020
67a200b
GroupError -> NotifsHandlerError
tomaka Jul 27, 2020
53f0ea8
Apply suggestions from code review
tomaka Jul 27, 2020
0d220fc
state_transition_waker -> close_waker
tomaka Jul 27, 2020
9c48c00
Apply suggestions from code review
tomaka Jul 27, 2020
525ff34
Finish renames in service.rs
tomaka Jul 27, 2020
ba4cf88
More renames
tomaka Jul 27, 2020
1d9b086
More review suggestions applied
tomaka Jul 27, 2020
5f8f7f1
More review addressing
tomaka Jul 27, 2020
c4505f2
Final change
tomaka Jul 27, 2020
86cba45
512 -> 2048
tomaka Jul 29, 2020
fffb490
Merge remote-tracking branch 'upstream/master' into tka-send-notifica…
tomaka Jul 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 72 additions & 23 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests,
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol},
Event, ObservedRole, DhtEvent, ExHashT,
protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT,
};

use bytes::Bytes;
use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
Expand Down Expand Up @@ -98,11 +99,53 @@ pub enum BehaviourOut<B: BlockT> {
request_duration: Duration,
},

/// Any event represented by the [`Event`] enum.
/// Opened a substream with the given node with the given notifications protocol.
///
/// > **Note**: The [`Event`] enum contains the events that are available through the public
/// > API of the library.
Event(Event),
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote.
role: ObservedRole,
},

/// The [`NotificationsSink`] object used to send notifications with the given peer must be
/// replaced with a new one.
///
/// This event is typically emitted when a transport-level connection is closed and we fall
/// back to a secondary connection.
NotificationStreamReplaced {
/// Id of the peer we are connected to.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<(ConsensusEngineId, Bytes)>,
},

/// Event generated by a DHT.
Dht(DhtEvent),
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
Expand Down Expand Up @@ -165,8 +208,6 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {

/// Registers a new notifications protocol.
///
/// After that, you can call `write_notifications`.
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
/// about the protocol that you have registered.
///
Expand All @@ -182,14 +223,14 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
let handshake_message = Roles::from(&self.role).encode();

let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message);
for (remote, roles) in list {
for (remote, roles, notifications_sink) in list {
let role = reported_roles_to_observed_role(&self.role, remote, roles);
let ev = Event::NotificationStreamOpened {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
role,
};
self.events.push_back(BehaviourOut::Event(ev));
notifications_sink: notifications_sink.clone(),
});
}
}

Expand Down Expand Up @@ -278,26 +319,34 @@ Behaviour<B, H> {
CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => {
self.finality_proof_requests.send_request(&target, block_hash, request);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => {
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
for engine_id in protocols {
self.events.push_back(BehaviourOut::Event(Event::NotificationStreamOpened {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
role: role.clone(),
}));
notifications_sink: notifications_sink.clone(),
});
}
},
CustomMessageOutcome::NotificationStreamReplaced { remote, protocols, notifications_sink } =>
for engine_id in protocols {
self.events.push_back(BehaviourOut::NotificationStreamReplaced {
remote: remote.clone(),
engine_id,
notifications_sink: notifications_sink.clone(),
});
},
CustomMessageOutcome::NotificationStreamClosed { remote, protocols } =>
for engine_id in protocols {
self.events.push_back(BehaviourOut::Event(Event::NotificationStreamClosed {
self.events.push_back(BehaviourOut::NotificationStreamClosed {
remote: remote.clone(),
engine_id,
}));
});
},
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
let ev = Event::NotificationsReceived { remote, messages };
self.events.push_back(BehaviourOut::Event(ev));
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(peer_id, number) => {
self.light_client_handler.update_best_block(&peer_id, number);
Expand Down Expand Up @@ -393,16 +442,16 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
}
DiscoveryOut::ValueNotFound(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
}
DiscoveryOut::ValuePut(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
}
DiscoveryOut::ValuePutFailed(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
}
DiscoveryOut::RandomKademliaStarted(protocols) => {
for protocol in protocols {
Expand Down
80 changes: 42 additions & 38 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use sp_runtime::traits::{
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use message::generic::{Message as GenericMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry};
Expand All @@ -67,7 +67,7 @@ pub mod message;
pub mod event;
pub mod sync;

pub use generic_proto::LegacyConnectionKillError;
pub use generic_proto::{NotificationsSink, Ready, NotifsHandlerError, LegacyConnectionKillError};

const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
Expand Down Expand Up @@ -388,7 +388,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
metrics_registry: Option<&Registry>,
boot_node_ids: Arc<HashSet<PeerId>>,
queue_size_report: Option<HistogramVec>,
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(
Expand Down Expand Up @@ -417,7 +416,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report,
);

let mut legacy_equiv_by_name = HashMap::new();
Expand Down Expand Up @@ -948,7 +946,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}

/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
pub fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
Expand Down Expand Up @@ -1060,32 +1063,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
remote: who,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
roles: info.roles,
}
}

/// Send a notification to the given peer we're connected to.
///
/// Doesn't do anything if we don't have a notifications substream for that protocol with that
/// peer.
pub fn write_notification(
&mut self,
target: PeerId,
engine_id: ConsensusEngineId,
message: impl Into<Vec<u8>>,
) {
if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
let message = message.into();
let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
engine_id,
data: message.clone(),
}).encode();
self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback);
} else {
error!(
target: "sub-libp2p",
"Sending a notification with a protocol that wasn't registered: {:?}",
engine_id
);
notifications_sink,
}
}

Expand All @@ -1099,7 +1077,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>,
handshake_message: Vec<u8>,
) -> impl ExactSizeIterator<Item = (&'a PeerId, Roles)> + 'a {
) -> impl Iterator<Item = (&'a PeerId, Roles, &'a NotificationsSink)> + 'a {
let protocol_name = protocol_name.into();
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
Expand All @@ -1108,8 +1086,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
}

self.context_data.peers.iter()
.map(|(peer_id, peer)| (peer_id, peer.info.roles))
let behaviour = &self.behaviour;
self.context_data.peers.iter().filter_map(move |(peer_id, peer)| {
if let Some(notifications_sink) = behaviour.notifications_sink(peer_id) {
Some((peer_id, peer.info.roles, notifications_sink))
} else {
log::error!("State mismatch: no notifications sink for opened peer {:?}", peer_id);
None
}
})
}

/// Called when peer sends us new transactions
Expand Down Expand Up @@ -1863,7 +1848,18 @@ pub enum CustomMessageOutcome<B: BlockT> {
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Notification protocols have been opened with a remote.
NotificationStreamOpened { remote: PeerId, protocols: Vec<ConsensusEngineId>, roles: Roles },
NotificationStreamOpened {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
roles: Roles,
notifications_sink: NotificationsSink
},
/// The [`NotificationsSink`] of some notification protocols need an update.
NotificationStreamReplaced {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
notifications_sink: NotificationsSink,
},
/// Notification protocols have been closed with a remote.
NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
/// Messages have been received on one or more notifications protocols.
Expand Down Expand Up @@ -2028,9 +2024,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake),
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
Ok(msg) => {
debug!(
target: "sync",
Expand All @@ -2054,6 +2051,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
}
}
GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, .. } => {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
notifications_sink,
}
},
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id)
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/generic_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! network, then performs the Substrate protocol handling on top.

pub use self::behaviour::{GenericProto, GenericProtoOut};
pub use self::handler::LegacyConnectionKillError;
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready, LegacyConnectionKillError};

mod behaviour;
mod handler;
Expand Down
Loading