Skip to content

Commit

Permalink
Send a status message on block announce handshake (paritytech#5726)
Browse files Browse the repository at this point in the history
* Send a status message on block announce handshake

* Make sure to send the handshake to all handlers
  • Loading branch information
tomaka authored Apr 22, 2020
1 parent 3a36e51 commit 932f40d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
34 changes: 33 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,31 @@ impl Default for ProtocolConfig {
}
}

/// Handshake sent when we open a block announces substream.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
struct BlockAnnouncesHandshake<B: BlockT> {
/// Roles of the node.
roles: Roles,
/// Best block number.
best_number: NumberFor<B>,
/// Best block hash.
best_hash: B::Hash,
/// Genesis block hash.
genesis_hash: B::Hash,
}

impl<B: BlockT> BlockAnnouncesHandshake<B> {
fn build(protocol_config: &ProtocolConfig, chain: &Arc<dyn Client<B>>) -> Self {
let info = chain.info();
BlockAnnouncesHandshake {
genesis_hash: info.genesis_hash,
roles: protocol_config.roles.into(),
best_number: info.best_number,
best_hash: info.best_hash,
}
}
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
Expand Down Expand Up @@ -369,7 +394,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.extend(b"/block-announces/1");
proto
});
behaviour.register_notif_protocol(block_announces_protocol.clone(), Vec::new());
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let protocol = Protocol {
Expand Down Expand Up @@ -1325,6 +1353,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn on_block_imported(&mut self, header: &B::Header, is_best: bool) {
if is_best {
self.sync.update_chain_info(header);
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
);
}
}

Expand Down
44 changes: 43 additions & 1 deletion client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,21 @@ enum PeerState {
}

impl PeerState {
/// True if there exists an established connection to tbe peer
/// True if there exists any established connection to the peer.
fn is_connected(&self) -> bool {
match self {
PeerState::Disabled { .. } |
PeerState::DisabledPendingEnable { .. } |
PeerState::Enabled { .. } |
PeerState::PendingRequest { .. } |
PeerState::Requested |
PeerState::Incoming { .. } => true,
PeerState::Poisoned |
PeerState::Banned { .. } => false,
}
}

/// True if there exists an established connection to the peer
/// that is open for custom protocol traffic.
fn is_open(&self) -> bool {
self.get_open().is_some()
Expand Down Expand Up @@ -338,6 +352,34 @@ impl GenericProto {
self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
}

/// Modifies the handshake of the given notifications protocol.
///
/// Has no effect if the protocol is unknown.
pub fn set_notif_protocol_handshake(
&mut self,
protocol_name: &[u8],
handshake_message: impl Into<Vec<u8>>
) {
let handshake_message = handshake_message.into();
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
protocol.1 = handshake_message.clone();
} else {
return;
}

// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) {
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
protocol_name: Cow::Owned(protocol_name.to_owned()),
handshake_message: handshake_message.clone(),
},
});
}
}

/// Returns the number of discovered nodes that we keep in memory.
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
Expand Down
24 changes: 24 additions & 0 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ pub enum NotifsHandlerIn {
message: Vec<u8>,
},

/// Modifies the handshake message of a notifications protocol.
UpdateHandshake {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols.
protocol_name: Cow<'static, [u8]>,

/// The new handshake message to send if we open a substream or if the remote opens a
/// substream towards us.
handshake_message: Vec<u8>,
},

/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
Expand Down Expand Up @@ -363,6 +375,18 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => {
for (handler, current_handshake) in &mut self.in_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
for (handler, current_handshake) in &mut self.out_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
}
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
Expand Down

0 comments on commit 932f40d

Please sign in to comment.