
client/authority-discovery: Throttle DHT requests #7018

Merged
3 commits merged into from
Sep 7, 2020
6 changes: 2 additions & 4 deletions client/authority-discovery/src/error.rs
@@ -34,10 +34,8 @@ pub enum Error {
HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError),
/// Failed calling into the Substrate runtime.
CallingRuntime(sp_blockchain::Error),
/// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it
/// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This
/// error is the result of the above failing.
MatchingHashedAuthorityIdWithAuthorityId,
/// Received a dht record with a key that does not match any in-flight awaited keys.
ReceivingUnexpectedRecord,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// Failed to encode a protobuf payload.
145 changes: 93 additions & 52 deletions client/authority-discovery/src/worker.rs
@@ -35,6 +35,7 @@ use libp2p::{core::multiaddr, multihash::Multihash};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{
config::MultiaddrWithPeerId,
@@ -70,6 +71,9 @@ const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
/// Maximum number of addresses cached per authority. Additional addresses are discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;

/// Maximum number of in-flight DHT lookups at any given point in time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;

/// Role an authority discovery module can run as.
pub enum Role {
/// Actual authority as well as a reference to its key store.
@@ -137,12 +141,17 @@ where

/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
/// Interval on which to query for addresses of other authorities.
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: Interval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: Interval,

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
/// Set of in-flight lookups.
in_flight_lookups: HashMap<libp2p::kad::record::Key, AuthorityId>,

addr_cache: addr_cache::AddrCache,

metrics: Option<Metrics>,
@@ -183,8 +192,8 @@ where
Duration::from_secs(12 * 60 * 60),
);

// External addresses of other authorities can change at any given point in time. The
// interval on which to query for external addresses of other authorities is a trade off
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME;
let query_interval_duration = Duration::from_secs(10 * 60);
@@ -193,9 +202,9 @@
// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above, just delayed by 2 minutes.
// group on the same interval as the [`query_interval`] above, just delayed by 5 minutes.
let priority_group_set_interval = interval_at(
query_interval_start + Duration::from_secs(2 * 60),
query_interval_start + Duration::from_secs(5 * 60),
query_interval_duration,
);

@@ -229,6 +238,8 @@ where
publish_interval,
query_interval,
priority_group_set_interval,
pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(),
addr_cache,
role,
metrics,
@@ -270,7 +281,9 @@ where

if let Some(metrics) = &self.metrics {
metrics.publish.inc();
metrics.amount_last_published.set(addresses.len() as u64);
metrics.amount_addresses_last_published.set(
addresses.len().try_into().unwrap_or(std::u64::MAX),
);
}

let mut serialized_addresses = vec![];
@@ -314,15 +327,9 @@ where
Ok(())
}

fn request_addresses_of_others(&mut self) -> Result<()> {
fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);

let authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?;

let local_keys = match &self.role {
Role::Authority(key_store) => {
key_store.read()
@@ -333,21 +340,52 @@ where
Role::Sentry => HashSet::new(),
};

for authority_id in authorities.iter() {
// Make sure we don't look up our own keys.
if !local_keys.contains(authority_id.as_ref()) {
if let Some(metrics) = &self.metrics {
metrics.request.inc();
}
let mut authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?
.into_iter()
.filter(|id| !local_keys.contains(id.as_ref()))
.collect();

self.network
.get_value(&hash_authority_id(authority_id.as_ref()));
}
self.addr_cache.retain_ids(&authorities);

authorities.shuffle(&mut thread_rng());
self.pending_lookups = authorities;
// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
// query interval ticks are far enough apart for all lookups to succeed.
self.in_flight_lookups.clear();

if let Some(metrics) = &self.metrics {
metrics.requests_pending.set(
self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
);
}

Ok(())
}

fn start_new_lookups(&mut self) {
while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
let authority_id = match self.pending_lookups.pop() {
Some(authority) => authority,
None => return,
};
let hash = hash_authority_id(authority_id.as_ref());
self.network
.get_value(&hash);
self.in_flight_lookups.insert(hash, authority_id);

if let Some(metrics) = &self.metrics {
metrics.requests.inc();
metrics.requests_pending.set(
self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
);
}
}
}

/// Handle incoming Dht events.
///
/// Returns either:
@@ -385,10 +423,17 @@ where
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}

debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
if self.in_flight_lookups.remove(&hash).is_some() {
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
} else {
debug!(
target: LOG_TARGET,
"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
)
}
},
Some(DhtEvent::ValuePut(hash)) => {
if let Some(metrics) = &self.metrics {
@@ -434,23 +479,9 @@ where
}
})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;

let authorities = {
let block_id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.addr_cache.retain_ids(&authorities);
authorities
.into_iter()
.map(|id| (hash_authority_id(id.as_ref()), id))
.collect::<HashMap<_, _>>()
};

// Check if the event origins from an authority in the current or next authority set.
let authority_id: &AuthorityId = authorities
.get(&remote_key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
let authority_id: AuthorityId = self.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;

let local_peer_id = self.network.local_peer_id();

@@ -463,7 +494,7 @@
let signature = AuthoritySignature::decode(&mut &signature[..])
.map_err(Error::EncodingDecodingScale)?;

if !AuthorityPair::verify(&signature, &addresses, authority_id) {
if !AuthorityPair::verify(&signature, &addresses, &authority_id) {
return Err(Error::VerifyingDhtPayload);
}

@@ -503,7 +534,7 @@
.collect();

if !remote_addresses.is_empty() {
self.addr_cache.insert(authority_id.clone(), remote_addresses);
self.addr_cache.insert(authority_id, remote_addresses);
if let Some(metrics) = &self.metrics {
metrics.known_authorities_count.set(
self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)
@@ -610,15 +641,15 @@
}
}

// Request addresses of authorities.
// Request addresses of authorities, refilling the pending lookups queue.
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}

if let Err(e) = self.request_addresses_of_others() {
if let Err(e) = self.refill_pending_lookups_queue() {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
"Failed to refill pending lookups queue: {:?}", e,
);
}
}
@@ -652,6 +683,8 @@
}
}

self.start_new_lookups();

Poll::Pending
}
}
@@ -712,8 +745,9 @@ fn interval_at(start: Instant, duration: Duration) -> Interval {
#[derive(Clone)]
pub(crate) struct Metrics {
publish: Counter<U64>,
amount_last_published: Gauge<U64>,
request: Counter<U64>,
amount_addresses_last_published: Gauge<U64>,
requests: Counter<U64>,
requests_pending: Gauge<U64>,
dht_event_received: CounterVec<U64>,
handle_value_found_event_failure: Counter<U64>,
known_authorities_count: Gauge<U64>,
@@ -730,22 +764,29 @@ impl Metrics {
)?,
registry,
)?,
amount_last_published: register(
amount_addresses_last_published: register(
Gauge::new(
"authority_discovery_amount_external_addresses_last_published",
"Number of external addresses published when authority discovery last \
published addresses."
)?,
registry,
)?,
request: register(
requests: register(
Counter::new(
"authority_discovery_authority_addresses_requested_total",
"Number of times authority discovery has requested external addresses of a \
single authority."
)?,
registry,
)?,
requests_pending: register(
Contributor:

Isn't it better to split this between two counters requests_started_total and requests_finished_total?

Contributor Author:

I am not sure I understand your question correctly. Let me try.

With this pull request authority discovery publishes (among others) the following 3 metrics:

  • authority_discovery_authority_address_requests_pending

  • authority_discovery_authority_addresses_requested_total

  • authority_discovery_dht_event_received

Your suggested requests_started_total metric already exists as authority_discovery_authority_addresses_requested_total. Your suggested requests_finished_total already exists as sum(authority_discovery_dht_event_received{name=~"value_found|value_not_found"}) by (name).

We could split authority_discovery_authority_address_requests_pending into two metrics in order not to miss spikes in between Prometheus scrapes, but I doubt that is worth the complexity of tracking the two causes.

Does that make sense @tomaka?

Contributor (@tomaka, Sep 7, 2020):

If the value of ..._authority_address_requests_pending is always equal to ..._authority_addresses_requested_total - sum(..._dht_event_received{name=~"value_found|value_not_found"}) by (name), then what is the reason for having it?

Contributor Author:

Ah, now I get the confusion.

There is a difference between pending, in-flight and finished lookups.

Every 10 minutes the authority discovery module requests the current and next authority set from the runtime and overrides pending_lookups accordingly. It then takes MAX_IN_FLIGHT_LOOKUPS lookups from pending_lookups, calls NetworkService::get_record for each, and tracks those in-flight lookups in in_flight_lookups. Whenever a lookup succeeds, another lookup is popped off pending_lookups, passed to the network, and inserted into in_flight_lookups.

authority_discovery_authority_address_requests_pending tracks the number of pending, not yet started lookups.

..._authority_addresses_requested_total - sum(..._dht_event_received{name=~"value_found|value_not_found"}) by (name) tracks the number of in-flight, started, not yet finished lookups.
Does that make sense @tomaka?
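
To make the three states concrete, here is a small, self-contained sketch of the queue model described above. It is an illustration only, not part of this diff: the String/u64 stand-ins, the dht_key helper, and the Lookups type are made up for the example; only MAX_IN_FLIGHT_LOOKUPS mirrors the constant added in worker.rs.

use std::collections::HashMap;

// Mirrors the constant added in worker.rs.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;

/// Minimal model of the throttling: `String` stands in for an authority id,
/// `u64` for its hashed DHT key.
struct Lookups {
    pending: Vec<String>,
    in_flight: HashMap<u64, String>,
}

impl Lookups {
    /// Every query interval: replace the pending queue and drop stale
    /// in-flight entries, as `refill_pending_lookups_queue` does.
    fn refill(&mut self, authorities: Vec<String>) {
        self.pending = authorities;
        self.in_flight.clear();
    }

    /// Move lookups from `pending` into `in_flight` until the cap is hit;
    /// the real worker hands each key to the DHT at this point.
    fn start_new_lookups(&mut self) {
        while self.in_flight.len() < MAX_IN_FLIGHT_LOOKUPS {
            let authority = match self.pending.pop() {
                Some(authority) => authority,
                None => return,
            };
            self.in_flight.insert(dht_key(&authority), authority);
        }
    }

    /// A finished lookup (value found or not found) frees a slot, so the
    /// next poll can start another pending lookup.
    fn finish(&mut self, key: u64) -> Option<String> {
        self.in_flight.remove(&key)
    }
}

// Stand-in for hashing an authority id into a DHT key.
fn dht_key(authority: &str) -> u64 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    authority.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let mut lookups = Lookups { pending: Vec::new(), in_flight: HashMap::new() };
    lookups.refill((0..20).map(|i| format!("authority-{}", i)).collect());
    lookups.start_new_lookups();
    // requests_pending would now report 12, with 8 lookups in flight.
    assert_eq!(lookups.in_flight.len(), MAX_IN_FLIGHT_LOOKUPS);
    assert_eq!(lookups.pending.len(), 12);
    // A finished lookup frees a slot; the next poll starts another one.
    lookups.finish(dht_key("authority-19"));
    assert_eq!(lookups.in_flight.len(), MAX_IN_FLIGHT_LOOKUPS - 1);
}

In this model, the pending gauge and the "requested minus finished" difference track two different queues, which is why the extra gauge is not redundant.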

Gauge::new(
"authority_discovery_authority_address_requests_pending",
"Number of pending authority address requests."
)?,
registry,
)?,
dht_event_received: register(
CounterVec::new(
Opts::new(