diff --git a/Cargo.lock b/Cargo.lock
index 044fb12ba4eb..015056f42a39 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -555,9 +555,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
-version = "1.0.0"
+version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b"
+checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
dependencies = [
"funty",
"radium",
@@ -6167,7 +6167,7 @@ dependencies = [
"fatality",
"futures",
"futures-timer",
- "lru 0.7.8",
+ "lru 0.8.0",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
@@ -6197,7 +6197,7 @@ dependencies = [
"futures",
"futures-timer",
"log",
- "lru 0.7.8",
+ "lru 0.8.0",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
@@ -6290,6 +6290,7 @@ version = "0.9.29"
dependencies = [
"always-assert",
"assert_matches",
+ "bitvec",
"env_logger 0.9.0",
"fatality",
"futures",
@@ -6334,8 +6335,9 @@ dependencies = [
"fatality",
"futures",
"futures-timer",
+ "indexmap",
"lazy_static",
- "lru 0.7.8",
+ "lru 0.8.0",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
@@ -6455,7 +6457,7 @@ dependencies = [
"futures-timer",
"kvdb",
"kvdb-memorydb",
- "lru 0.7.8",
+ "lru 0.8.0",
"merlin",
"parity-scale-codec",
"parking_lot 0.12.1",
@@ -6624,7 +6626,7 @@ dependencies = [
"futures-timer",
"kvdb",
"kvdb-memorydb",
- "lru 0.7.8",
+ "lru 0.8.0",
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -6911,7 +6913,7 @@ dependencies = [
"kvdb-shared-tests",
"lazy_static",
"log",
- "lru 0.7.8",
+ "lru 0.8.0",
"parity-db",
"parity-scale-codec",
"parity-util-mem",
@@ -6945,7 +6947,7 @@ dependencies = [
"femme",
"futures",
"futures-timer",
- "lru 0.7.8",
+ "lru 0.8.0",
"orchestra",
"parity-util-mem",
"parking_lot 0.12.1",
@@ -7302,7 +7304,7 @@ dependencies = [
"kvdb",
"kvdb-rocksdb",
"log",
- "lru 0.7.8",
+ "lru 0.8.0",
"pallet-babe",
"pallet-im-online",
"pallet-staking",
@@ -11921,7 +11923,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 0.1.10",
"digest 0.10.3",
"rand 0.8.5",
"static_assertions",
diff --git a/Cargo.toml b/Cargo.toml
index ee886dafdc8a..c7edd0621319 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,9 +125,9 @@ maintenance = { status = "actively-developed" }
#
# This list is ordered alphabetically.
[profile.dev.package]
-blake2b_simd = { opt-level = 3 }
blake2 = { opt-level = 3 }
blake2-rfc = { opt-level = 3 }
+blake2b_simd = { opt-level = 3 }
chacha20poly1305 = { opt-level = 3 }
cranelift-codegen = { opt-level = 3 }
cranelift-wasm = { opt-level = 3 }
@@ -138,8 +138,8 @@ curve25519-dalek = { opt-level = 3 }
ed25519-dalek = { opt-level = 3 }
flate2 = { opt-level = 3 }
futures-channel = { opt-level = 3 }
-hashbrown = { opt-level = 3 }
hash-db = { opt-level = 3 }
+hashbrown = { opt-level = 3 }
hmac = { opt-level = 3 }
httparse = { opt-level = 3 }
integer-sqrt = { opt-level = 3 }
@@ -151,8 +151,8 @@ libz-sys = { opt-level = 3 }
mio = { opt-level = 3 }
nalgebra = { opt-level = 3 }
num-bigint = { opt-level = 3 }
-parking_lot_core = { opt-level = 3 }
parking_lot = { opt-level = 3 }
+parking_lot_core = { opt-level = 3 }
percent-encoding = { opt-level = 3 }
primitive-types = { opt-level = 3 }
reed-solomon-novelpoly = { opt-level = 3 }
@@ -162,6 +162,7 @@ sha2 = { opt-level = 3 }
sha3 = { opt-level = 3 }
smallvec = { opt-level = 3 }
snow = { opt-level = 3 }
+substrate-bip39 = {opt-level = 3}
twox-hash = { opt-level = 3 }
uint = { opt-level = 3 }
wasmi = { opt-level = 3 }
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 1e770cd8715b..1be1b63c0cfb 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -56,7 +56,11 @@ cli = [
"polkadot-client",
"polkadot-node-core-pvf",
]
-runtime-benchmarks = ["service/runtime-benchmarks", "polkadot-node-metrics/runtime-benchmarks"]
+runtime-benchmarks = [
+ "service/runtime-benchmarks",
+ "polkadot-node-metrics/runtime-benchmarks",
+ "polkadot-performance-test?/runtime-benchmarks"
+]
trie-memory-tracker = ["sp-trie/memory-tracker"]
full-node = ["service/full-node"]
try-runtime = ["service/try-runtime"]
diff --git a/node/core/approval-voting/Cargo.toml b/node/core/approval-voting/Cargo.toml
index f2572cac8232..e39a589b5675 100644
--- a/node/core/approval-voting/Cargo.toml
+++ b/node/core/approval-voting/Cargo.toml
@@ -10,7 +10,7 @@ futures-timer = "3.0.2"
parity-scale-codec = { version = "3.1.5", default-features = false, features = ["bit-vec", "derive"] }
gum = { package = "tracing-gum", path = "../../gum" }
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
-lru = "0.7"
+lru = "0.8"
merlin = "2.0"
schnorrkel = "0.9.1"
kvdb = "0.11.0"
diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs
index d43bf40546ae..5413c271e0d6 100644
--- a/node/core/approval-voting/src/import.rs
+++ b/node/core/approval-voting/src/import.rs
@@ -1296,6 +1296,38 @@ pub(crate) mod tests {
}
);
+ // Caching of sessions needs session of first unfinalized block.
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
+ s_tx,
+ )) => {
+ let _ = s_tx.send(Ok(header.number));
+ }
+ );
+
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
+ block_number,
+ s_tx,
+ )) => {
+ assert_eq!(block_number, header.number);
+ let _ = s_tx.send(Ok(Some(header.hash())));
+ }
+ );
+
+ assert_matches!(
+ handle.recv().await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ h,
+ RuntimeApiRequest::SessionIndexForChild(s_tx),
+ )) => {
+ assert_eq!(h, header.hash());
+ let _ = s_tx.send(Ok(session));
+ }
+ );
+
// determine_new_blocks exits early as the parent_hash is in the DB
assert_matches!(
diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs
index ac025f366ab7..467d8be612e9 100644
--- a/node/core/approval-voting/src/lib.rs
+++ b/node/core/approval-voting/src/lib.rs
@@ -70,6 +70,7 @@ use std::{
collections::{
btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet,
},
+ num::NonZeroUsize,
sync::Arc,
time::Duration,
};
@@ -104,7 +105,11 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
/// Value rather arbitrarily: Should not be hit in practice, it exists to more easily diagnose dead
/// lock issues for example.
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
-const APPROVAL_CACHE_SIZE: usize = 1024;
+const APPROVAL_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(1024) {
+ Some(cap) => cap,
+ None => panic!("Approval cache size must be non-zero."),
+};
+
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
const APPROVAL_DELAY: Tick = 2;
const LOG_TARGET: &str = "parachain::approval-voting";
diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs
index 66d1402ed6dc..bdb7a8c929b3 100644
--- a/node/core/approval-voting/src/tests.rs
+++ b/node/core/approval-voting/src/tests.rs
@@ -807,6 +807,37 @@ async fn import_block(
}
);
+ assert_matches!(
+ overseer_recv(overseer).await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
+ s_tx,
+ )) => {
+ let _ = s_tx.send(Ok(number));
+ }
+ );
+
+ assert_matches!(
+ overseer_recv(overseer).await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
+ block_number,
+ s_tx,
+ )) => {
+ assert_eq!(block_number, number);
+ let _ = s_tx.send(Ok(Some(hashes[number as usize].0)));
+ }
+ );
+
+ assert_matches!(
+ overseer_recv(overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ h,
+ RuntimeApiRequest::SessionIndexForChild(s_tx),
+ )) => {
+ assert_eq!(h, hashes[number as usize].0);
+ let _ = s_tx.send(Ok(number.into()));
+ }
+ );
+
if !fork {
assert_matches!(
overseer_recv(overseer).await,
diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml
index bc22b40c8529..7d7bc25e91d4 100644
--- a/node/core/dispute-coordinator/Cargo.toml
+++ b/node/core/dispute-coordinator/Cargo.toml
@@ -10,7 +10,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
parity-scale-codec = "3.1.5"
kvdb = "0.11.0"
thiserror = "1.0.31"
-lru = "0.7.7"
+lru = "0.8.0"
fatality = "0.0.6"
polkadot-primitives = { path = "../../../primitives" }
diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs
index b45dbfa95197..7d5d33e1ff4b 100644
--- a/node/core/dispute-coordinator/src/scraping/mod.rs
+++ b/node/core/dispute-coordinator/src/scraping/mod.rs
@@ -14,7 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use std::collections::{BTreeMap, HashSet};
+use std::{
+ collections::{BTreeMap, HashSet},
+ num::NonZeroUsize,
+};
use futures::channel::oneshot;
use lru::LruCache;
@@ -44,7 +47,10 @@ mod tests;
/// `last_observed_blocks` LRU. This means, this value should the very least be as large as the
/// number of expected forks for keeping chain scraping efficient. Making the LRU much larger than
/// that has very limited use.
-const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20;
+const LRU_OBSERVED_BLOCKS_CAPACITY: NonZeroUsize = match NonZeroUsize::new(20) {
+ Some(cap) => cap,
+ None => panic!("Observed blocks cache size must be non-zero"),
+};
/// Chain scraper
///
diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs
index ff85319599ce..aaef00999259 100644
--- a/node/core/dispute-coordinator/src/tests.rs
+++ b/node/core/dispute-coordinator/src/tests.rs
@@ -239,13 +239,15 @@ impl TestState {
)))
.await;
- self.handle_sync_queries(virtual_overseer, block_hash, session).await;
+ self.handle_sync_queries(virtual_overseer, block_hash, block_number, session)
+ .await;
}
async fn handle_sync_queries(
&mut self,
virtual_overseer: &mut VirtualOverseer,
block_hash: Hash,
+ block_number: BlockNumber,
session: SessionIndex,
) {
// Order of messages is not fixed (different on initializing):
@@ -278,11 +280,45 @@ impl TestState {
finished_steps.got_session_information = true;
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
+
+ // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`.
+ assert_matches!(
+ overseer_recv(virtual_overseer).await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
+ s_tx,
+ )) => {
+ let _ = s_tx.send(Ok(block_number));
+ }
+ );
+
+ assert_matches!(
+ overseer_recv(virtual_overseer).await,
+ AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
+ number,
+ s_tx,
+ )) => {
+ assert_eq!(block_number, number);
+ let _ = s_tx.send(Ok(Some(block_hash)));
+ }
+ );
+
+ assert_matches!(
+ overseer_recv(virtual_overseer).await,
+ AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ h,
+ RuntimeApiRequest::SessionIndexForChild(s_tx),
+ )) => {
+ assert_eq!(h, block_hash);
+ let _ = s_tx.send(Ok(session));
+ }
+ );
+
// No queries, if subsystem knows about this session already.
if self.known_session == Some(session) {
continue
}
self.known_session = Some(session);
+
loop {
// answer session info queries until the current session is reached.
assert_matches!(
@@ -361,7 +397,8 @@ impl TestState {
)))
.await;
- self.handle_sync_queries(virtual_overseer, *leaf, session).await;
+ self.handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session)
+ .await;
}
}
diff --git a/node/network/availability-distribution/Cargo.toml b/node/network/availability-distribution/Cargo.toml
index 43d56a1ace24..3e8626c18898 100644
--- a/node/network/availability-distribution/Cargo.toml
+++ b/node/network/availability-distribution/Cargo.toml
@@ -19,7 +19,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
thiserror = "1.0.31"
rand = "0.8.5"
derive_more = "0.99.17"
-lru = "0.7.7"
+lru = "0.8.0"
fatality = "0.0.6"
[dev-dependencies]
diff --git a/node/network/availability-distribution/src/requester/session_cache.rs b/node/network/availability-distribution/src/requester/session_cache.rs
index 6d41d9301233..cf01e448b70b 100644
--- a/node/network/availability-distribution/src/requester/session_cache.rs
+++ b/node/network/availability-distribution/src/requester/session_cache.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use std::collections::HashSet;
+use std::{collections::HashSet, num::NonZeroUsize};
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
@@ -85,7 +85,7 @@ impl SessionCache {
pub fn new() -> Self {
SessionCache {
// We need to cache the current and the last session the most:
- session_info_cache: LruCache::new(2),
+ session_info_cache: LruCache::new(NonZeroUsize::new(2).unwrap()),
}
}
diff --git a/node/network/availability-recovery/Cargo.toml b/node/network/availability-recovery/Cargo.toml
index fce9755a05a3..86f6237740fa 100644
--- a/node/network/availability-recovery/Cargo.toml
+++ b/node/network/availability-recovery/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
futures = "0.3.21"
-lru = "0.7.7"
+lru = "0.8.0"
rand = "0.8.5"
fatality = "0.0.6"
thiserror = "1.0.31"
diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs
index 294bd8eb1b7b..a07f4e0baa52 100644
--- a/node/network/availability-recovery/src/lib.rs
+++ b/node/network/availability-recovery/src/lib.rs
@@ -20,6 +20,7 @@
use std::{
collections::{HashMap, VecDeque},
+ num::NonZeroUsize,
pin::Pin,
time::Duration,
};
@@ -77,7 +78,10 @@ const LOG_TARGET: &str = "parachain::availability-recovery";
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
-const LRU_SIZE: usize = 16;
+const LRU_SIZE: NonZeroUsize = match NonZeroUsize::new(16) {
+ Some(cap) => cap,
+ None => panic!("Availability-recovery cache size must be non-zero."),
+};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
diff --git a/node/network/collator-protocol/Cargo.toml b/node/network/collator-protocol/Cargo.toml
index df9e75c9e951..e089719106b5 100644
--- a/node/network/collator-protocol/Cargo.toml
+++ b/node/network/collator-protocol/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
always-assert = "0.1.2"
+bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures = "0.3.21"
futures-timer = "3"
gum = { package = "tracing-gum", path = "../../gum" }
diff --git a/node/network/collator-protocol/src/collator_side/metrics.rs b/node/network/collator-protocol/src/collator_side/metrics.rs
new file mode 100644
index 000000000000..85e00406b9ba
--- /dev/null
+++ b/node/network/collator-protocol/src/collator_side/metrics.rs
@@ -0,0 +1,123 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use polkadot_node_subsystem_util::metrics::{self, prometheus};
+
+#[derive(Clone, Default)]
+pub struct Metrics(Option);
+
+impl Metrics {
+ pub fn on_advertisment_made(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.advertisements_made.inc();
+ }
+ }
+
+ pub fn on_collation_sent_requested(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.collations_send_requested.inc();
+ }
+ }
+
+ pub fn on_collation_sent(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.collations_sent.inc();
+ }
+ }
+
+ /// Provide a timer for `process_msg` which observes on drop.
+ pub fn time_process_msg(&self) -> Option {
+ self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
+ }
+
+ /// Provide a timer for `distribute_collation` which observes on drop.
+ pub fn time_collation_distribution(
+ &self,
+ label: &'static str,
+ ) -> Option {
+ self.0.as_ref().map(|metrics| {
+ metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
+ })
+ }
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+ advertisements_made: prometheus::Counter,
+ collations_sent: prometheus::Counter,
+ collations_send_requested: prometheus::Counter,
+ process_msg: prometheus::Histogram,
+ collation_distribution_time: prometheus::HistogramVec,
+}
+
+impl metrics::Metrics for Metrics {
+ fn try_register(
+ registry: &prometheus::Registry,
+ ) -> std::result::Result {
+ let metrics = MetricsInner {
+ advertisements_made: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collation_advertisements_made_total",
+ "A number of collation advertisements sent to validators.",
+ )?,
+ registry,
+ )?,
+ collations_send_requested: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collations_sent_requested_total",
+ "A number of collations requested to be sent to validators.",
+ )?,
+ registry,
+ )?,
+ collations_sent: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collations_sent_total",
+ "A number of collations sent to validators.",
+ )?,
+ registry,
+ )?,
+ process_msg: prometheus::register(
+ prometheus::Histogram::with_opts(
+ prometheus::HistogramOpts::new(
+ "polkadot_parachain_collator_protocol_collator_process_msg",
+ "Time spent within `collator_protocol_collator::process_msg`",
+ )
+ .buckets(vec![
+ 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+ 1.0,
+ ]),
+ )?,
+ registry,
+ )?,
+ collation_distribution_time: prometheus::register(
+ prometheus::HistogramVec::new(
+ prometheus::HistogramOpts::new(
+ "polkadot_parachain_collator_protocol_collator_distribution_time",
+ "Time spent within `collator_protocol_collator::distribute_collation`",
+ )
+ .buckets(vec![
+ 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+ 1.0,
+ ]),
+ &["state"],
+ )?,
+ registry,
+ )?,
+ };
+
+ Ok(Metrics(Some(metrics)))
+ }
+}
diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs
index c1a20a2a670b..4f2eea2ca747 100644
--- a/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/node/network/collator-protocol/src/collator_side/mod.rs
@@ -17,7 +17,7 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
- time::Duration,
+ time::{Duration, Instant},
};
use futures::{
@@ -44,19 +44,25 @@ use polkadot_node_subsystem::{
overseer, FromOrchestra, OverseerSignal, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
- metrics::{self, prometheus},
runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo},
TimeoutExt,
};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
- Hash, Id as ParaId,
+ GroupIndex, Hash, Id as ParaId, SessionIndex,
};
use super::LOG_TARGET;
use crate::error::{log_error, Error, FatalError, Result};
use fatality::Split;
+mod metrics;
+mod validators_buffer;
+
+use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};
+
+pub use metrics::Metrics;
+
#[cfg(test)]
mod tests;
@@ -73,111 +79,16 @@ const COST_APPARENT_FLOOD: Rep =
/// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
-#[derive(Clone, Default)]
-pub struct Metrics(Option);
-
-impl Metrics {
- fn on_advertisment_made(&self) {
- if let Some(metrics) = &self.0 {
- metrics.advertisements_made.inc();
- }
- }
-
- fn on_collation_sent_requested(&self) {
- if let Some(metrics) = &self.0 {
- metrics.collations_send_requested.inc();
- }
- }
-
- fn on_collation_sent(&self) {
- if let Some(metrics) = &self.0 {
- metrics.collations_sent.inc();
- }
- }
-
- /// Provide a timer for `process_msg` which observes on drop.
- fn time_process_msg(&self) -> Option {
- self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
- }
-
- /// Provide a timer for `distribute_collation` which observes on drop.
- fn time_collation_distribution(
- &self,
- label: &'static str,
- ) -> Option {
- self.0.as_ref().map(|metrics| {
- metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
- })
- }
-}
-
-#[derive(Clone)]
-struct MetricsInner {
- advertisements_made: prometheus::Counter,
- collations_sent: prometheus::Counter,
- collations_send_requested: prometheus::Counter,
- process_msg: prometheus::Histogram,
- collation_distribution_time: prometheus::HistogramVec,
-}
+/// Ensure that the collator issues a connection request at least once every this many seconds.
+/// Usually it's done when advertising new collation. However, if the core stays occupied or
+/// it's not our turn to produce a candidate, it's important to disconnect from previous
+/// peers.
+///
+/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
+const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12);
-impl metrics::Metrics for Metrics {
- fn try_register(
- registry: &prometheus::Registry,
- ) -> std::result::Result {
- let metrics = MetricsInner {
- advertisements_made: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collation_advertisements_made_total",
- "A number of collation advertisements sent to validators.",
- )?,
- registry,
- )?,
- collations_send_requested: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collations_sent_requested_total",
- "A number of collations requested to be sent to validators.",
- )?,
- registry,
- )?,
- collations_sent: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collations_sent_total",
- "A number of collations sent to validators.",
- )?,
- registry,
- )?,
- process_msg: prometheus::register(
- prometheus::Histogram::with_opts(
- prometheus::HistogramOpts::new(
- "polkadot_parachain_collator_protocol_collator_process_msg",
- "Time spent within `collator_protocol_collator::process_msg`",
- )
- .buckets(vec![
- 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
- 1.0,
- ]),
- )?,
- registry,
- )?,
- collation_distribution_time: prometheus::register(
- prometheus::HistogramVec::new(
- prometheus::HistogramOpts::new(
- "polkadot_parachain_collator_protocol_collator_distribution_time",
- "Time spent within `collator_protocol_collator::distribute_collation`",
- )
- .buckets(vec![
- 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
- 1.0,
- ]),
- &["state"],
- )?,
- registry,
- )?,
- };
-
- Ok(Metrics(Some(metrics)))
- }
-}
+/// How often to check for reconnect timeout.
+const RECONNECT_POLL: Duration = Duration::from_secs(1);
/// Info about validators we are currently connected to.
///
@@ -269,8 +180,14 @@ struct WaitingCollationFetches {
waiting_peers: HashSet,
}
+struct CollationSendResult {
+ relay_parent: Hash,
+ peer_id: PeerId,
+ timed_out: bool,
+}
+
type ActiveCollationFetches =
- FuturesUnordered + Send + 'static>>>;
+ FuturesUnordered + Send + 'static>>>;
struct State {
/// Our network peer id.
@@ -308,6 +225,13 @@ struct State {
/// by `PeerConnected` events.
peer_ids: HashMap>,
+ /// Tracks which validators we want to stay connected to.
+ validator_groups_buf: ValidatorGroupsBuffer,
+
+ /// Timestamp of the last connection request to a non-empty list of validators,
+ /// `None` otherwise.
+ last_connected_at: Option,
+
/// Metrics.
metrics: Metrics,
@@ -339,6 +263,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
+ validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
+ last_connected_at: None,
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
@@ -373,6 +299,7 @@ async fn distribute_collation(
result_sender: Option>,
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
+ let candidate_hash = receipt.hash();
// This collation is not in the active-leaves set.
if !state.view.contains(&relay_parent) {
@@ -412,10 +339,10 @@ async fn distribute_collation(
};
// Determine the group on that core.
- let current_validators =
+ let GroupValidators { validators, session_index, group_index } =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
- if current_validators.validators.is_empty() {
+ if validators.is_empty() {
gum::warn!(
target: LOG_TARGET,
core = ?our_core,
@@ -425,24 +352,36 @@ async fn distribute_collation(
return Ok(())
}
+ // It's important to insert new collation bits **before**
+ // issuing a connection request.
+ //
+ // If a validator managed to fetch all the relevant collations
+ // but is still assigned to our core, we keep the connection alive.
+ state.validator_groups_buf.note_collation_advertised(
+ relay_parent,
+ session_index,
+ group_index,
+ &validators,
+ );
+
gum::debug!(
target: LOG_TARGET,
para_id = %id,
relay_parent = %relay_parent,
- candidate_hash = ?receipt.hash(),
+ ?candidate_hash,
pov_hash = ?pov.hash(),
core = ?our_core,
- ?current_validators,
+ current_validators = ?validators,
"Accepted collation, connecting to validators."
);
- // Issue a discovery request for the validators of the current group:
- connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
+ // Update a set of connected validators if necessary.
+ state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await;
state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
if let Some(result_sender) = result_sender {
- state.collation_result_senders.insert(receipt.hash(), result_sender);
+ state.collation_result_senders.insert(candidate_hash, result_sender);
}
state
@@ -483,6 +422,9 @@ async fn determine_core(
struct GroupValidators {
/// The validators of above group (their discovery keys).
validators: Vec,
+
+ session_index: SessionIndex,
+ group_index: GroupIndex,
}
/// Figure out current group of validators assigned to the para being collated on.
@@ -516,7 +458,11 @@ async fn determine_our_validators(
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
- let current_validators = GroupValidators { validators: current_validators };
+ let current_validators = GroupValidators {
+ validators: current_validators,
+ session_index,
+ group_index: current_group_index,
+ };
Ok(current_validators)
}
@@ -541,13 +487,19 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) {
}
}
-/// Issue a connection request to a set of validators and
-/// revoke the previous connection request.
+/// Updates a set of connected validators based on their advertisement-bits
+/// in a validators buffer.
+///
+/// Returns current timestamp if the connection request was non-empty, `None`
+/// otherwise.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators(
ctx: &mut Context,
- validator_ids: Vec,
-) {
+ validator_groups_buf: &ValidatorGroupsBuffer,
+) -> Option {
+ let validator_ids = validator_groups_buf.validators_to_connect();
+ let is_disconnect = validator_ids.is_empty();
+
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -557,6 +509,8 @@ async fn connect_to_validators(
failed,
})
.await;
+
+ (!is_disconnect).then_some(Instant::now())
}
/// Advertise collation to the given `peer`.
@@ -715,15 +669,9 @@ async fn send_collation(
state.active_collation_fetches.push(
async move {
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
- if r.is_none() {
- gum::debug!(
- target: LOG_TARGET,
- ?relay_parent,
- ?peer_id,
- "Sending collation to validator timed out, carrying on with next validator."
- );
- }
- (relay_parent, peer_id)
+ let timed_out = r.is_none();
+
+ CollationSendResult { relay_parent, peer_id, timed_out }
}
.boxed(),
);
@@ -986,6 +934,7 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
+ state.validator_groups_buf.remove_relay_parent(removed);
}
state.view = view;
@@ -1007,6 +956,9 @@ pub(crate) async fn run(
let mut state = State::new(local_peer_id, collator_pair, metrics);
let mut runtime = RuntimeInfo::new(None);
+ let reconnect_stream = super::tick_stream(RECONNECT_POLL);
+ pin_mut!(reconnect_stream);
+
loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
@@ -1022,7 +974,25 @@ pub(crate) async fn run(
FromOrchestra::Signal(BlockFinalized(..)) => {}
FromOrchestra::Signal(Conclude) => return Ok(()),
},
- (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
+ CollationSendResult {
+ relay_parent,
+ peer_id,
+ timed_out,
+ } = state.active_collation_fetches.select_next_some() => {
+ if timed_out {
+ gum::debug!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?peer_id,
+ "Sending collation to validator timed out, carrying on with next validator",
+ );
+ } else {
+ for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
+ // Timeout not hit, this peer is no longer interested in this relay parent.
+ state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id);
+ }
+ }
+
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
waiting.waiting_peers.remove(&peer_id);
if let Some(next) = waiting.waiting.pop_front() {
@@ -1042,7 +1012,29 @@ pub(crate) async fn run(
send_collation(&mut state, next, receipt, pov).await;
}
- }
+ },
+ _ = reconnect_stream.next() => {
+ let now = Instant::now();
+ if state
+ .last_connected_at
+ .map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT)
+ {
+ // Remove all advertisements from the buffer if the timeout was hit.
+ // Usually, it shouldn't be necessary as leaves get deactivated, rather
+ // serves as a safeguard against finality lags.
+ state.validator_groups_buf.clear_advertisements();
+ // Returns `None` if connection request is empty.
+ state.last_connected_at =
+ connect_to_validators(&mut ctx, &state.validator_groups_buf).await;
+
+ gum::debug!(
+ target: LOG_TARGET,
+ timeout = ?RECONNECT_TIMEOUT,
+ "Timeout hit, sent a connection request. Disconnected from all validators = {}",
+ state.last_connected_at.is_none(),
+ );
+ }
+ },
in_req = recv_req => {
match in_req {
Ok(req) => {
diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs
index 2d2f2cf043de..c20a2d6c97a5 100644
--- a/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/node/network/collator-protocol/src/collator_side/tests.rs
@@ -56,7 +56,7 @@ struct TestState {
group_rotation_info: GroupRotationInfo,
validator_peer_id: Vec,
relay_parent: Hash,
- availability_core: CoreState,
+ availability_cores: Vec,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
@@ -88,14 +88,15 @@ impl Default for TestState {
let validator_peer_id =
std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect();
- let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
+ let validator_groups = vec![vec![2, 0, 4], vec![1, 3]]
.into_iter()
.map(|g| g.into_iter().map(ValidatorIndex).collect())
.collect();
let group_rotation_info =
GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
- let availability_core = CoreState::Scheduled(ScheduledCore { para_id, collator: None });
+ let availability_cores =
+ vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free];
let relay_parent = Hash::random();
@@ -122,7 +123,7 @@ impl Default for TestState {
group_rotation_info,
validator_peer_id,
relay_parent,
- availability_core,
+ availability_cores,
local_peer_id,
collator_pair,
session_index: 1,
@@ -132,7 +133,9 @@ impl Default for TestState {
impl TestState {
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
- &self.session_info.validator_groups[0]
+ let core_num = self.availability_cores.len();
+ let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
+ &self.session_info.validator_groups[group_idx as usize]
}
fn current_session_index(&self) -> SessionIndex {
@@ -333,7 +336,7 @@ async fn distribute_collation(
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
- tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+ tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
@@ -987,3 +990,104 @@ where
test_harness
});
}
+
+#[test]
+fn connect_to_buffered_groups() {
+ let mut test_state = TestState::default();
+ let local_peer_id = test_state.local_peer_id.clone();
+ let collator_pair = test_state.collator_pair.clone();
+
+ test_harness(local_peer_id, collator_pair, |test_harness| async move {
+ let mut virtual_overseer = test_harness.virtual_overseer;
+ let mut req_cfg = test_harness.req_cfg;
+
+ setup_system(&mut virtual_overseer, &test_state).await;
+
+ let group_a = test_state.current_group_validator_authority_ids();
+ let peers_a = test_state.current_group_validator_peer_ids();
+ assert!(group_a.len() > 1);
+
+ distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridgeTx(
+ NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+ ) => {
+ assert_eq!(group_a, validator_ids);
+ }
+ );
+
+ let head_a = test_state.relay_parent;
+
+ for (val, peer) in group_a.iter().zip(&peers_a) {
+ connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
+ }
+
+ for peer_id in &peers_a {
+ expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await;
+ }
+
+ // Update views.
+ for peer_id in &peers_a {
+ send_peer_view_change(&mut virtual_overseer, peer_id, vec![head_a]).await;
+ expect_advertise_collation_msg(&mut virtual_overseer, peer_id, head_a).await;
+ }
+
+ let peer = peers_a[0];
+ // Peer from the group fetches the collation.
+ let (pending_response, rx) = oneshot::channel();
+ req_cfg
+ .inbound_queue
+ .as_mut()
+ .unwrap()
+ .send(RawIncomingRequest {
+ peer,
+ payload: CollationFetchingRequest {
+ relay_parent: head_a,
+ para_id: test_state.para_id,
+ }
+ .encode(),
+ pending_response,
+ })
+ .await
+ .unwrap();
+ assert_matches!(
+ rx.await,
+ Ok(full_response) => {
+ let CollationFetchingResponse::Collation(..): CollationFetchingResponse =
+ CollationFetchingResponse::decode(
+ &mut full_response.result.expect("We should have a proper answer").as_ref(),
+ )
+ .expect("Decoding should work");
+ }
+ );
+
+ test_state.advance_to_new_round(&mut virtual_overseer, true).await;
+ test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation();
+
+ let head_b = test_state.relay_parent;
+ let group_b = test_state.current_group_validator_authority_ids();
+ assert_ne!(head_a, head_b);
+ assert_ne!(group_a, group_b);
+
+ distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+ // Should be connected to both groups except for the validator that fetched advertised
+ // collation.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridgeTx(
+ NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+ ) => {
+ assert!(!validator_ids.contains(&group_a[0]));
+
+ for validator in group_a[1..].iter().chain(&group_b) {
+ assert!(validator_ids.contains(validator));
+ }
+ }
+ );
+
+ TestHarness { virtual_overseer, req_cfg }
+ });
+}
diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs
new file mode 100644
index 000000000000..5bb31c72d6c5
--- /dev/null
+++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs
@@ -0,0 +1,317 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Validator groups buffer for connection management.
+//!
+//! Solves 2 problems:
+//! 1. A collator may want to stay connected to multiple groups on rotation boundaries.
+//! 2. It's important to disconnect from a validator when there are no collations to be fetched.
+//!
+//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
+//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise.
+//!
+//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a relay
+//! parent, one can reset a bit back to 0 for particular **validator**. For example, if a collation
+//! was fetched or some timeout has been hit.
+//!
+//! The bitwise OR over known advertisements gives us validators indices for connection request.
+
+use std::{
+ collections::{HashMap, VecDeque},
+ num::NonZeroUsize,
+ ops::Range,
+};
+
+use bitvec::{bitvec, vec::BitVec};
+
+use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};
+
+/// The ring buffer stores at most this many unique validator groups.
+///
+/// This value should be chosen in way that all groups assigned to our para
+/// in the view can fit into the buffer.
+pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
+ Some(cap) => cap,
+ None => panic!("buffer capacity must be non-zero"),
+};
+
+/// Unique identifier of a validators group.
+#[derive(Debug)]
+struct ValidatorsGroupInfo {
+ /// Number of validators in the group.
+ len: usize,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+}
+
+/// Ring buffer of validator groups.
+///
+/// Tracks which peers we want to be connected to with respect to advertised collations.
+#[derive(Debug)]
+pub struct ValidatorGroupsBuffer {
+ /// Validator groups identifiers we **had** advertisements for.
+ group_infos: VecDeque<ValidatorsGroupInfo>,
+ /// Continuous buffer of validators discovery keys.
+ validators: VecDeque<AuthorityDiscoveryId>,
+ /// Mapping from relay-parent to bit-vectors with bits for all `validators`.
+ /// Invariants kept: All bit-vectors are guaranteed to have the same size.
+ should_be_connected: HashMap<Hash, BitVec>,
+ /// Buffer capacity, limits the number of **groups** tracked.
+ cap: NonZeroUsize,
+}
+
+impl ValidatorGroupsBuffer {
+ /// Creates a new buffer with a non-zero capacity.
+ pub fn with_capacity(cap: NonZeroUsize) -> Self {
+ Self {
+ group_infos: VecDeque::new(),
+ validators: VecDeque::new(),
+ should_be_connected: HashMap::new(),
+ cap,
+ }
+ }
+
+ /// Returns discovery ids of validators we have at least one advertised-but-not-fetched
+ /// collation for.
+ pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
+ let validators_num = self.validators.len();
+ let bits = self
+ .should_be_connected
+ .values()
+ .fold(bitvec![0; validators_num], |acc, next| acc | next);
+
+ self.validators
+ .iter()
+ .enumerate()
+ .filter_map(|(idx, authority_id)| bits[idx].then_some(authority_id.clone()))
+ .collect()
+ }
+
+ /// Note a new advertisement, marking that we want to be connected to validators
+ /// from this group.
+ ///
+ /// If max capacity is reached and the group is new, drops validators from the back
+ /// of the buffer.
+ pub fn note_collation_advertised(
+ &mut self,
+ relay_parent: Hash,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+ validators: &[AuthorityDiscoveryId],
+ ) {
+ if validators.is_empty() {
+ return
+ }
+
+ match self.group_infos.iter().enumerate().find(|(_, group)| {
+ group.session_index == session_index && group.group_index == group_index
+ }) {
+ Some((idx, group)) => {
+ let group_start_idx = self.group_lengths_iter().take(idx).sum();
+ self.set_bits(relay_parent, group_start_idx..(group_start_idx + group.len));
+ },
+ None => self.push(relay_parent, session_index, group_index, validators),
+ }
+ }
+
+ /// Note that a validator is no longer interested in a given relay parent.
+ pub fn reset_validator_interest(
+ &mut self,
+ relay_parent: Hash,
+ authority_id: &AuthorityDiscoveryId,
+ ) {
+ let bits = match self.should_be_connected.get_mut(&relay_parent) {
+ Some(bits) => bits,
+ None => return,
+ };
+
+ for (idx, auth_id) in self.validators.iter().enumerate() {
+ if auth_id == authority_id {
+ bits.set(idx, false);
+ }
+ }
+ }
+
+ /// Remove relay parent from the buffer.
+ ///
+ /// The buffer will no longer track which validators are interested in a corresponding
+ /// advertisement.
+ pub fn remove_relay_parent(&mut self, relay_parent: &Hash) {
+ self.should_be_connected.remove(relay_parent);
+ }
+
+ /// Removes all advertisements from the buffer.
+ pub fn clear_advertisements(&mut self) {
+ self.should_be_connected.clear();
+ }
+
+ /// Pushes a new group to the buffer along with advertisement, setting all validators
+ /// bits to 1.
+ ///
+ /// If the buffer is full, drops group from the tail.
+ fn push(
+ &mut self,
+ relay_parent: Hash,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+ validators: &[AuthorityDiscoveryId],
+ ) {
+ let new_group_info =
+ ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
+
+ let buf = &mut self.group_infos;
+ let cap = self.cap.get();
+
+ if buf.len() >= cap {
+ let pruned_group = buf.pop_front().expect("buf is not empty; qed");
+ self.validators.drain(..pruned_group.len);
+
+ self.should_be_connected.values_mut().for_each(|bits| {
+ bits.as_mut_bitslice().shift_left(pruned_group.len);
+ });
+ }
+
+ self.validators.extend(validators.iter().cloned());
+ buf.push_back(new_group_info);
+ let buf_len = buf.len();
+ let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
+
+ let new_len = self.validators.len();
+ self.should_be_connected
+ .values_mut()
+ .for_each(|bits| bits.resize(new_len, false));
+ self.set_bits(relay_parent, group_start_idx..(group_start_idx + validators.len()));
+ }
+
+ /// Sets advertisement bits to 1 in a given range (usually corresponding to some group).
+ /// If the relay parent is unknown, inserts 0-initialized bitvec first.
+ ///
+ /// The range must be ensured to be within bounds.
+ fn set_bits(&mut self, relay_parent: Hash, range: Range<usize>) {
+ let bits = self
+ .should_be_connected
+ .entry(relay_parent)
+ .or_insert_with(|| bitvec![0; self.validators.len()]);
+
+ bits[range].fill(true);
+ }
+
+ /// Returns iterator over numbers of validators in groups.
+ ///
+ /// Useful for getting an index of the first validator in i-th group.
+ fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
+ self.group_infos.iter().map(|group| group.len)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sp_keyring::Sr25519Keyring;
+
+ #[test]
+ fn one_capacity_buffer() {
+ let cap = NonZeroUsize::new(1).unwrap();
+ let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+ let hash_a = Hash::repeat_byte(0x1);
+ let hash_b = Hash::repeat_byte(0x2);
+
+ let validators: Vec<_> = [
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ]
+ .into_iter()
+ .map(|key| AuthorityDiscoveryId::from(key.public()))
+ .collect();
+
+ assert!(buf.validators_to_connect().is_empty());
+
+ buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
+ assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+ buf.reset_validator_interest(hash_a, &validators[1]);
+ assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]);
+
+ buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
+ assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());
+
+ for validator in &validators[2..] {
+ buf.reset_validator_interest(hash_b, validator);
+ }
+ assert!(buf.validators_to_connect().is_empty());
+ }
+
+ #[test]
+ fn buffer_works() {
+ let cap = NonZeroUsize::new(3).unwrap();
+ let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+ let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect();
+
+ let validators: Vec<_> = [
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ]
+ .into_iter()
+ .map(|key| AuthorityDiscoveryId::from(key.public()))
+ .collect();
+
+ buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
+ buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
+ buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+ buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+
+ assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());
+
+ for validator in &validators[2..4] {
+ buf.reset_validator_interest(hashes[2], validator);
+ }
+
+ buf.reset_validator_interest(hashes[1], &validators[0]);
+ assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+ buf.reset_validator_interest(hashes[0], &validators[0]);
+ assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]);
+
+ buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
+ buf.note_collation_advertised(
+ hashes[4],
+ 0,
+ GroupIndex(2),
+ std::slice::from_ref(&validators[4]),
+ );
+
+ buf.reset_validator_interest(hashes[3], &validators[2]);
+ buf.note_collation_advertised(
+ hashes[4],
+ 0,
+ GroupIndex(3),
+ std::slice::from_ref(&validators[0]),
+ );
+
+ assert_eq!(
+ buf.validators_to_connect(),
+ vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
+ );
+ }
+}
diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs
index 66659e4b5bee..b71acc127c88 100644
--- a/node/network/collator-protocol/src/lib.rs
+++ b/node/network/collator-protocol/src/lib.rs
@@ -21,9 +21,12 @@
#![deny(unused_crate_dependencies)]
#![recursion_limit = "256"]
-use std::time::Duration;
+use std::time::{Duration, Instant};
-use futures::{FutureExt, TryFutureExt};
+use futures::{
+ stream::{FusedStream, StreamExt},
+ FutureExt, TryFutureExt,
+};
use sp_keystore::SyncCryptoStorePtr;
@@ -134,3 +137,23 @@ async fn modify_reputation(
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await;
}
+
+/// Wait until tick and return the timestamp for the following one.
+async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
+ let now = Instant::now();
+ let next_poll = last_poll + period;
+
+ if next_poll > now {
+ futures_timer::Delay::new(next_poll - now).await
+ }
+
+ Instant::now()
+}
+
+/// Returns an infinite stream that yields with an interval of `period`.
+fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
+ futures::stream::unfold(Instant::now(), move |next_check| async move {
+ Some(((), wait_until_next_tick(next_check, period).await))
+ })
+ .fuse()
+}
diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs
index 47795aac0ce2..b74c1d5b5a4f 100644
--- a/node/network/collator-protocol/src/validator_side/mod.rs
+++ b/node/network/collator-protocol/src/validator_side/mod.rs
@@ -19,7 +19,7 @@ use futures::{
channel::oneshot,
future::{BoxFuture, Fuse, FusedFuture},
select,
- stream::{FusedStream, FuturesUnordered},
+ stream::FuturesUnordered,
FutureExt, StreamExt,
};
use futures_timer::Delay;
@@ -57,7 +57,7 @@ use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
use crate::error::Result;
-use super::{modify_reputation, LOG_TARGET};
+use super::{modify_reputation, tick_stream, LOG_TARGET};
#[cfg(test)]
mod tests;
@@ -97,7 +97,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10);
// How often to poll collation responses.
// This is a hack that should be removed in a refactoring.
// See https://github.com/paritytech/polkadot/issues/4182
-const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(50);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
@@ -1167,25 +1167,6 @@ async fn process_msg(
}
}
-// wait until next inactivity check. returns the instant for the following check.
-async fn wait_until_next_check(last_poll: Instant) -> Instant {
- let now = Instant::now();
- let next_poll = last_poll + ACTIVITY_POLL;
-
- if next_poll > now {
- Delay::new(next_poll - now).await
- }
-
- Instant::now()
-}
-
-fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
- futures::stream::unfold(Instant::now() + every, |next_check| async move {
- Some(((), wait_until_next_check(next_check).await))
- })
- .fuse()
-}
-
/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run(
@@ -1196,10 +1177,10 @@ pub(crate) async fn run(
) -> std::result::Result<(), crate::error::FatalError> {
let mut state = State { metrics, ..Default::default() };
- let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
+ let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
futures::pin_mut!(next_inactivity_stream);
- let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+ let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL);
futures::pin_mut!(check_collations_stream);
loop {
diff --git a/node/network/dispute-distribution/Cargo.toml b/node/network/dispute-distribution/Cargo.toml
index f50f24bf42c8..a731175f0521 100644
--- a/node/network/dispute-distribution/Cargo.toml
+++ b/node/network/dispute-distribution/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
futures = "0.3.21"
+futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
derive_more = "0.99.17"
parity-scale-codec = { version = "3.1.5", features = ["std"] }
@@ -20,7 +21,8 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.31"
fatality = "0.0.6"
-lru = "0.7.7"
+lru = "0.8.0"
+indexmap = "1.9.1"
[dev-dependencies]
async-trait = "0.1.57"
diff --git a/node/network/dispute-distribution/src/lib.rs b/node/network/dispute-distribution/src/lib.rs
index aefd66e0ae79..f109d5e6a40e 100644
--- a/node/network/dispute-distribution/src/lib.rs
+++ b/node/network/dispute-distribution/src/lib.rs
@@ -24,6 +24,8 @@
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].
+use std::{num::NonZeroUsize, time::Duration};
+
use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};
use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
@@ -64,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
-/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
-/// the following:
+/// batch up imports as well as possible for efficient imports while maintaining timely dispute
+/// resolution and handling of spamming validators:
///
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
-/// - Drop messages from a node, if we are already importing a message from that node (flood).
-/// - Drop messages from nodes, that provided us messages where the statement import failed.
+/// - Drop messages from a node, if it sends at a too high rate.
+/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
+/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
+/// do, while at the same time making it aware of new disputes as fast as possible.
+///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
/// receiver.
@@ -93,6 +98,20 @@ pub use metrics::Metrics;
const LOG_TARGET: &'static str = "parachain::dispute-distribution";
+/// Rate limit on the `receiver` side.
+///
+/// If messages from one peer come in at a higher rate than every `RECEIVE_RATE_LIMIT` on average, we
+/// start dropping messages from that peer to enforce that limit.
+pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);
+
+/// Rate limit on the `sender` side.
+///
+/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate as
+/// well.
+///
+/// We add 50ms extra, just to have some safe margin to the `RECEIVE_RATE_LIMIT`.
+pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
+
/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem {
/// Easy and efficient runtime access for this subsystem.
@@ -145,7 +164,8 @@ where
) -> Self {
let runtime = RuntimeInfo::new_with_config(runtime::Config {
keystore: Some(keystore),
- session_cache_lru_size: DISPUTE_WINDOW.get() as usize,
+ session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize)
+ .expect("Dispute window can not be 0; qed"),
});
let (tx, sender_rx) = mpsc::channel(1);
let disputes_sender = DisputeSender::new(tx, metrics.clone());
@@ -172,6 +192,12 @@ where
ctx.spawn("disputes-receiver", receiver.run().boxed())
.map_err(FatalError::SpawnTask)?;
+ // Process messages for sending side.
+ //
+ // Note: We want the sender to be rate limited and we are currently taking advantage of the
+ // fact that the root task of this subsystem is only concerned with sending: Functions of
+ // `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
+ // loop. If this fact ever changes, we will likely need another task.
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
@@ -247,9 +273,10 @@ impl MuxedMessage {
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
- futures::select!(
- msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
+ // We select biased to make sure we finish up loose ends, before starting new work.
+ futures::select_biased!(
msg = from_sender.next() => MuxedMessage::Sender(msg),
+ msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
)
}
}
diff --git a/node/network/dispute-distribution/src/metrics.rs b/node/network/dispute-distribution/src/metrics.rs
index 3f717bd105c3..aa2feeaad3a0 100644
--- a/node/network/dispute-distribution/src/metrics.rs
+++ b/node/network/dispute-distribution/src/metrics.rs
@@ -72,9 +72,12 @@ impl Metrics {
}
/// Statements have been imported.
- pub fn on_imported(&self, label: &'static str) {
+ pub fn on_imported(&self, label: &'static str, num_requests: usize) {
if let Some(metrics) = &self.0 {
- metrics.imported_requests.with_label_values(&[label]).inc()
+ metrics
+ .imported_requests
+ .with_label_values(&[label])
+ .inc_by(num_requests as u64)
}
}
diff --git a/node/network/dispute-distribution/src/receiver/batches/batch.rs b/node/network/dispute-distribution/src/receiver/batches/batch.rs
new file mode 100644
index 000000000000..eebed25ed790
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/batches/batch.rs
@@ -0,0 +1,209 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use std::{collections::HashMap, time::Instant};
+
+use gum::CandidateHash;
+use polkadot_node_network_protocol::{
+ request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
+ PeerId,
+};
+use polkadot_node_primitives::SignedDisputeStatement;
+use polkadot_primitives::v2::{CandidateReceipt, ValidatorIndex};
+
+use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};
+
+use super::MAX_BATCH_LIFETIME;
+
+/// A batch of votes to be imported into the `dispute-coordinator`.
+///
+/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
+/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
+/// coordinator.
+///
+/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
+/// it will "flush" in case the incoming rate dropped too low, preparing the import.
+pub struct Batch {
+ /// The actual candidate this batch is concerned with.
+ candidate_receipt: CandidateReceipt,
+
+ /// Cache of `CandidateHash` (candidate_receipt.hash()).
+ candidate_hash: CandidateHash,
+
+ /// All valid votes received in this batch so far.
+ ///
+ /// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
+ /// while still allowing validators to equivocate.
+ ///
+ /// Detecting and rejecting duplicates is crucial in order to effectively enforce
+ /// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
+ /// here, the mechanism would be broken.
+ valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
+
+ /// All invalid votes received in this batch so far.
+ invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,
+
+ /// How many votes have been batched since the last tick/creation.
+ votes_batched_since_last_tick: u32,
+
+ /// Expiry time for the batch.
+ ///
+ /// By this time the latest this batch will get flushed.
+ best_before: Instant,
+
+ /// Requesters waiting for a response.
+ requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
+}
+
+/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
+pub(super) enum TickResult {
+ /// Batch is still alive, please call `tick` again at the given `Instant`.
+ Alive(Batch, Instant),
+ /// Batch is done, ready for import!
+ Done(PreparedImport),
+}
+
+/// Ready for import.
+pub struct PreparedImport {
+ pub candidate_receipt: CandidateReceipt,
+ pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
+ /// Information about original requesters.
+ pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
+}
+
+impl From<Batch> for PreparedImport {
+ fn from(batch: Batch) -> Self {
+ let Batch {
+ candidate_receipt,
+ valid_votes,
+ invalid_votes,
+ requesters: pending_responses,
+ ..
+ } = batch;
+
+ let statements = valid_votes
+ .into_iter()
+ .chain(invalid_votes.into_iter())
+ .map(|(index, statement)| (statement, index))
+ .collect();
+
+ Self { candidate_receipt, statements, requesters: pending_responses }
+ }
+}
+
+impl Batch {
+ /// Create a new empty batch based on the given `CandidateReceipt`.
+ ///
+ /// To create a `Batch` use `Batches::find_batch`.
+ ///
+ /// Arguments:
+ ///
+ /// * `candidate_receipt` - The candidate this batch is meant to track votes for.
+ /// * `now` - current time stamp for calculating the first tick.
+ ///
+ /// Returns: A batch and the first `Instant` you are supposed to call `tick`.
+ pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
+ let s = Self {
+ candidate_hash: candidate_receipt.hash(),
+ candidate_receipt,
+ valid_votes: HashMap::new(),
+ invalid_votes: HashMap::new(),
+ votes_batched_since_last_tick: 0,
+ best_before: Instant::now() + MAX_BATCH_LIFETIME,
+ requesters: Vec::new(),
+ };
+ let next_tick = s.calculate_next_tick(now);
+ (s, next_tick)
+ }
+
+ /// Receipt of the candidate this batch is batching votes for.
+ pub fn candidate_receipt(&self) -> &CandidateReceipt {
+ &self.candidate_receipt
+ }
+
+ /// Add votes from a validator into the batch.
+ ///
+ /// The statements are supposed to be the valid and invalid statements received in a
+ /// `DisputeRequest`.
+ ///
+ /// The given `pending_response` is the corresponding response sender for responding to `peer`.
+ /// If at least one of the votes is new as far as this batch is concerned we record the
+ /// pending_response, for later use. In case both votes are known already, we return the
+ /// response sender as an `Err` value.
+ pub fn add_votes(
+ &mut self,
+ valid_vote: (SignedDisputeStatement, ValidatorIndex),
+ invalid_vote: (SignedDisputeStatement, ValidatorIndex),
+ peer: PeerId,
+ pending_response: OutgoingResponseSender<DisputeRequest>,
+ ) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
+ debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
+ debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);
+
+ let mut duplicate = true;
+
+ if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
+ self.votes_batched_since_last_tick += 1;
+ duplicate = false;
+ }
+ if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
+ self.votes_batched_since_last_tick += 1;
+ duplicate = false;
+ }
+
+ if duplicate {
+ Err(pending_response)
+ } else {
+ self.requesters.push((peer, pending_response));
+ Ok(())
+ }
+ }
+
+ /// Check batch for liveness.
+ ///
+ /// This function is supposed to be called at instants given at construction and as returned as
+ /// part of `TickResult`.
+ pub(super) fn tick(mut self, now: Instant) -> TickResult {
+ if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
+ now < self.best_before
+ {
+ // Still good:
+ let next_tick = self.calculate_next_tick(now);
+ // Reset counter:
+ self.votes_batched_since_last_tick = 0;
+ TickResult::Alive(self, next_tick)
+ } else {
+ TickResult::Done(PreparedImport::from(self))
+ }
+ }
+
+ /// Calculate when the next tick should happen.
+ ///
+ /// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this batch
+ /// would exceed `MAX_BATCH_LIFETIME`.
+ ///
+ /// # Arguments
+ ///
+ /// * `now` - The current time.
+ fn calculate_next_tick(&self, now: Instant) -> Instant {
+ let next_tick = now + BATCH_COLLECTING_INTERVAL;
+ if next_tick < self.best_before {
+ next_tick
+ } else {
+ self.best_before
+ }
+ }
+}
diff --git a/node/network/dispute-distribution/src/receiver/batches/mod.rs b/node/network/dispute-distribution/src/receiver/batches/mod.rs
new file mode 100644
index 000000000000..b343b55e0b04
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/batches/mod.rs
@@ -0,0 +1,170 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+use std::{
+ collections::{hash_map, HashMap},
+ time::{Duration, Instant},
+};
+
+use futures::future::pending;
+
+use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
+use polkadot_primitives::v2::{CandidateHash, CandidateReceipt};
+
+use crate::{
+ receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
+ LOG_TARGET,
+};
+
+pub use self::batch::{Batch, PreparedImport};
+use self::waiting_queue::WaitingQueue;
+
+use super::{
+ error::{JfyiError, JfyiResult},
+ BATCH_COLLECTING_INTERVAL,
+};
+
+/// A single batch (per candidate) as managed by `Batches`.
+mod batch;
+
+/// Queue events in time and wait for them to become ready.
+mod waiting_queue;
+
+/// Safe-guard in case votes trickle in real slow.
+///
+/// If the batch life time exceeded the time the sender is willing to wait for a confirmation, we
+/// would trigger pointless re-sends.
+const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));
+
+/// Limit the number of batches that can be alive at any given time.
+///
+/// Reasoning for this number, see guide.
+pub const MAX_BATCHES: usize = 1000;
+
+/// Manage batches.
+///
+/// - Batches can be found via `find_batch()` in order to add votes to them/check they exist.
+/// - Batches can be checked for being ready for flushing in order to import contained votes.
+pub struct Batches {
+ /// The batches we manage.
+ ///
+ /// Kept invariants:
+ /// For each entry in `batches`, there exists an entry in `waiting_queue` as well - we wait on
+ /// all batches!
+ batches: HashMap<CandidateHash, Batch>,
+ /// Waiting queue for waiting for batches to become ready for `tick`.
+ ///
+ /// Kept invariants by `Batches`:
+ /// For each entry in the `waiting_queue` there exists a corresponding entry in `batches`.
+ waiting_queue: WaitingQueue<CandidateHash>,
+}
+
+/// A found batch is either really found or got created so it can be found.
+pub enum FoundBatch<'a> {
+ /// Batch just got created.
+ Created(&'a mut Batch),
+ /// Batch already existed.
+ Found(&'a mut Batch),
+}
+
+impl Batches {
+ /// Create new empty `Batches`.
+ pub fn new() -> Self {
+ debug_assert!(
+ MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
+ "Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
+ );
+ Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() }
+ }
+
+ /// Find a particular batch.
+ ///
+ /// That is either find it, or we create it as reflected by the result `FoundBatch`.
+ pub fn find_batch(
+ &mut self,
+ candidate_hash: CandidateHash,
+ candidate_receipt: CandidateReceipt,
+ ) -> JfyiResult<FoundBatch<'_>> {
+ if self.batches.len() >= MAX_BATCHES {
+ return Err(JfyiError::MaxBatchLimitReached)
+ }
+ debug_assert!(candidate_hash == candidate_receipt.hash());
+ let result = match self.batches.entry(candidate_hash) {
+ hash_map::Entry::Vacant(vacant) => {
+ let now = Instant::now();
+ let (created, ready_at) = Batch::new(candidate_receipt, now);
+ let pending_wake = PendingWake { payload: candidate_hash, ready_at };
+ self.waiting_queue.push(pending_wake);
+ FoundBatch::Created(vacant.insert(created))
+ },
+ hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()),
+ };
+ Ok(result)
+ }
+
+ /// Wait for the next `tick` to check for ready batches.
+ ///
+ /// This function blocks (returns `Poll::Pending`) until at least one batch can be
+ /// checked for readiness meaning that `BATCH_COLLECTING_INTERVAL` has passed since the last
+ /// check for that batch or it reached end of life.
+ ///
+ /// If this `Batches` instance is empty (does not actually contain any batches), then this
+ /// function will always return `Poll::Pending`.
+ ///
+ /// Returns: A `Vec` of all `PreparedImport`s from batches that became ready.
+ pub async fn check_batches(&mut self) -> Vec<PreparedImport> {
+ let now = Instant::now();
+
+ let mut imports = Vec::new();
+
+ // Wait for at least one batch to become ready:
+ self.waiting_queue.wait_ready(now).await;
+
+ // Process all ready entries:
+ while let Some(wake) = self.waiting_queue.pop_ready(now) {
+ let batch = self.batches.remove(&wake.payload);
+ debug_assert!(
+ batch.is_some(),
+ "Entries referenced in `waiting_queue` are supposed to exist!"
+ );
+ let batch = match batch {
+ None => return pending().await,
+ Some(batch) => batch,
+ };
+ match batch.tick(now) {
+ TickResult::Done(import) => {
+ gum::trace!(
+ target: LOG_TARGET,
+ candidate_hash = ?wake.payload,
+ "Batch became ready."
+ );
+ imports.push(import);
+ },
+ TickResult::Alive(old_batch, next_tick) => {
+ gum::trace!(
+ target: LOG_TARGET,
+ candidate_hash = ?wake.payload,
+ "Batch found to be still alive on check."
+ );
+ let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick };
+ self.waiting_queue.push(pending_wake);
+ self.batches.insert(wake.payload, old_batch);
+ },
+ }
+ }
+ imports
+ }
+}
diff --git a/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs b/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs
new file mode 100644
index 000000000000..995dc74d358f
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs
@@ -0,0 +1,204 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
+
+use futures::future::pending;
+use futures_timer::Delay;
+
+/// Wait asynchronously for given `Instant`s one after the other.
+///
+/// `PendingWake`s can be inserted and `WaitingQueue` makes `wait_ready()` to always wait for the
+/// next `Instant` in the queue.
+pub struct WaitingQueue<Payload> {
+ /// All pending wakes we are supposed to wait on in order.
+ pending_wakes: BinaryHeap<PendingWake<Payload>>,
+ /// Wait for next `PendingWake`.
+ timer: Option<Delay>,
+}
+
+/// Represents some event waiting to be processed at `ready_at`.
+///
+/// This is an event in `WaitingQueue`. It provides an `Ord` instance, that sorts descending with
+/// regard to `Instant` (so we get a `min-heap` with the earliest `Instant` at the top).
+#[derive(Eq, PartialEq)]
+pub struct PendingWake<Payload> {
+ pub payload: Payload,
+ pub ready_at: Instant,
+}
+
+impl<Payload: Eq + Ord> WaitingQueue<Payload> {
+ /// Get a new empty `WaitingQueue`.
+ ///
+ /// If you call `pop` on this queue immediately, it will always return `Poll::Pending`.
+ pub fn new() -> Self {
+ Self { pending_wakes: BinaryHeap::new(), timer: None }
+ }
+
+ /// Push a `PendingWake`.
+ ///
+ /// The next call to `wait_ready` will make sure to wake soon enough to process that new event in a
+ /// timely manner.
+ pub fn push(&mut self, wake: PendingWake<Payload>) {
+ self.pending_wakes.push(wake);
+ // Reset timer as it is potentially obsolete now:
+ self.timer = None;
+ }
+
+ /// Pop the next ready item.
+ ///
+ /// This function does not wait, if nothing is ready right now as determined by the passed
+ /// `now` time stamp, this function simply returns `None`.
+ pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
+ let is_ready = self.pending_wakes.peek().map_or(false, |p| p.ready_at <= now);
+ if is_ready {
+ Some(self.pending_wakes.pop().expect("We just peeked. qed."))
+ } else {
+ None
+ }
+ }
+
+ /// Don't pop, just wait until something is ready.
+ ///
+ /// Once this function returns `Poll::Ready(())` `pop_ready()` will return `Some`, if passed
+ /// the same `Instant`.
+ ///
+ /// Whether ready or not is determined based on the passed time stamp `now` which should be the
+ /// current time as returned by `Instant::now()`
+ ///
+ /// This function waits asynchronously for an item to become ready. If there is no more item,
+ /// this call will wait forever (return Poll::Pending without scheduling a wake).
+ pub async fn wait_ready(&mut self, now: Instant) {
+ if let Some(timer) = &mut self.timer {
+ // Previous timer was not done yet.
+ timer.await
+ }
+
+ let next_waiting = self.pending_wakes.peek();
+ let is_ready = next_waiting.map_or(false, |p| p.ready_at <= now);
+ if is_ready {
+ return
+ }
+
+ self.timer = next_waiting.map(|p| Delay::new(p.ready_at.duration_since(now)));
+ match &mut self.timer {
+ None => return pending().await,
+ Some(timer) => timer.await,
+ }
+ }
+}
+
+impl<Payload: Eq + Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl<Payload: Eq + Ord> Ord for PendingWake<Payload> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Reverse order for min-heap:
+ match other.ready_at.cmp(&self.ready_at) {
+ Ordering::Equal => other.payload.cmp(&self.payload),
+ o => o,
+ }
+ }
+}
+#[cfg(test)]
+mod tests {
+ use std::{
+ task::Poll,
+ time::{Duration, Instant},
+ };
+
+ use assert_matches::assert_matches;
+ use futures::{future::poll_fn, pin_mut, Future};
+
+ use crate::LOG_TARGET;
+
+ use super::{PendingWake, WaitingQueue};
+
+ #[test]
+ fn wait_ready_waits_for_earliest_event_always() {
+ sp_tracing::try_init_simple();
+ let mut queue = WaitingQueue::new();
+ let now = Instant::now();
+ let start = now;
+ queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
+ // Push another one in order:
+ queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
+ // Push one out of order:
+ queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
+ // Push another one at same timestamp (should become ready at the same time)
+ queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });
+
+ futures::executor::block_on(async move {
+ // No time passed yet - nothing should be ready.
+ assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");
+
+ // Receive them in order at expected times:
+ queue.wait_ready(now).await;
+ gum::trace!(target: LOG_TARGET, "After first wait.");
+
+ let now = start + Duration::from_millis(1);
+ assert!(Instant::now() - start >= Duration::from_millis(1));
+ assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
+ // One more should be ready:
+ assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
+ assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
+
+ queue.wait_ready(now).await;
+ gum::trace!(target: LOG_TARGET, "After second wait.");
+ let now = start + Duration::from_millis(3);
+ assert!(Instant::now() - start >= Duration::from_millis(3));
+ assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
+ assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
+
+ // Push in between wait:
+ poll_fn(|cx| {
+ let fut = queue.wait_ready(now);
+ pin_mut!(fut);
+ assert_matches!(fut.poll(cx), Poll::Pending);
+ Poll::Ready(())
+ })
+ .await;
+ queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });
+
+ queue.wait_ready(now).await;
+ // Newly pushed element should have become ready:
+ gum::trace!(target: LOG_TARGET, "After third wait.");
+ let now = start + Duration::from_millis(4);
+ assert!(Instant::now() - start >= Duration::from_millis(4));
+ assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
+ assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
+
+ queue.wait_ready(now).await;
+ gum::trace!(target: LOG_TARGET, "After fourth wait.");
+ let now = start + Duration::from_millis(5);
+ assert!(Instant::now() - start >= Duration::from_millis(5));
+ assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
+ assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
+
+ // queue empty - should wait forever now:
+ poll_fn(|cx| {
+ let fut = queue.wait_ready(now);
+ pin_mut!(fut);
+ assert_matches!(fut.poll(cx), Poll::Pending);
+ Poll::Ready(())
+ })
+ .await;
+ });
+ }
+}
diff --git a/node/network/dispute-distribution/src/receiver/error.rs b/node/network/dispute-distribution/src/receiver/error.rs
index ce578cc8e0f9..4477335440d0 100644
--- a/node/network/dispute-distribution/src/receiver/error.rs
+++ b/node/network/dispute-distribution/src/receiver/error.rs
@@ -19,8 +19,10 @@
use fatality::Nested;
+use gum::CandidateHash;
use polkadot_node_network_protocol::{request_response::incoming, PeerId};
use polkadot_node_subsystem_util::runtime;
+use polkadot_primitives::v2::AuthorityDiscoveryId;
use crate::LOG_TARGET;
@@ -35,8 +37,8 @@ pub enum Error {
#[error("Retrieving next incoming request failed.")]
IncomingRequest(#[from] incoming::Error),
- #[error("Sending back response to peer {0} failed.")]
- SendResponse(PeerId),
+ #[error("Sending back response to peers {0:#?} failed.")]
+ SendResponses(Vec),
#[error("Changing peer's ({0}) reputation failed.")]
SetPeerReputation(PeerId),
@@ -44,16 +46,29 @@ pub enum Error {
#[error("Dispute request with invalid signatures, from peer {0}.")]
InvalidSignature(PeerId),
- #[error("Import of dispute got canceled for peer {0} - import failed for some reason.")]
- ImportCanceled(PeerId),
+ #[error("Received votes from peer {0} have been completely redundant.")]
+ RedundantMessage(PeerId),
+
+ #[error("Import of dispute got canceled for candidate {0} - import failed for some reason.")]
+ ImportCanceled(CandidateHash),
#[error("Peer {0} attempted to participate in dispute and is not a validator.")]
NotAValidator(PeerId),
+
+ #[error("Force flush for batch that could not be found attempted, candidate hash: {0}")]
+ ForceFlushBatchDoesNotExist(CandidateHash),
+
+ // Should never happen in practice:
+ #[error("We needed to drop messages, because we reached limit on concurrent batches.")]
+ MaxBatchLimitReached,
+
+ #[error("Authority {0} sent messages at a too high rate.")]
+ AuthorityFlooding(AuthorityDiscoveryId),
}
pub type Result<T> = std::result::Result<T, Error>;
-pub type JfyiErrorResult<T> = std::result::Result<T, JfyiError>;
+pub type JfyiResult<T> = std::result::Result<T, JfyiError>;
/// Utility for eating top level errors and log them.
///
diff --git a/node/network/dispute-distribution/src/receiver/mod.rs b/node/network/dispute-distribution/src/receiver/mod.rs
index 9193947e78d1..158c66e20655 100644
--- a/node/network/dispute-distribution/src/receiver/mod.rs
+++ b/node/network/dispute-distribution/src/receiver/mod.rs
@@ -15,20 +15,21 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
- collections::HashSet,
+ num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
+ time::Duration,
};
use futures::{
channel::oneshot,
- future::{poll_fn, BoxFuture},
+ future::poll_fn,
pin_mut,
- stream::{FusedStream, FuturesUnordered, StreamExt},
- Future, FutureExt, Stream,
+ stream::{FuturesUnordered, StreamExt},
+ Future,
};
-use lru::LruCache;
+use gum::CandidateHash;
use polkadot_node_network_protocol::{
authority_discovery::AuthorityDiscovery,
request_response::{
@@ -51,20 +52,47 @@ use crate::{
};
mod error;
-use self::error::{log_error, JfyiError, JfyiErrorResult, Result};
+
+/// Rate limiting queues for incoming requests by peers.
+mod peer_queues;
+
+/// Batch imports together.
+mod batches;
+
+use self::{
+ batches::{Batches, FoundBatch, PreparedImport},
+ error::{log_error, JfyiError, JfyiResult, Result},
+ peer_queues::PeerQueues,
+};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
-const COST_INVALID_CANDIDATE: Rep = Rep::Malicious("Reported candidate was not available.");
+const COST_INVALID_IMPORT: Rep =
+ Rep::Malicious("Import was deemed invalid by dispute-coordinator.");
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
+/// Mildly punish peers exceeding their rate limit.
+///
+/// For honest peers this should rarely happen, but if it happens we would not want to disconnect
+/// too quickly. Minor cost should suffice for disconnecting any real flooder.
+const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Peer exceeded the rate limit.");
-/// How many statement imports we want to issue in parallel:
-pub const MAX_PARALLEL_IMPORTS: usize = 10;
+/// How many votes must have arrived in the last `BATCH_COLLECTING_INTERVAL`
+///
+/// in order for a batch to stay alive and not get flushed/imported to the dispute-coordinator.
+///
+/// This ensures a timely import of batches.
+#[cfg(not(test))]
+pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
+#[cfg(test)]
+pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;
-/// State for handling incoming `DisputeRequest` messages.
+/// Time we allow to pass for new votes to trickle in.
///
-/// This is supposed to run as its own task in order to easily impose back pressure on the incoming
-/// request channel and at the same time to drop flood messages as fast as possible.
+/// See `MIN_KEEP_BATCH_ALIVE_VOTES` above.
+/// Should be greater or equal to `RECEIVE_RATE_LIMIT` (there is no point in checking any faster).
+pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
+
+/// State for handling incoming `DisputeRequest` messages.
pub struct DisputesReceiver<Sender, AD> {
/// Access to session information.
runtime: RuntimeInfo,
@@ -75,18 +103,17 @@ pub struct DisputesReceiver {
/// Channel to retrieve incoming requests from.
receiver: IncomingRequestReceiver<DisputeRequest>,
+ /// Rate limiting queue for each peer (only authorities).
+ peer_queues: PeerQueues,
+
+ /// Currently active batches of imports per candidate.
+ batches: Batches,
+
/// Authority discovery service:
authority_discovery: AD,
- /// Imports currently being processed.
- pending_imports: PendingImports,
-
- /// We keep record of the last banned peers.
- ///
- /// This is needed because once we ban a peer, we will very likely still have pending requests
- /// in the incoming channel - we should not waste time recovering availability for those, as we
- /// already know the peer is malicious.
- banned_peers: LruCache<PeerId, ()>,
+ /// Imports currently being processed by the `dispute-coordinator`.
+ pending_imports: FuturesUnordered<PendingImport>,
/// Log received requests.
metrics: Metrics,
@@ -100,36 +127,24 @@ enum MuxedMessage {
///
/// - We need to make sure responses are actually sent (therefore we need to await futures
/// promptly).
- /// - We need to update `banned_peers` accordingly to the result.
- ConfirmedImport(JfyiErrorResult<(PeerId, ImportStatementsResult)>),
+ /// - We need to punish peers whose import got rejected.
+ ConfirmedImport(ImportResult),
/// A new request has arrived and should be handled.
NewRequest(IncomingRequest<DisputeRequest>),
-}
-impl MuxedMessage {
- async fn receive(
- pending_imports: &mut PendingImports,
- pending_requests: &mut IncomingRequestReceiver,
- ) -> Result {
- poll_fn(|ctx| {
- let next_req = pending_requests.recv(|| vec![COST_INVALID_REQUEST]);
- pin_mut!(next_req);
- if let Poll::Ready(r) = next_req.poll(ctx) {
- return match r {
- Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
- Ok(v) => Poll::Ready(Ok(Self::NewRequest(v))),
- }
- }
- // In case of Ready(None) return `Pending` below - we want to wait for the next request
- // in that case.
- if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
- return Poll::Ready(Ok(Self::ConfirmedImport(v)))
- }
- Poll::Pending
- })
- .await
- }
+ /// Rate limit timer hit - is is time to process one row of messages.
+ ///
+ /// This is the result of calling `self.peer_queues.pop_reqs()`.
+ WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),
+
+ /// It is time to check batches.
+ ///
+ /// Every `BATCH_COLLECTING_INTERVAL` we check whether less than `MIN_KEEP_BATCH_ALIVE_VOTES`
+ /// new votes arrived, if so the batch is ready for import.
+ ///
+ /// This is the result of calling `self.batches.check_batches()`.
+ WakeCheckBatches(Vec<PreparedImport>),
}
impl<Sender, AD> DisputesReceiver<Sender, AD>
@@ -146,17 +161,17 @@ where
) -> Self {
let runtime = RuntimeInfo::new_with_config(runtime::Config {
keystore: None,
- session_cache_lru_size: DISPUTE_WINDOW.get() as usize,
+ session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize)
+ .expect("Dispute window can not be 0; qed"),
});
Self {
runtime,
sender,
receiver,
+ peer_queues: PeerQueues::new(),
+ batches: Batches::new(),
authority_discovery,
- pending_imports: PendingImports::new(),
- // Size of MAX_PARALLEL_IMPORTS ensures we are going to immediately get rid of any
- // malicious requests still pending in the incoming queue.
- banned_peers: LruCache::new(MAX_PARALLEL_IMPORTS),
+ pending_imports: FuturesUnordered::new(),
metrics,
}
}
@@ -180,60 +195,132 @@ where
}
}
- /// Actual work happening here.
+ /// Actual work happening here in three phases:
+ ///
+ /// 1. Receive and queue incoming messages until the rate limit timer hits.
+ /// 2. Do import/batching for the head of all queues.
+ /// 3. Check and flush any ready batches.
async fn run_inner(&mut self) -> Result<()> {
- let msg = MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).await?;
+ let msg = self.receive_message().await?;
- let incoming = match msg {
- // We need to clean up futures, to make sure responses are sent:
- MuxedMessage::ConfirmedImport(m_bad) => {
- self.ban_bad_peer(m_bad)?;
- return Ok(())
+ match msg {
+ MuxedMessage::NewRequest(req) => {
+ // Phase 1:
+ self.metrics.on_received_request();
+ self.dispatch_to_queues(req).await?;
},
- MuxedMessage::NewRequest(req) => req,
- };
+ MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
+ // Phase 2:
+ for req in reqs {
+ // No early return - we cannot cancel imports of one peer, because the import of
+ // another failed:
+ match log_error(self.start_import_or_batch(req).await) {
+ Ok(()) => {},
+ Err(fatal) => return Err(fatal.into()),
+ }
+ }
+ },
+ MuxedMessage::WakeCheckBatches(ready_imports) => {
+ // Phase 3:
+ self.import_ready_batches(ready_imports).await;
+ },
+ MuxedMessage::ConfirmedImport(import_result) => {
+ self.update_imported_requests_metrics(&import_result);
+ // Confirm imports to requesters/punish them on invalid imports:
+ send_responses_to_requesters(import_result).await?;
+ },
+ }
+
+ Ok(())
+ }
+
+ /// Receive one `MuxedMessage`.
+ ///
+ ///
+ /// Dispatching events to messages as they happen.
+ async fn receive_message(&mut self) -> Result<MuxedMessage> {
+ poll_fn(|ctx| {
+ // In case of Ready(None), we want to wait for pending requests:
+ if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
+ return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
+ }
+
+ let rate_limited = self.peer_queues.pop_reqs();
+ pin_mut!(rate_limited);
+ // We poll rate_limit before batches, so we don't unnecessarily delay importing to
+ // batches.
+ if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
+ return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
+ }
- self.metrics.on_received_request();
+ let ready_batches = self.batches.check_batches();
+ pin_mut!(ready_batches);
+ if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
+ return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
+ }
- let peer = incoming.peer;
+ let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
+ pin_mut!(next_req);
+ if let Poll::Ready(r) = next_req.poll(ctx) {
+ return match r {
+ Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
+ Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
+ }
+ }
+ Poll::Pending
+ })
+ .await
+ }
- // Only accept messages from validators:
- if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
- incoming
- .send_outgoing_response(OutgoingResponse {
+ /// Process incoming requests.
+ ///
+ /// - Check sender is authority
+ /// - Dispatch message to corresponding queue in `peer_queues`.
+ /// - If queue is full, drop message and change reputation of sender.
+ async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
+ let peer = req.peer;
+ // Only accept messages from validators, in case there are multiple `AuthorityId`s, we
+ // just take the first one. On session boundaries this might allow validators to double
+ // their rate limit for a short period of time, which seems acceptable.
+ let authority_id = match self
+ .authority_discovery
+ .get_authority_ids_by_peer_id(peer)
+ .await
+ .and_then(|s| s.into_iter().next())
+ {
+ None => {
+ req.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_NOT_A_VALIDATOR],
sent_feedback: None,
})
- .map_err(|_| JfyiError::SendResponse(peer))?;
-
- return Err(JfyiError::NotAValidator(peer).into())
- }
-
- // Immediately drop requests from peers that already have requests in flight or have
- // been banned recently (flood protection):
- if self.pending_imports.peer_is_pending(&peer) || self.banned_peers.contains(&peer) {
- gum::trace!(
- target: LOG_TARGET,
- ?peer,
- "Dropping message from peer (banned/pending import)"
- );
- return Ok(())
- }
+ .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+ return Err(JfyiError::NotAValidator(peer).into())
+ },
+ Some(auth_id) => auth_id,
+ };
- // Wait for a free slot:
- if self.pending_imports.len() >= MAX_PARALLEL_IMPORTS as usize {
- // Wait for one to finish:
- let r = self.pending_imports.next().await;
- self.ban_bad_peer(r.expect("pending_imports.len() is greater 0. qed."))?;
+ // Queue request:
+ if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
+ req.send_outgoing_response(OutgoingResponse {
+ result: Err(()),
+ reputation_changes: vec![COST_APPARENT_FLOOD],
+ sent_feedback: None,
+ })
+ .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+ return Err(JfyiError::AuthorityFlooding(authority_id))
}
-
- // All good - initiate import.
- self.start_import(incoming).await
+ Ok(())
}
- /// Start importing votes for the given request.
- async fn start_import(&mut self, incoming: IncomingRequest) -> Result<()> {
+ /// Start importing votes for the given request or batch.
+ ///
+ /// Signature check and in case we already have an existing batch we import to that batch,
+ /// otherwise import to `dispute-coordinator` directly and open a batch.
+ async fn start_import_or_batch(
+ &mut self,
+ incoming: IncomingRequest,
+ ) -> Result<()> {
let IncomingRequest { peer, payload, pending_response } = incoming;
let info = self
@@ -263,128 +350,172 @@ where
Ok(votes) => votes,
};
- let (pending_confirmation, confirmation_rx) = oneshot::channel();
- self.sender
- .send_message(DisputeCoordinatorMessage::ImportStatements {
- candidate_receipt,
- session: valid_vote.0.session_index(),
- statements: vec![valid_vote, invalid_vote],
- pending_confirmation: Some(pending_confirmation),
- })
- .await;
+ let candidate_hash = *valid_vote.0.candidate_hash();
+
+ match self.batches.find_batch(candidate_hash, candidate_receipt)? {
+ FoundBatch::Created(batch) => {
+ // There was no entry yet - start import immediately:
+ gum::trace!(
+ target: LOG_TARGET,
+ ?candidate_hash,
+ ?peer,
+ "No batch yet - triggering immediate import"
+ );
+ let import = PreparedImport {
+ candidate_receipt: batch.candidate_receipt().clone(),
+ statements: vec![valid_vote, invalid_vote],
+ requesters: vec![(peer, pending_response)],
+ };
+ self.start_import(import).await;
+ },
+ FoundBatch::Found(batch) => {
+ gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
+ let batch_result =
+ batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
+
+ if let Err(pending_response) = batch_result {
+ // We don't expect honest peers to send redundant votes within a single batch,
+ // as the timeout for retry is much higher. Still we don't want to punish the
+ // node as it might not be the node's fault. Some other (malicious) node could have been
+ // faster sending the same votes in order to harm the reputation of that honest
+ // node. Given that we already have a rate limit, if a validator chooses to
+ // waste available rate with redundant votes - so be it. The actual dispute
+ // resolution is unaffected.
+ gum::debug!(
+ target: LOG_TARGET,
+ ?peer,
+ "Peer sent completely redundant votes within a single batch - that looks fishy!",
+ );
+ pending_response
+ .send_outgoing_response(OutgoingResponse {
+ // While we have seen duplicate votes, we cannot confirm as we don't
+ // know yet whether the batch is going to be confirmed, so we assume
+ // the worst. We don't want to push the pending response to the batch
+ // either as that would be unbounded, only limited by the rate limit.
+ result: Err(()),
+ reputation_changes: Vec::new(),
+ sent_feedback: None,
+ })
+ .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+ return Err(From::from(JfyiError::RedundantMessage(peer)))
+ }
+ },
+ }
- self.pending_imports.push(peer, confirmation_rx, pending_response);
Ok(())
}
- /// Await an import and ban any misbehaving peers.
- ///
- /// In addition we report import metrics.
- fn ban_bad_peer(
- &mut self,
- result: JfyiErrorResult<(PeerId, ImportStatementsResult)>,
- ) -> JfyiErrorResult<()> {
- match result? {
- (_, ImportStatementsResult::ValidImport) => {
- self.metrics.on_imported(SUCCEEDED);
- },
- (bad_peer, ImportStatementsResult::InvalidImport) => {
- self.metrics.on_imported(FAILED);
- self.banned_peers.put(bad_peer, ());
- },
+ /// Trigger import into the dispute-coordinator of ready batches (`PreparedImport`s).
+ async fn import_ready_batches(&mut self, ready_imports: Vec) {
+ for import in ready_imports {
+ self.start_import(import).await;
}
- Ok(())
}
-}
-/// Manage pending imports in a way that preserves invariants.
-struct PendingImports {
- /// Futures in flight.
- futures:
- FuturesUnordered)>>,
- /// Peers whose requests are currently in flight.
- peers: HashSet,
-}
+ /// Start import and add response receiver to `pending_imports`.
+ async fn start_import(&mut self, import: PreparedImport) {
+ let PreparedImport { candidate_receipt, statements, requesters } = import;
+ let (session_index, candidate_hash) = match statements.iter().next() {
+ None => {
+ gum::debug!(
+ target: LOG_TARGET,
+ candidate_hash = ?candidate_receipt.hash(),
+ "Not importing empty batch"
+ );
+ return
+ },
+ Some(vote) => (vote.0.session_index(), vote.0.candidate_hash().clone()),
+ };
-impl PendingImports {
- pub fn new() -> Self {
- Self { futures: FuturesUnordered::new(), peers: HashSet::new() }
- }
+ let (pending_confirmation, confirmation_rx) = oneshot::channel();
+ self.sender
+ .send_message(DisputeCoordinatorMessage::ImportStatements {
+ candidate_receipt,
+ session: session_index,
+ statements,
+ pending_confirmation: Some(pending_confirmation),
+ })
+ .await;
- pub fn push(
- &mut self,
- peer: PeerId,
- handled: oneshot::Receiver,
- pending_response: OutgoingResponseSender,
- ) {
- self.peers.insert(peer);
- self.futures.push(
- async move {
- let r = respond_to_request(peer, handled, pending_response).await;
- (peer, r)
- }
- .boxed(),
- )
- }
+ let pending =
+ PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };
- /// Returns the number of contained futures.
- pub fn len(&self) -> usize {
- self.futures.len()
+ self.pending_imports.push(pending);
}
- /// Check whether a peer has a pending import.
- pub fn peer_is_pending(&self, peer: &PeerId) -> bool {
- self.peers.contains(peer)
+ fn update_imported_requests_metrics(&self, result: &ImportResult) {
+ let label = match result.result {
+ ImportStatementsResult::ValidImport => SUCCEEDED,
+ ImportStatementsResult::InvalidImport => FAILED,
+ };
+ self.metrics.on_imported(label, result.requesters.len());
}
}
-impl Stream for PendingImports {
- type Item = JfyiErrorResult<(PeerId, ImportStatementsResult)>;
- fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll