Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/authority-discovery: Add option to disable querying #6354

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,21 @@ macro_rules! new_full_start {
macro_rules! new_full {
($config:expr, $with_startup_data: expr) => {{
use futures::prelude::*;
use sc_network::Event;
use sc_network::{config::NonReservedPeerMode, Event};
use sc_client_api::ExecutorProvider;
use sp_core::traits::BareCryptoStorePtr;

let (
role,
force_authoring,
name,
non_reserved_peer_mode,
disable_grandpa,
) = (
$config.role.clone(),
$config.force_authoring,
$config.network.node_name.clone(),
$config.network.non_reserved_mode,
$config.disable_grandpa,
);

Expand Down Expand Up @@ -236,6 +238,10 @@ macro_rules! new_full {
_ => unreachable!("Due to outer matches! constraint; qed.")
};

// When a node is only allowed to connect to the configured reserved nodes
// (`NonReservedPeerMode::Deny`) there is no benefit in discovering other authorities.
let discover_authorities = non_reserved_peer_mode == NonReservedPeerMode::Accept;

let network = service.network();
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
Expand All @@ -247,6 +253,7 @@ macro_rules! new_full {
sentries,
dht_event_stream,
authority_discovery_role,
discover_authorities,
service.prometheus_registry(),
);

Expand Down
2 changes: 2 additions & 0 deletions client/authority-discovery/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum Error {
ReceivingDhtValueFoundEventWithDifferentKeys,
/// Received dht value found event with no records.
ReceivingDhtValueFoundEventWithNoRecords,
/// Received dht value found event even though querying for authorities is disabled.
ReceivingDhtValueFoundEventWithQueryingDisabled,
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that the authority discovery is the only code that will ever make DHT queries.
This cannot happen at the moment, but if another module calls get_value on the network service to make its own queries (unrelated to the authority discovery), these errors would be triggered.

/// Failed to verify a dht payload with the given signature.
VerifyingDhtPayload,
/// Failed to hash the authority id to be used as a dht key.
Expand Down
85 changes: 56 additions & 29 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! authority:
//!
//!
//! 1. **Makes itself discoverable**
//! 1. **Makes itself discoverable** (publishing)
//!
//! 1. Retrieves its external addresses (including peer id) or the ones of its sentry nodes.
//!
Expand All @@ -32,7 +32,7 @@
//! 3. Puts the signature and the addresses on the libp2p Kademlia DHT.
//!
//!
//! 2. **Discovers other authorities**
//! 2. **Discovers other authorities** (querying)
//!
//! 1. Retrieves the current set of authorities.
//!
Expand Down Expand Up @@ -104,6 +104,15 @@ pub enum Role {
Sentry,
}

enum Querying {
Enabled {
/// Interval on which to query for addresses of other authorities.
interval: Interval,
addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>,
},
Disabled,
}

/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
pub struct AuthorityDiscovery<Client, Network, Block>
where
Expand All @@ -128,10 +137,8 @@ where

/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
/// Interval on which to query for addresses of other authorities.
query_interval: Interval,

addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>,
querying: Querying,

metrics: Option<Metrics>,

Expand Down Expand Up @@ -159,6 +166,7 @@ where
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
role: Role,
discover_authorities: bool,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
Expand All @@ -170,22 +178,27 @@ where
Duration::from_secs(12 * 60 * 60),
);

// External addresses of other authorities can change at any given point in time. The
// interval on which to query for external addresses of other authorities is a trade off
// between efficiency and performance.
let query_interval = interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(10 * 60),
);
let querying = if discover_authorities {
Querying::Enabled {
// External addresses of other authorities can change at any given point in time.
// The interval on which to query for external addresses of other authorities is a
// trade off between efficiency and performance.
interval: interval_at(
Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
Duration::from_secs(10 * 60),
),
addr_cache: AddrCache::new(),
}
} else {
Querying::Disabled
};

let sentry_nodes = if !sentry_nodes.is_empty() {
Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::<Vec<_>>())
} else {
None
};

let addr_cache = AddrCache::new();

let metrics = match prometheus_registry {
Some(registry) => {
match Metrics::register(&registry) {
Expand All @@ -205,8 +218,7 @@ where
sentry_nodes,
dht_event_rx,
publish_interval,
query_interval,
addr_cache,
querying,
role,
metrics,
phantom: PhantomData,
Expand Down Expand Up @@ -393,6 +405,13 @@ where
&mut self,
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
) -> Result<()> {
let addr_cache = match self.querying {
Querying::Enabled { ref mut addr_cache, .. } => addr_cache,
Querying::Disabled => {
return Err(Error::ReceivingDhtValueFoundEventWithQueryingDisabled);
}
};

// Ensure `values` is not empty and all its keys equal.
let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
match acc {
Expand All @@ -411,7 +430,7 @@ where
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.addr_cache.retain_ids(&authorities);
addr_cache.retain_ids(&authorities);
authorities
.into_iter()
.map(|id| (hash_authority_id(id.as_ref()), id))
Expand Down Expand Up @@ -450,7 +469,7 @@ where
.into_iter().flatten().collect();

if !remote_addresses.is_empty() {
self.addr_cache.insert(authority_id.clone(), remote_addresses);
addr_cache.insert(authority_id.clone(), remote_addresses);
self.update_peer_set_priority_group()?;
}

Expand Down Expand Up @@ -490,7 +509,14 @@ where

/// Update the peer set 'authority' priority group.
fn update_peer_set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_subset();
let addr_cache = match self.querying {
Querying::Enabled { ref addr_cache, .. } => addr_cache,
Querying::Disabled => {
return Err(Error::ReceivingDhtValueFoundEventWithQueryingDisabled);
}
};

let addresses = addr_cache.get_subset();

if let Some(metrics) = &self.metrics {
metrics.priority_group_size.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
Expand Down Expand Up @@ -530,7 +556,6 @@ where
return Poll::Ready(());
}


// Publish own addresses.
if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
Expand All @@ -544,16 +569,18 @@ where
}
}

// Request addresses of authorities.
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
if let Querying::Enabled { interval, .. } = &mut self.querying {
// Request addresses of authorities.
if let Poll::Ready(_) = interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = interval.poll_next_unpin(cx) {}

if let Err(e) = self.request_addresses_of_others() {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
);
if let Err(e) = self.request_addresses_of_others() {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
);
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ fn new_registers_metrics() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
Some(registry.clone()),
);

Expand Down Expand Up @@ -259,6 +260,7 @@ fn request_addresses_of_others_triggers_dht_get_query() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
None,
);

Expand Down Expand Up @@ -301,6 +303,7 @@ fn publish_discover_cycle() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
None,
);

Expand Down Expand Up @@ -330,6 +333,7 @@ fn publish_discover_cycle() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
None,
);

Expand Down Expand Up @@ -373,6 +377,7 @@ fn terminate_when_event_stream_terminates() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
None,
);

Expand Down Expand Up @@ -414,6 +419,7 @@ fn dont_stop_polling_when_error_is_returned() {
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
true,
None,
);

Expand Down
2 changes: 1 addition & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ pub enum TransportConfig {
}

/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
Expand Down