From b012597a7b18c0c82127d389a925822c48a348e2 Mon Sep 17 00:00:00 2001
From: Robert Klotzner
Date: Tue, 4 Oct 2022 18:02:05 +0200
Subject: [PATCH] Batch vote import in dispute-distribution (#5894)

* Start work on batching in dispute-distribution.
* Guide work.
* More guide changes. Still very much WIP.
* Finish guide changes.
* Clarification
* Adjust argument about slashing.
* WIP: Add constants to receiver.
* Maintain order of disputes.
* dispute-distribution sender rate limit.
* Cleanup
* WIP: dispute-distribution receiver.
  - [ ] Rate limiting
  - [ ] Batching
* WIP: Batching.
* fmt
* Update `PeerQueues` to maintain more invariants.
* WIP: Batching.
* Small cleanup
* Batching logic.
* Some integration work.
* Finish. Missing: Tests
* Typo.
* Docs.
* Report missing metric.
* Doc pass.
* Tests for waiting_queue.
* Speed up some crypto by 10x.
* Fix redundant import.
* Add some tracing.
* Better sender rate limit
* Some tests.
* Tests
* Add logging to rate limiter
* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md

Co-authored-by: Tsvetomir Dimitrov

* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md

Co-authored-by: Tsvetomir Dimitrov

* Update node/network/dispute-distribution/src/receiver/mod.rs

Co-authored-by: Tsvetomir Dimitrov

* Review feedback.
* Also log peer in log messages.
* Fix indentation.
* waker -> timer
* Guide improvement.
* Remove obsolete comment.
* waker -> timer
* Fix spell complaints.
* Fix Cargo.lock

Co-authored-by: Tsvetomir Dimitrov
---
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   7 +-
 node/network/dispute-distribution/Cargo.toml  |   2 +
 node/network/dispute-distribution/src/lib.rs  |  38 +-
 .../dispute-distribution/src/metrics.rs       |   7 +-
 .../src/receiver/batches/batch.rs             | 209 +++++++
 .../src/receiver/batches/mod.rs               | 170 ++++++
 .../src/receiver/batches/waiting_queue.rs     | 204 +++++++
 .../src/receiver/error.rs                     |  25 +-
 .../dispute-distribution/src/receiver/mod.rs  | 510 +++++++++++-------
 .../src/receiver/peer_queues.rs               | 141 +++++
 .../dispute-distribution/src/sender/mod.rs    | 110 +++-
 .../src/sender/send_task.rs                   |  30 +-
 .../dispute-distribution/src/tests/mock.rs    |  28 +-
 .../dispute-distribution/src/tests/mod.rs     | 453 ++++++++++++----
 .../protocol/src/request_response/mod.rs      |   8 +-
 .../src/node/disputes/dispute-distribution.md | 309 ++++++-----
 17 files changed, 1783 insertions(+), 469 deletions(-)
 create mode 100644 node/network/dispute-distribution/src/receiver/batches/batch.rs
 create mode 100644 node/network/dispute-distribution/src/receiver/batches/mod.rs
 create mode 100644 node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs
 create mode 100644 node/network/dispute-distribution/src/receiver/peer_queues.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6e86aa901131..35cfaf7e9228 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6334,6 +6334,7 @@ dependencies = [
 "fatality",
 "futures",
 "futures-timer",
+ "indexmap",
 "lazy_static",
 "lru 0.8.0",
 "parity-scale-codec",
diff --git a/Cargo.toml b/Cargo.toml
index ee886dafdc8a..c7edd0621319 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,9 +125,9 @@ maintenance = { status = "actively-developed" }
 #
 # This list is ordered alphabetically.
[profile.dev.package]
-blake2b_simd = { opt-level = 3 }
 blake2 = { opt-level = 3 }
 blake2-rfc = { opt-level = 3 }
+blake2b_simd = { opt-level = 3 }
 chacha20poly1305 = { opt-level = 3 }
 cranelift-codegen = { opt-level = 3 }
 cranelift-wasm = { opt-level = 3 }
@@ -138,8 +138,8 @@ curve25519-dalek = { opt-level = 3 }
 ed25519-dalek = { opt-level = 3 }
 flate2 = { opt-level = 3 }
 futures-channel = { opt-level = 3 }
-hashbrown = { opt-level = 3 }
 hash-db = { opt-level = 3 }
+hashbrown = { opt-level = 3 }
 hmac = { opt-level = 3 }
 httparse = { opt-level = 3 }
 integer-sqrt = { opt-level = 3 }
@@ -151,8 +151,8 @@ libz-sys = { opt-level = 3 }
 mio = { opt-level = 3 }
 nalgebra = { opt-level = 3 }
 num-bigint = { opt-level = 3 }
-parking_lot_core = { opt-level = 3 }
 parking_lot = { opt-level = 3 }
+parking_lot_core = { opt-level = 3 }
 percent-encoding = { opt-level = 3 }
 primitive-types = { opt-level = 3 }
 reed-solomon-novelpoly = { opt-level = 3 }
@@ -162,6 +162,7 @@ sha2 = { opt-level = 3 }
 sha3 = { opt-level = 3 }
 smallvec = { opt-level = 3 }
 snow = { opt-level = 3 }
+substrate-bip39 = { opt-level = 3 }
 twox-hash = { opt-level = 3 }
 uint = { opt-level = 3 }
 wasmi = { opt-level = 3 }
diff --git a/node/network/dispute-distribution/Cargo.toml b/node/network/dispute-distribution/Cargo.toml
index c37f26f7bece..a731175f0521 100644
--- a/node/network/dispute-distribution/Cargo.toml
+++ b/node/network/dispute-distribution/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"

 [dependencies]
 futures = "0.3.21"
+futures-timer = "3.0.2"
 gum = { package = "tracing-gum", path = "../../gum" }
 derive_more = "0.99.17"
 parity-scale-codec = { version = "3.1.5", features = ["std"] }
@@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 thiserror = "1.0.31"
 fatality = "0.0.6"
 lru = "0.8.0"
+indexmap = "1.9.1"

 [dev-dependencies]
 async-trait = "0.1.57"
diff --git a/node/network/dispute-distribution/src/lib.rs b/node/network/dispute-distribution/src/lib.rs
index 440b70b786d5..f109d5e6a40e 100644
--- a/node/network/dispute-distribution/src/lib.rs
+++ b/node/network/dispute-distribution/src/lib.rs
@@ -24,7 +24,7 @@
 //! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
 //! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].

-use std::num::NonZeroUsize;
+use std::{num::NonZeroUsize, time::Duration};

 use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

@@ -66,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
 /// via a dedicated channel and forwarding them to the dispute coordinator via
 /// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
 /// nodes, the reality is not that simple of course. Before importing statements the receiver will
-/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
-/// the following:
+/// batch up imports as well as possible for efficient imports while maintaining timely dispute
+/// resolution and handling of spamming validators:
 ///
 /// - Drop all messages from non-validator nodes; for this it requires the [`AuthorityDiscovery`]
 ///   service.
-/// - Drop messages from a node, if we are already importing a message from that node (flood).
-/// - Drop messages from nodes, that provided us messages where the statement import failed.
+/// - Drop messages from a node if it sends at too high a rate.
+/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
+/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
+/// do, while at the same time making it aware of new disputes as fast as possible.
+///
 /// For successfully imported votes, we will confirm the receipt of the message back to the sender.
 /// This way a received confirmation guarantees that the vote has been stored to disk by the
 /// receiver.
@@ -95,6 +98,20 @@ pub use metrics::Metrics;

 const LOG_TARGET: &'static str = "parachain::dispute-distribution";

+/// Rate limit on the `receiver` side.
+///
+/// If messages from one peer come in at a higher rate than one per `RECEIVE_RATE_LIMIT` on
+/// average, we start dropping messages from that peer to enforce that limit.
+pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);
+
+/// Rate limit on the `sender` side.
+///
+/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate
+/// as well.
+///
+/// We add 50ms extra, just to have some safe margin to the `RECEIVE_RATE_LIMIT`.
+pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
+
 /// The dispute distribution subsystem.
 pub struct DisputeDistributionSubsystem<AD> {
	/// Easy and efficient runtime access for this subsystem.
@@ -175,6 +192,12 @@ where
		ctx.spawn("disputes-receiver", receiver.run().boxed())
			.map_err(FatalError::SpawnTask)?;

+		// Process messages for the sending side.
+		//
+		// Note: We want the sender to be rate limited and we are currently taking advantage of the
+		// fact that the root task of this subsystem is only concerned with sending: Functions of
+		// `DisputeSender` might apply back pressure if the rate limit is hit, which will slow down
+		// this loop. If this fact ever changes, we will likely need another task.
		loop {
			let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
			match message {
@@ -250,9 +273,10 @@ impl MuxedMessage {
		// ends.
		let from_overseer = ctx.recv().fuse();
		futures::pin_mut!(from_overseer, from_sender);
-		futures::select!(
-			msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
+		// We select biased to make sure we finish up loose ends before starting new work.
+		futures::select_biased!(
			msg = from_sender.next() => MuxedMessage::Sender(msg),
+			msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
		)
	}
}
diff --git a/node/network/dispute-distribution/src/metrics.rs b/node/network/dispute-distribution/src/metrics.rs
index 3f717bd105c3..aa2feeaad3a0 100644
--- a/node/network/dispute-distribution/src/metrics.rs
+++ b/node/network/dispute-distribution/src/metrics.rs
@@ -72,9 +72,12 @@ impl Metrics {
	}

	/// Statements have been imported.
-	pub fn on_imported(&self, label: &'static str) {
+	pub fn on_imported(&self, label: &'static str, num_requests: usize) {
		if let Some(metrics) = &self.0 {
-			metrics.imported_requests.with_label_values(&[label]).inc()
+			metrics
+				.imported_requests
+				.with_label_values(&[label])
+				.inc_by(num_requests as u64)
		}
	}

diff --git a/node/network/dispute-distribution/src/receiver/batches/batch.rs b/node/network/dispute-distribution/src/receiver/batches/batch.rs
new file mode 100644
index 000000000000..eebed25ed790
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/batches/batch.rs
@@ -0,0 +1,209 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashMap, time::Instant};

use gum::CandidateHash;
use polkadot_node_network_protocol::{
	request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
	PeerId,
};
use polkadot_node_primitives::SignedDisputeStatement;
use polkadot_primitives::v2::{CandidateReceipt, ValidatorIndex};

use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};

use super::MAX_BATCH_LIFETIME;

/// A batch of votes to be imported into the `dispute-coordinator`.
///
/// Vote imports are far more efficient when performed in batches, hence we batch together incoming
/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
/// coordinator.
///
/// A `Batch` keeps track of the votes to be imported and the current incoming rate; on rate update
/// it will "flush" in case the incoming rate dropped too low, preparing the import.
pub struct Batch {
	/// The actual candidate this batch is concerned with.
	candidate_receipt: CandidateReceipt,

	/// Cache of `CandidateHash` (candidate_receipt.hash()).
	candidate_hash: CandidateHash,

	/// All valid votes received in this batch so far.
	///
	/// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
	/// while still allowing validators to equivocate.
	///
	/// Detecting and rejecting duplicates is crucial in order to effectively enforce
	/// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we counted duplicates
	/// here, the mechanism would be broken.
	valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

	/// All invalid votes received in this batch so far.
	invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

	/// How many votes have been batched since the last tick/creation.
	votes_batched_since_last_tick: u32,

	/// Expiry time for the batch.
	///
	/// By this time, at the latest, this batch will get flushed.
	best_before: Instant,

	/// Requesters waiting for a response.
	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
pub(super) enum TickResult {
	/// Batch is still alive, please call `tick` again at the given `Instant`.
	Alive(Batch, Instant),
	/// Batch is done, ready for import!
	Done(PreparedImport),
}

/// Ready for import.
pub struct PreparedImport {
	pub candidate_receipt: CandidateReceipt,
	pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
	/// Information about original requesters.
	pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

impl From<Batch> for PreparedImport {
	fn from(batch: Batch) -> Self {
		let Batch {
			candidate_receipt,
			valid_votes,
			invalid_votes,
			requesters: pending_responses,
			..
		} = batch;

		let statements = valid_votes
			.into_iter()
			.chain(invalid_votes.into_iter())
			.map(|(index, statement)| (statement, index))
			.collect();

		Self { candidate_receipt, statements, requesters: pending_responses }
	}
}

impl Batch {
	/// Create a new empty batch based on the given `CandidateReceipt`.
	///
	/// To create a `Batch` use `Batches::find_batch`.
	///
	/// Arguments:
	///
	/// * `candidate_receipt` - The candidate this batch is meant to track votes for.
	/// * `now` - current time stamp for calculating the first tick.
	///
	/// Returns: A batch and the first `Instant` at which you are supposed to call `tick`.
	pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
		let s = Self {
			candidate_hash: candidate_receipt.hash(),
			candidate_receipt,
			valid_votes: HashMap::new(),
			invalid_votes: HashMap::new(),
			votes_batched_since_last_tick: 0,
			best_before: now + MAX_BATCH_LIFETIME,
			requesters: Vec::new(),
		};
		let next_tick = s.calculate_next_tick(now);
		(s, next_tick)
	}

	/// Receipt of the candidate this batch is batching votes for.
	pub fn candidate_receipt(&self) -> &CandidateReceipt {
		&self.candidate_receipt
	}

	/// Add votes from a validator into the batch.
	///
	/// The statements are supposed to be the valid and invalid statements received in a
	/// `DisputeRequest`.
	///
	/// The given `pending_response` is the corresponding response sender for responding to `peer`.
	/// If at least one of the votes is new as far as this batch is concerned, we record the
	/// `pending_response` for later use. In case both votes are known already, we return the
	/// response sender as an `Err` value.
	pub fn add_votes(
		&mut self,
		valid_vote: (SignedDisputeStatement, ValidatorIndex),
		invalid_vote: (SignedDisputeStatement, ValidatorIndex),
		peer: PeerId,
		pending_response: OutgoingResponseSender<DisputeRequest>,
	) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
		debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
		debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);

		let mut duplicate = true;

		if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
			self.votes_batched_since_last_tick += 1;
			duplicate = false;
		}
		if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
			self.votes_batched_since_last_tick += 1;
			duplicate = false;
		}

		if duplicate {
			Err(pending_response)
		} else {
			self.requesters.push((peer, pending_response));
			Ok(())
		}
	}

	/// Check batch for liveness.
	///
	/// This function is supposed to be called at the instant given at construction and then at the
	/// instants returned as part of `TickResult`.
	pub(super) fn tick(mut self, now: Instant) -> TickResult {
		if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
			now < self.best_before
		{
			// Still good:
			let next_tick = self.calculate_next_tick(now);
			// Reset counter:
			self.votes_batched_since_last_tick = 0;
			TickResult::Alive(self, next_tick)
		} else {
			TickResult::Done(PreparedImport::from(self))
		}
	}

	/// Calculate when the next tick should happen.
	///
	/// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this
	/// batch would exceed `MAX_BATCH_LIFETIME`.
	///
	/// # Arguments
	///
	/// * `now` - The current time.
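	///
	/// # Example
	///
	/// Illustrative numbers only, not taken from the patch: with the production
	/// `BATCH_COLLECTING_INTERVAL` of 500ms and a `best_before` that is only 300ms away, the
	/// returned instant is clamped to `best_before`, so a batch can never outlive
	/// `MAX_BATCH_LIFETIME`.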
	fn calculate_next_tick(&self, now: Instant) -> Instant {
		let next_tick = now + BATCH_COLLECTING_INTERVAL;
		if next_tick < self.best_before {
			next_tick
		} else {
			self.best_before
		}
	}
}
diff --git a/node/network/dispute-distribution/src/receiver/batches/mod.rs b/node/network/dispute-distribution/src/receiver/batches/mod.rs
new file mode 100644
index 000000000000..b343b55e0b04
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/batches/mod.rs
@@ -0,0 +1,170 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
	collections::{hash_map, HashMap},
	time::{Duration, Instant},
};

use futures::future::pending;

use polkadot_node_network_protocol::request_response::DISPUTE_REQUEST_TIMEOUT;
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt};

use crate::{
	receiver::batches::{batch::TickResult, waiting_queue::PendingWake},
	LOG_TARGET,
};

pub use self::batch::{Batch, PreparedImport};
use self::waiting_queue::WaitingQueue;

use super::{
	error::{JfyiError, JfyiResult},
	BATCH_COLLECTING_INTERVAL,
};

/// A single batch (per candidate) as managed by `Batches`.
mod batch;

/// Queue events in time and wait for them to become ready.
mod waiting_queue;

/// Safeguard in case votes trickle in really slowly.
///
/// If the batch lifetime exceeded the time the sender is willing to wait for a confirmation, we
/// would trigger pointless re-sends.
const MAX_BATCH_LIFETIME: Duration = DISPUTE_REQUEST_TIMEOUT.saturating_sub(Duration::from_secs(2));

/// Limit the number of batches that can be alive at any given time.
///
/// For the reasoning behind this number, see the guide.
pub const MAX_BATCHES: usize = 1000;

/// Manage batches.
///
/// - Batches can be found via `find_batch()` in order to add votes to them/check they exist.
/// - Batches can be checked for being ready for flushing in order to import contained votes.
pub struct Batches {
	/// The batches we manage.
	///
	/// Invariant kept: for each entry in `batches`, there exists an entry in `waiting_queue` as
	/// well - we wait on all batches!
	batches: HashMap<CandidateHash, Batch>,
	/// Waiting queue for waiting for batches to become ready for `tick`.
	///
	/// Invariant kept by `Batches`: for each entry in the `waiting_queue` there exists a
	/// corresponding entry in `batches`.
	waiting_queue: WaitingQueue<CandidateHash>,
}

/// A found batch is either really found or got created so it can be found.
pub enum FoundBatch<'a> {
	/// Batch just got created.
	Created(&'a mut Batch),
	/// Batch already existed.
	Found(&'a mut Batch),
}

impl Batches {
	/// Create new empty `Batches`.
	pub fn new() -> Self {
		debug_assert!(
			MAX_BATCH_LIFETIME > BATCH_COLLECTING_INTERVAL,
			"Unexpectedly low `MAX_BATCH_LIFETIME`, please check parameters."
+ ); + Self { batches: HashMap::new(), waiting_queue: WaitingQueue::new() } + } + + /// Find a particular batch. + /// + /// That is either find it, or we create it as reflected by the result `FoundBatch`. + pub fn find_batch( + &mut self, + candidate_hash: CandidateHash, + candidate_receipt: CandidateReceipt, + ) -> JfyiResult { + if self.batches.len() >= MAX_BATCHES { + return Err(JfyiError::MaxBatchLimitReached) + } + debug_assert!(candidate_hash == candidate_receipt.hash()); + let result = match self.batches.entry(candidate_hash) { + hash_map::Entry::Vacant(vacant) => { + let now = Instant::now(); + let (created, ready_at) = Batch::new(candidate_receipt, now); + let pending_wake = PendingWake { payload: candidate_hash, ready_at }; + self.waiting_queue.push(pending_wake); + FoundBatch::Created(vacant.insert(created)) + }, + hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.into_mut()), + }; + Ok(result) + } + + /// Wait for the next `tick` to check for ready batches. + /// + /// This function blocks (returns `Poll::Pending`) until at least one batch can be + /// checked for readiness meaning that `BATCH_COLLECTING_INTERVAL` has passed since the last + /// check for that batch or it reached end of life. + /// + /// If this `Batches` instance is empty (does not actually contain any batches), then this + /// function will always return `Poll::Pending`. + /// + /// Returns: A `Vec` of all `PreparedImport`s from batches that became ready. + pub async fn check_batches(&mut self) -> Vec { + let now = Instant::now(); + + let mut imports = Vec::new(); + + // Wait for at least one batch to become ready: + self.waiting_queue.wait_ready(now).await; + + // Process all ready entries: + while let Some(wake) = self.waiting_queue.pop_ready(now) { + let batch = self.batches.remove(&wake.payload); + debug_assert!( + batch.is_some(), + "Entries referenced in `waiting_queue` are supposed to exist!" + ); + let batch = match batch { + None => return pending().await, + Some(batch) => batch, + }; + match batch.tick(now) { + TickResult::Done(import) => { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?wake.payload, + "Batch became ready." + ); + imports.push(import); + }, + TickResult::Alive(old_batch, next_tick) => { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?wake.payload, + "Batch found to be still alive on check." + ); + let pending_wake = PendingWake { payload: wake.payload, ready_at: next_tick }; + self.waiting_queue.push(pending_wake); + self.batches.insert(wake.payload, old_batch); + }, + } + } + imports + } +} diff --git a/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs b/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs new file mode 100644 index 000000000000..995dc74d358f --- /dev/null +++ b/node/network/dispute-distribution/src/receiver/batches/waiting_queue.rs @@ -0,0 +1,204 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};

use futures::future::pending;
use futures_timer::Delay;

/// Wait asynchronously for given `Instant`s one after the other.
///
/// `PendingWake`s can be inserted and `WaitingQueue` makes `wait_ready()` always wait for the
/// next `Instant` in the queue.
pub struct WaitingQueue<Payload> {
	/// All pending wakes we are supposed to wait on in order.
	pending_wakes: BinaryHeap<PendingWake<Payload>>,
	/// Wait for next `PendingWake`.
	timer: Option<Delay>,
}

/// Represents some event waiting to be processed at `ready_at`.
///
/// This is an event in `WaitingQueue`. It provides an `Ord` instance that sorts descending with
/// regard to `Instant` (so we get a `min-heap` with the earliest `Instant` at the top).
#[derive(Eq, PartialEq)]
pub struct PendingWake<Payload> {
	pub payload: Payload,
	pub ready_at: Instant,
}

impl<Payload: Ord> WaitingQueue<Payload> {
	/// Get a new empty `WaitingQueue`.
	///
	/// If you call `wait_ready` on this empty queue, it will always return `Poll::Pending`.
	pub fn new() -> Self {
		Self { pending_wakes: BinaryHeap::new(), timer: None }
	}

	/// Push a `PendingWake`.
	///
	/// The next call to `wait_ready` will make sure to wake soon enough to process that new event
	/// in a timely manner.
	pub fn push(&mut self, wake: PendingWake<Payload>) {
		self.pending_wakes.push(wake);
		// Reset timer as it is potentially obsolete now:
		self.timer = None;
	}

	/// Pop the next ready item.
	///
	/// This function does not wait; if nothing is ready right now as determined by the passed
	/// `now` time stamp, it simply returns `None`.
	pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
		let is_ready = self.pending_wakes.peek().map_or(false, |p| p.ready_at <= now);
		if is_ready {
			Some(self.pending_wakes.pop().expect("We just peeked. qed."))
		} else {
			None
		}
	}

	/// Don't pop, just wait until something is ready.
	///
	/// Once this function returns `Poll::Ready(())`, `pop_ready()` will return `Some`, if passed
	/// the same `Instant`.
	///
	/// Whether ready or not is determined based on the passed time stamp `now`, which should be
	/// the current time as returned by `Instant::now()`.
	///
	/// This function waits asynchronously for an item to become ready. If there are no more
	/// items, this call will wait forever (return `Poll::Pending` without scheduling a wake).
	pub async fn wait_ready(&mut self, now: Instant) {
		if let Some(timer) = &mut self.timer {
			// Previous timer was not done yet.
			timer.await
		}

		let next_waiting = self.pending_wakes.peek();
		let is_ready = next_waiting.map_or(false, |p| p.ready_at <= now);
		if is_ready {
			return
		}

		self.timer = next_waiting.map(|p| Delay::new(p.ready_at.duration_since(now)));
		match &mut self.timer {
			None => return pending().await,
			Some(timer) => timer.await,
		}
	}
}

impl<Payload: Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

impl<Payload: Ord> Ord for PendingWake<Payload> {
	fn cmp(&self, other: &Self) -> Ordering {
		// Reverse order for min-heap:
		match other.ready_at.cmp(&self.ready_at) {
			Ordering::Equal => other.payload.cmp(&self.payload),
			o => o,
		}
	}
}

#[cfg(test)]
mod tests {
	use std::{
		task::Poll,
		time::{Duration, Instant},
	};

	use assert_matches::assert_matches;
	use futures::{future::poll_fn, pin_mut, Future};

	use crate::LOG_TARGET;

	use super::{PendingWake, WaitingQueue};

	#[test]
	fn wait_ready_waits_for_earliest_event_always() {
		sp_tracing::try_init_simple();
		let mut queue = WaitingQueue::new();
		let now = Instant::now();
		let start = now;
		queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
		// Push another one in order:
		queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
		// Push one out of order:
		queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
		// Push another one at same timestamp (should become ready at the same time)
		queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });

		futures::executor::block_on(async move {
			// No time passed yet - nothing should be ready.
			assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");

			// Receive them in order at expected times:
			queue.wait_ready(now).await;
			gum::trace!(target: LOG_TARGET, "After first wait.");

			let now = start + Duration::from_millis(1);
			assert!(Instant::now() - start >= Duration::from_millis(1));
			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
			// One more should be ready:
			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");

			queue.wait_ready(now).await;
			gum::trace!(target: LOG_TARGET, "After second wait.");
			let now = start + Duration::from_millis(3);
			assert!(Instant::now() - start >= Duration::from_millis(3));
			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");

			// Push in between wait:
			poll_fn(|cx| {
				let fut = queue.wait_ready(now);
				pin_mut!(fut);
				assert_matches!(fut.poll(cx), Poll::Pending);
				Poll::Ready(())
			})
			.await;
			queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });

			queue.wait_ready(now).await;
			// Newly pushed element should have become ready:
			gum::trace!(target: LOG_TARGET, "After third wait.");
			let now = start + Duration::from_millis(4);
			assert!(Instant::now() - start >= Duration::from_millis(4));
			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");

			queue.wait_ready(now).await;
			gum::trace!(target: LOG_TARGET, "After fourth wait.");
			let now = start + Duration::from_millis(5);
			assert!(Instant::now() - start >= Duration::from_millis(5));
			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");

			// queue empty - should wait forever now:
			poll_fn(|cx| {
				let fut = queue.wait_ready(now);
				pin_mut!(fut);
				assert_matches!(fut.poll(cx), Poll::Pending);
				Poll::Ready(())
			})
			.await;
		});
	}
}
diff --git a/node/network/dispute-distribution/src/receiver/error.rs b/node/network/dispute-distribution/src/receiver/error.rs
index ce578cc8e0f9..4477335440d0 100644
--- a/node/network/dispute-distribution/src/receiver/error.rs
+++ b/node/network/dispute-distribution/src/receiver/error.rs
@@ -19,8 +19,10 @@

 use fatality::Nested;

+use gum::CandidateHash;
 use polkadot_node_network_protocol::{request_response::incoming, PeerId};
 use polkadot_node_subsystem_util::runtime;
+use polkadot_primitives::v2::AuthorityDiscoveryId;

 use crate::LOG_TARGET;

@@ -35,8 +37,8 @@ pub enum Error {
	#[error("Retrieving next incoming request failed.")]
	IncomingRequest(#[from] incoming::Error),

-	#[error("Sending back response to peer {0} failed.")]
-	SendResponse(PeerId),
+	#[error("Sending back response to peers {0:#?} failed.")]
+	SendResponses(Vec<PeerId>),

	#[error("Changing peer's ({0}) reputation failed.")]
	SetPeerReputation(PeerId),

@@ -44,16 +46,29 @@ pub enum Error {
	#[error("Dispute request with invalid signatures, from peer {0}.")]
	InvalidSignature(PeerId),

-	#[error("Import of dispute got canceled for peer {0} - import failed for some reason.")]
-	ImportCanceled(PeerId),
+	#[error("Received votes from peer {0} were completely redundant.")]
+	RedundantMessage(PeerId),
+
+	#[error("Import of dispute got canceled for candidate {0} - import failed for some reason.")]
+	ImportCanceled(CandidateHash),

	#[error("Peer {0} attempted to participate in dispute and is not a validator.")]
	NotAValidator(PeerId),
+
+	#[error("Attempted to force-flush a batch that could not be found, candidate hash: {0}")]
+	ForceFlushBatchDoesNotExist(CandidateHash),
+
+	// Should never happen in practice:
+	#[error("We needed to drop messages, because we reached the limit on concurrent batches.")]
+	MaxBatchLimitReached,
+
+	#[error("Authority {0} sent messages at too high a rate.")]
+	AuthorityFlooding(AuthorityDiscoveryId),
 }

 pub type Result<T> = std::result::Result<T, Error>;

-pub type JfyiErrorResult<T> = std::result::Result<T, JfyiError>;
+pub type JfyiResult<T> = std::result::Result<T, JfyiError>;

 /// Utility for eating top-level errors and logging them.
 ///
diff --git a/node/network/dispute-distribution/src/receiver/mod.rs b/node/network/dispute-distribution/src/receiver/mod.rs
index c38ca2133f54..158c66e20655 100644
--- a/node/network/dispute-distribution/src/receiver/mod.rs
+++ b/node/network/dispute-distribution/src/receiver/mod.rs
@@ -15,21 +15,21 @@

 use std::{
-	collections::HashSet,
	num::NonZeroUsize,
	pin::Pin,
	task::{Context, Poll},
+	time::Duration,
 };

 use futures::{
	channel::oneshot,
-	future::{poll_fn, BoxFuture},
+	future::poll_fn,
	pin_mut,
-	stream::{FusedStream, FuturesUnordered, StreamExt},
-	Future, FutureExt, Stream,
+	stream::{FuturesUnordered, StreamExt},
+	Future,
 };

-use lru::LruCache;
+use gum::CandidateHash;
 use polkadot_node_network_protocol::{
	authority_discovery::AuthorityDiscovery,
	request_response::{
@@ -52,25 +52,47 @@ use crate::{
 };

 mod error;
-use self::error::{log_error, JfyiError, JfyiErrorResult, Result};
+
+/// Rate limiting queues for incoming requests by peers.
+mod peer_queues;
+
+/// Batch imports together.
+mod batches;
+
+use self::{
+	batches::{Batches, FoundBatch, PreparedImport},
+	error::{log_error, JfyiError, JfyiResult, Result},
+	peer_queues::PeerQueues,
+};

 const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
 const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
-const COST_INVALID_CANDIDATE: Rep = Rep::Malicious("Reported candidate was not available.");
+const COST_INVALID_IMPORT: Rep =
+	Rep::Malicious("Import was deemed invalid by dispute-coordinator.");
 const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
+/// Mildly punish peers exceeding their rate limit.
+///
+/// For honest peers this should rarely happen, but if it happens we would not want to disconnect
+/// too quickly. Minor cost should suffice for disconnecting any real flooder.
+const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Peer exceeded the rate limit.");

-/// How many statement imports we want to issue in parallel:
-pub const MAX_PARALLEL_IMPORTS: usize = 10;
+/// How many votes must have arrived in the last `BATCH_COLLECTING_INTERVAL` in order for a batch
+/// to stay alive and not get flushed/imported to the dispute-coordinator.
+///
+/// This ensures a timely import of batches.
+#[cfg(not(test))]
+pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
+#[cfg(test)]
+pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;

-const BANNED_PEERS_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(MAX_PARALLEL_IMPORTS) {
-	Some(cap) => cap,
-	None => panic!("Banned peers cache size should not be 0."),
-};
+/// Time we allow to pass for new votes to trickle in.
+///
+/// See `MIN_KEEP_BATCH_ALIVE_VOTES` above.
+/// Should be greater than or equal to `RECEIVE_RATE_LIMIT` (there is no point in checking any
+/// faster).
+pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);

 /// State for handling incoming `DisputeRequest` messages.
-///
-/// This is supposed to run as its own task in order to easily impose back pressure on the incoming
-/// request channel and at the same time to drop flood messages as fast as possible.
 pub struct DisputesReceiver<Sender, AD> {
	/// Access to session information.
	runtime: RuntimeInfo,
@@ -81,18 +103,17 @@ pub struct DisputesReceiver<Sender, AD> {
	/// Channel to retrieve incoming requests from.
	receiver: IncomingRequestReceiver<DisputeRequest>,

+	/// Rate limiting queue for each peer (only authorities).
+	peer_queues: PeerQueues,
+
+	/// Currently active batches of imports per candidate.
+	batches: Batches,
+
	/// Authority discovery service:
	authority_discovery: AD,

-	/// Imports currently being processed.
-	pending_imports: PendingImports,
-
-	/// We keep record of the last banned peers.
-	///
-	/// This is needed because once we ban a peer, we will very likely still have pending requests
-	/// in the incoming channel - we should not waste time recovering availability for those, as we
-	/// already know the peer is malicious.
-	banned_peers: LruCache<PeerId, ()>,
+	/// Imports currently being processed by the `dispute-coordinator`.
+	pending_imports: FuturesUnordered<PendingImport>,

	/// Log received requests.
	metrics: Metrics,
@@ -106,36 +127,24 @@ enum MuxedMessage {
	///
	/// - We need to make sure responses are actually sent (therefore we need to await futures
	///   promptly).
-	/// - We need to update `banned_peers` accordingly to the result.
-	ConfirmedImport(JfyiErrorResult<(PeerId, ImportStatementsResult)>),
+	/// - We need to punish peers whose import got rejected.
+	ConfirmedImport(ImportResult),

	/// A new request has arrived and should be handled.
	NewRequest(IncomingRequest<DisputeRequest>),
-}

-impl MuxedMessage {
-	async fn receive(
-		pending_imports: &mut PendingImports,
-		pending_requests: &mut IncomingRequestReceiver<DisputeRequest>,
-	) -> Result<MuxedMessage> {
-		poll_fn(|ctx| {
-			let next_req = pending_requests.recv(|| vec![COST_INVALID_REQUEST]);
-			pin_mut!(next_req);
-			if let Poll::Ready(r) = next_req.poll(ctx) {
-				return match r {
-					Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
-					Ok(v) => Poll::Ready(Ok(Self::NewRequest(v))),
-				}
-			}
-			// In case of Ready(None) return `Pending` below - we want to wait for the next request
-			// in that case.
-			if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
-				return Poll::Ready(Ok(Self::ConfirmedImport(v)))
-			}
-			Poll::Pending
-		})
-		.await
-	}
+	/// Rate limit timer hit - it is time to process one row of messages.
+	///
+	/// This is the result of calling `self.peer_queues.pop_reqs()`.
+	WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),
+
+	/// It is time to check batches.
+	///
+	/// Every `BATCH_COLLECTING_INTERVAL` we check whether less than `MIN_KEEP_BATCH_ALIVE_VOTES`
+	/// new votes arrived, if so the batch is ready for import.
+	///
+	/// This is the result of calling `self.batches.check_batches()`.
+	WakeCheckBatches(Vec<PreparedImport>),
 }

 impl<Sender, AD> DisputesReceiver<Sender, AD>
 where
@@ -159,11 +168,10 @@ where
			runtime,
			sender,
			receiver,
+			peer_queues: PeerQueues::new(),
+			batches: Batches::new(),
			authority_discovery,
-			pending_imports: PendingImports::new(),
-			// Size of MAX_PARALLEL_IMPORTS ensures we are going to immediately get rid of any
-			// malicious requests still pending in the incoming queue.
-			banned_peers: LruCache::new(BANNED_PEERS_CACHE_SIZE),
+			pending_imports: FuturesUnordered::new(),
			metrics,
		}
	}
@@ -187,60 +195,132 @@ where
		}
	}

-	/// Actual work happening here.
+	/// Actual work happening here in three phases:
+	///
+	/// 1. Receive and queue incoming messages until the rate limit timer hits.
+	/// 2. Do import/batching for the head of all queues.
+	/// 3. Check and flush any ready batches.
	async fn run_inner(&mut self) -> Result<()> {
-		let msg = MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).await?;
+		let msg = self.receive_message().await?;

-		let incoming = match msg {
-			// We need to clean up futures, to make sure responses are sent:
-			MuxedMessage::ConfirmedImport(m_bad) => {
-				self.ban_bad_peer(m_bad)?;
-				return Ok(())
+		match msg {
+			MuxedMessage::NewRequest(req) => {
+				// Phase 1:
+				self.metrics.on_received_request();
+				self.dispatch_to_queues(req).await?;
			},
-			MuxedMessage::NewRequest(req) => req,
-		};
+			MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
+				// Phase 2:
+				for req in reqs {
+					// No early return - we cannot cancel imports for one peer just because the
+					// import for another failed:
+					match log_error(self.start_import_or_batch(req).await) {
+						Ok(()) => {},
+						Err(fatal) => return Err(fatal.into()),
+					}
+				}
+			},
+			MuxedMessage::WakeCheckBatches(ready_imports) => {
+				// Phase 3:
+				self.import_ready_batches(ready_imports).await;
+			},
+			MuxedMessage::ConfirmedImport(import_result) => {
+				self.update_imported_requests_metrics(&import_result);
+				// Confirm imports to requesters/punish them on invalid imports:
+				send_responses_to_requesters(import_result).await?;
+			},
+		}
+
+		Ok(())
+	}
+
+	/// Receive one `MuxedMessage`.
+	///
+	/// Dispatches events to messages as they happen.
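+	///
+	/// The polling order below reflects priority: finish pending imports first, then pop the
+	/// rate-limited peer queues, then check batches for readiness, and only then accept new
+	/// requests from the network.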
+	async fn receive_message(&mut self) -> Result<MuxedMessage> {
+		poll_fn(|ctx| {
+			// In case of Ready(None), we want to wait for pending requests:
+			if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
+				return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
+			}
+
+			let rate_limited = self.peer_queues.pop_reqs();
+			pin_mut!(rate_limited);
+			// We poll `rate_limited` before batches, so we don't unnecessarily delay importing to
+			// batches.
+			if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
+				return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
+			}
-		self.metrics.on_received_request();
+			let ready_batches = self.batches.check_batches();
+			pin_mut!(ready_batches);
+			if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
+				return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
+			}
-		let peer = incoming.peer;
+			let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
+			pin_mut!(next_req);
+			if let Poll::Ready(r) = next_req.poll(ctx) {
+				return match r {
+					Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
+					Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
+				}
+			}
+			Poll::Pending
+		})
+		.await
+	}

-		// Only accept messages from validators:
-		if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
-			incoming
-				.send_outgoing_response(OutgoingResponse {
+	/// Process incoming requests.
+	///
+	/// - Check that the sender is an authority.
+	/// - Dispatch the message to the corresponding queue in `peer_queues`.
+	/// - If the queue is full, drop the message and change the reputation of the sender.
+	async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
+		let peer = req.peer;
+		// Only accept messages from validators, in case there are multiple `AuthorityId`s, we
+		// just take the first one. On session boundaries this might allow validators to double
+		// their rate limit for a short period of time, which seems acceptable.
+		let authority_id = match self
+			.authority_discovery
+			.get_authority_ids_by_peer_id(peer)
+			.await
+			.and_then(|s| s.into_iter().next())
+		{
+			None => {
+				req.send_outgoing_response(OutgoingResponse {
					result: Err(()),
					reputation_changes: vec![COST_NOT_A_VALIDATOR],
					sent_feedback: None,
				})
-				.map_err(|_| JfyiError::SendResponse(peer))?;
-
-			return Err(JfyiError::NotAValidator(peer).into())
-		}
-
-		// Immediately drop requests from peers that already have requests in flight or have
-		// been banned recently (flood protection):
-		if self.pending_imports.peer_is_pending(&peer) || self.banned_peers.contains(&peer) {
-			gum::trace!(
-				target: LOG_TARGET,
-				?peer,
-				"Dropping message from peer (banned/pending import)"
-			);
-			return Ok(())
-		}
+				.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+				return Err(JfyiError::NotAValidator(peer).into())
+			},
+			Some(auth_id) => auth_id,
+		};

-		// Wait for a free slot:
-		if self.pending_imports.len() >= MAX_PARALLEL_IMPORTS {
-			// Wait for one to finish:
-			let r = self.pending_imports.next().await;
-			self.ban_bad_peer(r.expect("pending_imports.len() is greater 0. qed."))?;
+		// Queue request:
+		if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
+			req.send_outgoing_response(OutgoingResponse {
+				result: Err(()),
+				reputation_changes: vec![COST_APPARENT_FLOOD],
+				sent_feedback: None,
+			})
+			.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+			return Err(JfyiError::AuthorityFlooding(authority_id))
		}
-
-		// All good - initiate import.
-		self.start_import(incoming).await
+		Ok(())
	}

-	/// Start importing votes for the given request.
-	async fn start_import(&mut self, incoming: IncomingRequest<DisputeRequest>) -> Result<()> {
+	/// Start importing votes for the given request or batch.
+	///
+	/// We check signatures and, in case we already have an existing batch, we import into that
+	/// batch; otherwise we import into the `dispute-coordinator` directly and open a new batch.
+	async fn start_import_or_batch(
+		&mut self,
+		incoming: IncomingRequest<DisputeRequest>,
+	) -> Result<()> {
		let IncomingRequest { peer, payload, pending_response } = incoming;

		let info = self
@@ -270,128 +350,172 @@ where
			Ok(votes) => votes,
		};

-		let (pending_confirmation, confirmation_rx) = oneshot::channel();
-		self.sender
-			.send_message(DisputeCoordinatorMessage::ImportStatements {
-				candidate_receipt,
-				session: valid_vote.0.session_index(),
-				statements: vec![valid_vote, invalid_vote],
-				pending_confirmation: Some(pending_confirmation),
-			})
-			.await;
+		let candidate_hash = *valid_vote.0.candidate_hash();
+
+		match self.batches.find_batch(candidate_hash, candidate_receipt)? {
+			FoundBatch::Created(batch) => {
+				// There was no entry yet - start import immediately:
+				gum::trace!(
+					target: LOG_TARGET,
+					?candidate_hash,
+					?peer,
+					"No batch yet - triggering immediate import"
+				);
+				let import = PreparedImport {
+					candidate_receipt: batch.candidate_receipt().clone(),
+					statements: vec![valid_vote, invalid_vote],
+					requesters: vec![(peer, pending_response)],
+				};
+				self.start_import(import).await;
+			},
+			FoundBatch::Found(batch) => {
+				gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
+				let batch_result =
+					batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
+
+				if let Err(pending_response) = batch_result {
+					// We don't expect honest peers to send redundant votes within a single batch,
+					// as the timeout for retry is much higher. Still we don't want to punish the
+					// node as it might not be the node's fault. Some other (malicious) node could
+					// have been faster sending the same votes in order to harm the reputation of
+					// that honest node. Given that we already have a rate limit, if a validator
+					// chooses to waste available rate with redundant votes - so be it. The actual
+					// dispute resolution is unaffected.
+					gum::debug!(
+						target: LOG_TARGET,
+						?peer,
+						"Peer sent completely redundant votes within a single batch - that looks fishy!",
+					);
+					pending_response
+						.send_outgoing_response(OutgoingResponse {
+							// While we have seen duplicate votes, we cannot confirm as we don't
+							// know yet whether the batch is going to be confirmed, so we assume
+							// the worst. We don't want to push the pending response to the batch
+							// either as that would be unbounded, only limited by the rate limit.
+							result: Err(()),
+							reputation_changes: Vec::new(),
+							sent_feedback: None,
+						})
+						.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
+					return Err(From::from(JfyiError::RedundantMessage(peer)))
+				}
+			},
+		}

-		self.pending_imports.push(peer, confirmation_rx, pending_response);
		Ok(())
	}

-	/// Await an import and ban any misbehaving peers.
-	///
-	/// In addition we report import metrics.
-	fn ban_bad_peer(
-		&mut self,
-		result: JfyiErrorResult<(PeerId, ImportStatementsResult)>,
-	) -> JfyiErrorResult<()> {
-		match result? {
-			(_, ImportStatementsResult::ValidImport) => {
-				self.metrics.on_imported(SUCCEEDED);
-			},
-			(bad_peer, ImportStatementsResult::InvalidImport) => {
-				self.metrics.on_imported(FAILED);
-				self.banned_peers.put(bad_peer, ());
-			},
+	/// Trigger import into the dispute-coordinator of ready batches (`PreparedImport`s).
+	async fn import_ready_batches(&mut self, ready_imports: Vec<PreparedImport>) {
+		for import in ready_imports {
+			self.start_import(import).await;
		}
-		Ok(())
	}
-}

-/// Manage pending imports in a way that preserves invariants.
-struct PendingImports {
-	/// Futures in flight.
-	futures:
-		FuturesUnordered<BoxFuture<'static, (PeerId, JfyiErrorResult<ImportStatementsResult>)>>,
-	/// Peers whose requests are currently in flight.
-	peers: HashSet<PeerId>,
-}
+	/// Start import and add response receiver to `pending_imports`.
+	async fn start_import(&mut self, import: PreparedImport) {
+		let PreparedImport { candidate_receipt, statements, requesters } = import;
+		let (session_index, candidate_hash) = match statements.iter().next() {
+			None => {
+				gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?candidate_receipt.hash(),
+					"Not importing empty batch"
+				);
+				return
+			},
+			Some(vote) => (vote.0.session_index(), vote.0.candidate_hash().clone()),
+		};

-impl PendingImports {
-	pub fn new() -> Self {
-		Self { futures: FuturesUnordered::new(), peers: HashSet::new() }
-	}
+		let (pending_confirmation, confirmation_rx) = oneshot::channel();
+		self.sender
+			.send_message(DisputeCoordinatorMessage::ImportStatements {
+				candidate_receipt,
+				session: session_index,
+				statements,
+				pending_confirmation: Some(pending_confirmation),
+			})
+			.await;

-	pub fn push(
-		&mut self,
-		peer: PeerId,
-		handled: oneshot::Receiver<ImportStatementsResult>,
-		pending_response: OutgoingResponseSender<DisputeRequest>,
-	) {
-		self.peers.insert(peer);
-		self.futures.push(
-			async move {
-				let r = respond_to_request(peer, handled, pending_response).await;
-				(peer, r)
-			}
-			.boxed(),
-		)
-	}
+		let pending =
+			PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };

-	/// Returns the number of contained futures.
-	pub fn len(&self) -> usize {
-		self.futures.len()
+		self.pending_imports.push(pending);
	}

-	/// Check whether a peer has a pending import.
-	pub fn peer_is_pending(&self, peer: &PeerId) -> bool {
-		self.peers.contains(peer)
+	fn update_imported_requests_metrics(&self, result: &ImportResult) {
+		let label = match result.result {
+			ImportStatementsResult::ValidImport => SUCCEEDED,
+			ImportStatementsResult::InvalidImport => FAILED,
+		};
+		self.metrics.on_imported(label, result.requesters.len());
	}
 }

-impl Stream for PendingImports {
-	type Item = JfyiErrorResult<(PeerId, ImportStatementsResult)>;
-	fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-		match Pin::new(&mut self.futures).poll_next(ctx) {
-			Poll::Pending => Poll::Pending,
-			Poll::Ready(None) => Poll::Ready(None),
-			Poll::Ready(Some((peer, result))) => {
-				self.peers.remove(&peer);
-				Poll::Ready(Some(result.map(|r| (peer, r))))
-			},
-		}
-	}
-}
-impl FusedStream for PendingImports {
-	fn is_terminated(&self) -> bool {
-		self.futures.is_terminated()
-	}
-}
+async fn send_responses_to_requesters(import_result: ImportResult) -> JfyiResult<()> {
+	let ImportResult { requesters, result } = import_result;

-// Future for `PendingImports`
-//
-// - Wait for import
-// - Punish peer
-// - Deliver result
-async fn respond_to_request(
-	peer: PeerId,
-	handled: oneshot::Receiver<ImportStatementsResult>,
-	pending_response: OutgoingResponseSender<DisputeRequest>,
-) -> JfyiErrorResult<ImportStatementsResult> {
-	let result = handled.await.map_err(|_| JfyiError::ImportCanceled(peer))?;
-
-	let response = match result {
-		ImportStatementsResult::ValidImport => OutgoingResponse {
+	let mk_response = match result {
+		ImportStatementsResult::ValidImport => || OutgoingResponse {
			result: Ok(DisputeResponse::Confirmed),
			reputation_changes: Vec::new(),
			sent_feedback: None,
		},
-		ImportStatementsResult::InvalidImport => OutgoingResponse {
+		ImportStatementsResult::InvalidImport => || OutgoingResponse {
			result: Err(()),
-			reputation_changes: vec![COST_INVALID_CANDIDATE],
+			reputation_changes: vec![COST_INVALID_IMPORT],
			sent_feedback: None,
		},
	};

-	pending_response
-		.send_outgoing_response(response)
-		.map_err(|_| JfyiError::SendResponse(peer))?;
+	let mut sending_failed_for = Vec::new();
+	for (peer, pending_response) in requesters {
+		if let Err(()) = pending_response.send_outgoing_response(mk_response()) {
+			sending_failed_for.push(peer);
+		}
+	}
+
+	if !sending_failed_for.is_empty() {
+		Err(JfyiError::SendResponses(sending_failed_for))
+	} else {
+		Ok(())
+	}
+}

-	Ok(result)
+/// A future that resolves into an `ImportResult` when ready.
+///
+/// This future is used on `dispute-coordinator` import messages for the oneshot response receiver
+/// to:
+/// - Keep track of the concerned `CandidateHash` for reporting errors.
+/// - Keep track of requesting peers so we can confirm the import/punish them on invalid imports.
+struct PendingImport {
+	candidate_hash: CandidateHash,
+	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
+	pending_response: oneshot::Receiver<ImportStatementsResult>,
+}
+
+/// A `PendingImport` becomes an `ImportResult` once done.
+struct ImportResult {
+	/// Requesters of that import.
+	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
+	/// Actual result of the import.
+	result: ImportStatementsResult,
+}
+
+impl PendingImport {
+	async fn wait_for_result(&mut self) -> JfyiResult<ImportResult> {
+		let result = (&mut self.pending_response)
+			.await
+			.map_err(|_| JfyiError::ImportCanceled(self.candidate_hash))?;
+		Ok(ImportResult { requesters: std::mem::take(&mut self.requesters), result })
+	}
+}
+
+impl Future for PendingImport {
+	type Output = JfyiResult<ImportResult>;
+	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+		let fut = self.wait_for_result();
+		pin_mut!(fut);
+		fut.poll(cx)
+	}
+}
diff --git a/node/network/dispute-distribution/src/receiver/peer_queues.rs b/node/network/dispute-distribution/src/receiver/peer_queues.rs
new file mode 100644
index 000000000000..138b59c3f867
--- /dev/null
+++ b/node/network/dispute-distribution/src/receiver/peer_queues.rs
@@ -0,0 +1,141 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::{hash_map::Entry, HashMap, VecDeque};
+
+use futures::future::pending;
+use futures_timer::Delay;
+use polkadot_node_network_protocol::request_response::{v1::DisputeRequest, IncomingRequest};
+use polkadot_primitives::v2::AuthorityDiscoveryId;
+
+use crate::RECEIVE_RATE_LIMIT;
+
+/// How many messages we are willing to queue per peer (validator).
+///
+/// The larger this value is, the larger bursts are allowed to be without us dropping messages. On
+/// the flip side this gets allocated per validator, so with around 1000 validators a size of 10
+/// will result in `10_000 * size_of(IncomingRequest)` in the worst case.
+///
+/// `PEER_QUEUE_CAPACITY` must not be 0 for obvious reasons.
+#[cfg(not(test))]
+pub const PEER_QUEUE_CAPACITY: usize = 10;
+#[cfg(test)]
+pub const PEER_QUEUE_CAPACITY: usize = 2;
+
+/// Queues for messages from authority peers for rate limiting.
+///
+/// Invariants ensured:
+///
+/// 1. No queue will ever have more than `PEER_QUEUE_CAPACITY` elements.
+/// 2. There are no empty queues. Whenever a queue gets empty, it is removed. This way checking
+///    whether there are any messages queued is cheap.
+/// 3. As long as not empty, `pop_reqs` will, if called in sequence, not return `Ready` more often
+///    than once for every `RECEIVE_RATE_LIMIT`, but it will always return `Ready` eventually.
+/// 4. If empty, `pop_reqs` will never return `Ready`, but will always be `Pending`.
+pub struct PeerQueues {
+	/// Actual queues.
+	queues: HashMap<AuthorityDiscoveryId, VecDeque<IncomingRequest<DisputeRequest>>>,
+
+	/// Delay timer for establishing the rate limit.
+	rate_limit_timer: Option<Delay>,
+}
+
+impl PeerQueues {
+	/// New empty `PeerQueues`.
+	pub fn new() -> Self {
+		Self { queues: HashMap::new(), rate_limit_timer: None }
+	}
+
+	/// Push an incoming request for a given authority.
+	///
+	/// Returns: `Ok(())` on success, `Err((peer, req))` if the capacity is reached.
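+	///
+	/// For example, with the production `PEER_QUEUE_CAPACITY` of 10 an authority can have at
+	/// most 10 requests queued; any further request is handed back to the caller, which will
+	/// answer it with `COST_APPARENT_FLOOD`.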
+	pub fn push_req(
+		&mut self,
+		peer: AuthorityDiscoveryId,
+		req: IncomingRequest<DisputeRequest>,
+	) -> Result<(), (AuthorityDiscoveryId, IncomingRequest<DisputeRequest>)> {
+		let queue = match self.queues.entry(peer) {
+			Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
+			Entry::Occupied(occupied) => {
+				if occupied.get().len() >= PEER_QUEUE_CAPACITY {
+					return Err((occupied.key().clone(), req))
+				}
+				occupied.into_mut()
+			},
+		};
+		queue.push_back(req);
+
+		// We have at least one element to process - rate limit `timer` needs to exist now:
+		self.ensure_timer();
+		Ok(())
+	}
+
+	/// Pop all heads and return them for processing.
+	///
+	/// This gets one message from each peer that has sent at least one.
+	///
+	/// This function is rate limited; if called in sequence it will not return more often than
+	/// every `RECEIVE_RATE_LIMIT`.
+	///
+	/// NOTE: If empty, this function will not return `Ready` at all, but will always be
+	/// `Pending`.
+	pub async fn pop_reqs(&mut self) -> Vec<IncomingRequest<DisputeRequest>> {
+		self.wait_for_timer().await;
+
+		let mut heads = Vec::with_capacity(self.queues.len());
+		let old_queues = std::mem::replace(&mut self.queues, HashMap::new());
+		for (k, mut queue) in old_queues.into_iter() {
+			let front = queue.pop_front();
+			debug_assert!(front.is_some(), "Invariant that queues are never empty is broken.");
+
+			if let Some(front) = front {
+				heads.push(front);
+			}
+			if !queue.is_empty() {
+				self.queues.insert(k, queue);
+			}
+		}
+
+		if !self.is_empty() {
+			// Still not empty - we should get woken at some point.
+			self.ensure_timer();
+		}
+
+		heads
+	}
+
+	/// Whether or not all queues are empty.
+	pub fn is_empty(&self) -> bool {
+		self.queues.is_empty()
+	}
+
+	/// Ensure there is an active `timer`.
+	///
+	/// Checks whether one exists and if not creates one.
+	fn ensure_timer(&mut self) -> &mut Delay {
+		self.rate_limit_timer.get_or_insert(Delay::new(RECEIVE_RATE_LIMIT))
+	}
+
+	/// Wait for `timer` if it exists, or be `Pending` forever.
+	///
+	/// Afterwards it gets set back to `None`.
+	async fn wait_for_timer(&mut self) {
+		match self.rate_limit_timer.as_mut() {
+			None => pending().await,
+			Some(timer) => timer.await,
+		}
+		self.rate_limit_timer = None;
+	}
+}
diff --git a/node/network/dispute-distribution/src/sender/mod.rs b/node/network/dispute-distribution/src/sender/mod.rs
index 5312528b413e..09b902173ede 100644
--- a/node/network/dispute-distribution/src/sender/mod.rs
+++ b/node/network/dispute-distribution/src/sender/mod.rs
@@ -14,10 +14,21 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

-use std::collections::{hash_map::Entry, HashMap, HashSet};
-
-use futures::channel::{mpsc, oneshot};
-
+use std::{
+	collections::{HashMap, HashSet},
+	pin::Pin,
+	task::Poll,
+	time::Duration,
+};
+
+use futures::{
+	channel::{mpsc, oneshot},
+	future::poll_fn,
+	Future,
+};
+
+use futures_timer::Delay;
+use indexmap::{map::Entry, IndexMap};
 use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
 use polkadot_node_primitives::{CandidateVotes, DisputeMessage, SignedDisputeStatement};
 use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate};
@@ -28,22 +39,27 @@ use polkadot_primitives::v2::{CandidateHash, DisputeStatement, Hash, SessionIndex};
 ///
 /// It is going to spawn real tasks as it sees fit for getting the votes of the particular dispute
 /// out.
+///
+/// As we assume disputes have a priority, we start sending for disputes in the order in which
+/// `start_sender` got called.
 mod send_task;
 use send_task::SendTask;
 pub use send_task::TaskFinish;
 
-/// Error and [`Result`] type for sender
+/// Error and [`Result`] type for sender.
 mod error;
 pub use error::{Error, FatalError, JfyiError, Result};
 
 use self::error::JfyiErrorResult;
-use crate::{Metrics, LOG_TARGET};
+use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};
 
 /// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out.
 ///
 /// For each dispute a `SendTask` is responsible for sending to the concerned validators for that
 /// particular dispute. The `DisputeSender` keeps track of those tasks, informs them about new
 /// sessions/validator sets and cleans them up when they become obsolete.
+///
+/// The unit of work for the `DisputeSender` is a dispute, represented by `SendTask`s.
 pub struct DisputeSender {
 	/// All heads we currently consider active.
 	active_heads: Vec<Hash>,
@@ -54,11 +70,16 @@ pub struct DisputeSender {
 	active_sessions: HashMap<SessionIndex, Hash>,
 
 	/// All ongoing dispute sendings this subsystem is aware of.
-	disputes: HashMap<CandidateHash, SendTask>,
+	///
+	/// Using an `IndexMap` so items can be iterated in the order of insertion.
+	disputes: IndexMap<CandidateHash, SendTask>,
 
 	/// Sender to be cloned for `SendTask`s.
 	tx: mpsc::Sender<TaskFinish>,
 
+	/// Future for delaying too frequent creation of dispute sending tasks.
+	rate_limit: RateLimit,
+
 	/// Metrics for reporting stats about sent requests.
 	metrics: Metrics,
 }
@@ -70,19 +91,25 @@ impl DisputeSender {
 		Self {
 			active_heads: Vec::new(),
 			active_sessions: HashMap::new(),
-			disputes: HashMap::new(),
+			disputes: IndexMap::new(),
 			tx,
+			rate_limit: RateLimit::new(),
 			metrics,
 		}
 	}
 
 	/// Create a `SendTask` for a particular new dispute.
+	///
+	/// This function is rate-limited by `SEND_RATE_LIMIT`. It will block if called too frequently
+	/// in order to maintain the limit.
 	pub async fn start_sender(
 		&mut self,
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
 		msg: DisputeMessage,
 	) -> Result<()> {
+		self.rate_limit.limit().await;
+
 		let req: DisputeRequest = msg.into();
 		let candidate_hash = req.0.candidate_receipt.hash();
 		match self.disputes.entry(candidate_hash) {
@@ -112,6 +139,8 @@ impl DisputeSender {
 	/// - Get new authorities to send messages to.
 	/// - Get rid of obsolete tasks and disputes.
 	/// - Get dispute sending started in case we missed one for some reason (e.g. on node startup)
+	///
+	/// This function ensures the `SEND_RATE_LIMIT`, therefore it might block.
 	pub async fn update_leaves(
 		&mut self,
 		ctx: &mut Context,
@@ -134,21 +163,38 @@ impl DisputeSender {
 
 		let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c)| c).collect();
 
-		// Cleanup obsolete senders:
+		// Cleanup obsolete senders (retain keeps order of remaining elements):
 		self.disputes
 			.retain(|candidate_hash, _| active_disputes.contains(candidate_hash));
 
+		// Iterates in order of insertion:
+		let mut should_rate_limit = true;
 		for dispute in self.disputes.values_mut() {
 			if have_new_sessions || dispute.has_failed_sends() {
-				dispute
+				if should_rate_limit {
+					self.rate_limit.limit().await;
+				}
+				let sends_happened = dispute
 					.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
 					.await?;
+				// Only rate limit if we actually sent something out _and_ it was not just because
+				// of errors on previous sends.
+				//
+				// Reasoning: It would not be acceptable to slow down the whole subsystem, just
+				// because of a few bad peers having problems. It is actually better to risk
+				// running into their rate limit in that case and accept a minor reputation change.
+				should_rate_limit = sends_happened && have_new_sessions;
 			}
 		}
 
-		// This should only be non-empty on startup, but if not - we got you covered.
+		// This should only be non-empty on startup, but if not - we got you covered.
+		//
+		// Initial order will not be maintained in that case, but that should be fine as disputes
+		// recovered at startup will be relatively "old" anyway and we assume that no more than a
+		// third of the validators will go offline at any point in time anyway.
 		for dispute in unknown_disputes {
-			self.start_send_for_dispute(ctx, runtime, dispute).await?
+			self.rate_limit.limit().await;
+			self.start_send_for_dispute(ctx, runtime, dispute).await?;
 		}
 		Ok(())
 	}
@@ -317,6 +363,46 @@ impl DisputeSender {
 	}
 }
 
+/// Rate limiting logic.
+///
+/// Suitable for the sending side.
+struct RateLimit {
+	limit: Delay,
+}
+
+impl RateLimit {
+	/// Create new `RateLimit` that is immediately ready.
+	fn new() -> Self {
+		// Start with an empty duration, as there has not been any previous call.
+		Self { limit: Delay::new(Duration::new(0, 0)) }
+	}
+
+	/// Initialized with actual `SEND_RATE_LIMIT` duration.
+	fn new_limit() -> Self {
+		Self { limit: Delay::new(SEND_RATE_LIMIT) }
+	}
+
+	/// Wait until ready and prepare for next call.
+	async fn limit(&mut self) {
+		// Wait for rate limit and add some logging:
+		poll_fn(|cx| {
+			let old_limit = Pin::new(&mut self.limit);
+			match old_limit.poll(cx) {
+				Poll::Pending => {
+					gum::debug!(
+						target: LOG_TARGET,
+						"Sending rate limit hit, slowing down requests"
+					);
+					Poll::Pending
+				},
+				Poll::Ready(()) => Poll::Ready(()),
+			}
+		})
+		.await;
+		*self = Self::new_limit();
+	}
+}
+
 /// Retrieve the currently active sessions.
 ///
 /// List is all indices of all active sessions together with the head that was used for the query.
diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs
index a2b8cdcf7441..89b5c099bde9 100644
--- a/node/network/dispute-distribution/src/sender/send_task.rs
+++ b/node/network/dispute-distribution/src/sender/send_task.rs
@@ -42,13 +42,15 @@ use crate::{
 /// Delivery status for a particular dispute.
 ///
 /// Keeps track of all the validators that have to be reached for a dispute.
+///
+/// The unit of work for a `SendTask` is an authority/validator.
 pub struct SendTask {
-	/// The request we are supposed to get out to all parachain validators of the dispute's session
+	/// The request we are supposed to get out to all `parachain` validators of the dispute's session
 	/// and to all current authorities.
 	request: DisputeRequest,
 
 	/// The set of authorities we need to send our messages to. This set will change at session
-	/// boundaries. It will always be at least the parachain validators of the session where the
+	/// boundaries. It will always be at least the `parachain` validators of the session where the
 	/// dispute happened and the authorities of the current sessions as determined by active heads.
 	deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,
@@ -100,6 +102,10 @@ impl TaskResult {
 #[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
 impl SendTask {
 	/// Initiates sending a dispute message to peers.
+	///
+	/// Creation of new `SendTask`s is subject to rate limiting. As each `SendTask` will trigger
+	/// sending a message to each validator, limiting the construction of new `SendTask`s is how
+	/// we enforce a per-peer rate limit on the sending side.
 	pub async fn new(
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
@@ -118,15 +124,22 @@ impl SendTask {
 	///
 	/// This function is called at construction and should also be called whenever a session change
 	/// happens and on a regular basis to ensure we are retrying failed attempts.
+	///
+	/// This might resend to validators and is thus subject to any rate limiting we might want.
+	/// Calls to this function for different instances should be rate limited according to
+	/// `SEND_RATE_LIMIT`.
+	///
+	/// Returns: `true` if this call resulted in new requests.
 	pub async fn refresh_sends(
 		&mut self,
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
 		active_sessions: &HashMap<SessionIndex, Hash>,
 		metrics: &Metrics,
-	) -> Result<()> {
+	) -> Result<bool> {
 		let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
 
+		// Note this will also contain all authorities for which sending failed previously:
 		let add_authorities = new_authorities
 			.iter()
 			.filter(|a| !self.deliveries.contains_key(a))
@@ -141,12 +154,14 @@ impl SendTask {
 		send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
 			.await?;
 
+		let was_empty = new_statuses.is_empty();
+
 		self.has_failed_sends = false;
 		self.deliveries.extend(new_statuses.into_iter());
-		Ok(())
+
+		Ok(!was_empty)
 	}
 
-	/// Whether any sends have failed since the last refreshed.
+	/// Whether any sends have failed since the last refresh.
 	pub fn has_failed_sends(&self) -> bool {
 		self.has_failed_sends
 	}
@@ -193,9 +208,8 @@ impl SendTask {
 	/// Determine all validators that should receive the given dispute requests.
 	///
-	/// This is all parachain validators of the session the candidate occurred and all authorities
+	/// This is all `parachain` validators of the session the candidate occurred and all authorities
 	/// of all currently active sessions, determined by currently active heads.
-
 	async fn get_relevant_validators(
 		&self,
 		ctx: &mut Context,
@@ -293,7 +307,7 @@ async fn wait_response_task(
 		gum::debug!(
 			target: LOG_TARGET,
 			%err,
-			"Failed to notify susystem about dispute sending result."
+			"Failed to notify subsystem about dispute sending result."
 		);
 	}
 }
diff --git a/node/network/dispute-distribution/src/tests/mock.rs b/node/network/dispute-distribution/src/tests/mock.rs
index 08428d5852cc..aa2a4485d480 100644
--- a/node/network/dispute-distribution/src/tests/mock.rs
+++ b/node/network/dispute-distribution/src/tests/mock.rs
@@ -20,6 +20,7 @@
 use std::{
 	collections::{HashMap, HashSet},
 	sync::Arc,
+	time::Instant,
 };
 
 use async_trait::async_trait;
@@ -38,6 +39,8 @@ use polkadot_primitives::v2::{
 };
 use polkadot_primitives_test_helpers::dummy_candidate_descriptor;
 
+use crate::LOG_TARGET;
+
 pub const MOCK_SESSION_INDEX: SessionIndex = 1;
 pub const MOCK_NEXT_SESSION_INDEX: SessionIndex = 2;
 pub const MOCK_VALIDATORS: [Sr25519Keyring; 6] = [
@@ -54,6 +57,8 @@ pub const MOCK_AUTHORITIES_NEXT_SESSION: [Sr25519Keyring; 2] =
 pub const FERDIE_INDEX: ValidatorIndex = ValidatorIndex(0);
 pub const ALICE_INDEX: ValidatorIndex = ValidatorIndex(1);
+pub const BOB_INDEX: ValidatorIndex = ValidatorIndex(2);
+pub const CHARLIE_INDEX: ValidatorIndex = ValidatorIndex(3);
 
 lazy_static! {
@@ -148,12 +153,22 @@ pub async fn make_dispute_message(
 	invalid_validator: ValidatorIndex,
 ) -> DisputeMessage {
 	let candidate_hash = candidate.hash();
+	let before_request = Instant::now();
 	let valid_vote =
 		make_explicit_signed(MOCK_VALIDATORS[valid_validator.0 as usize], candidate_hash, true)
 			.await;
+	gum::trace!(
+		"Passed time for valid vote: {:#?}",
+		Instant::now().saturating_duration_since(before_request)
+	);
+	let before_request = Instant::now();
 	let invalid_vote =
 		make_explicit_signed(MOCK_VALIDATORS[invalid_validator.0 as usize], candidate_hash, false)
 			.await;
+	gum::trace!(
+		"Passed time for invalid vote: {:#?}",
+		Instant::now().saturating_duration_since(before_request)
+	);
 	DisputeMessage::from_signed_statements(
 		valid_vote,
 		valid_validator,
@@ -206,10 +221,15 @@ impl AuthorityDiscovery for MockAuthorityDiscovery {
 	) -> Option<HashSet<AuthorityDiscoveryId>> {
 		for (a, p) in self.peer_ids.iter() {
 			if p == &peer_id {
-				return Some(HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS
-					.get(&a)
-					.unwrap()
-					.clone()]))
+				let result =
+					HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS.get(&a).unwrap().clone()]);
+				gum::trace!(
+					target: LOG_TARGET,
+					%peer_id,
+					?result,
+					"Returning authority ids for peer id"
+				);
+				return Some(result)
 			}
 		}
diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs
index 8ef8286ea197..56cdd467fd62 100644
--- a/node/network/dispute-distribution/src/tests/mod.rs
+++ b/node/network/dispute-distribution/src/tests/mod.rs
@@ -17,12 +17,17 @@
 
 //! Subsystem unit tests
 
-use std::{collections::HashSet, sync::Arc, task::Poll, time::Duration};
+use std::{
+	collections::HashSet,
+	sync::Arc,
+	task::Poll,
+	time::{Duration, Instant},
+};
 
 use assert_matches::assert_matches;
 use futures::{
 	channel::{mpsc, oneshot},
-	future::poll_fn,
+	future::{poll_fn, ready},
 	pin_mut, Future, SinkExt,
 };
 use futures_timer::Delay;
@@ -52,7 +57,7 @@ use polkadot_node_subsystem_test_helpers::{
 	mock::make_ferdie_keystore, subsystem_test_harness, TestSubsystemContextHandle,
 };
 use polkadot_primitives::v2::{
-	AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, SessionInfo,
+	AuthorityDiscoveryId, CandidateHash, CandidateReceipt, Hash, SessionIndex, SessionInfo,
 };
 
 use self::mock::{
@@ -60,7 +65,11 @@ use self::mock::{
 	MOCK_AUTHORITY_DISCOVERY, MOCK_NEXT_SESSION_INDEX, MOCK_NEXT_SESSION_INFO, MOCK_SESSION_INDEX,
 	MOCK_SESSION_INFO,
 };
-use crate::{DisputeDistributionSubsystem, Metrics, LOG_TARGET};
+use crate::{
+	receiver::BATCH_COLLECTING_INTERVAL,
+	tests::mock::{BOB_INDEX, CHARLIE_INDEX},
+	DisputeDistributionSubsystem, Metrics, LOG_TARGET, SEND_RATE_LIMIT,
+};
 
 /// Useful mock providers.
 pub mod mock;
@@ -72,49 +81,108 @@ fn send_dispute_sends_dispute() {
 		let relay_parent = Hash::random();
 		let candidate = make_candidate_receipt(relay_parent);
-		let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
-		handle
-			.send(FromOrchestra::Communication {
-				msg: DisputeDistributionMessage::SendDispute(message.clone()),
-			})
-			.await;
+		send_dispute(&mut handle, candidate, true).await;
+		conclude(&mut handle).await;
+	};
+	test_harness(test);
+}
+
+#[test]
+fn send_honors_rate_limit() {
+	sp_tracing::try_init_simple();
+	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _req_cfg| async move {
+		let _ = handle_subsystem_startup(&mut handle, None).await;
+
+		let relay_parent = Hash::random();
+		let candidate = make_candidate_receipt(relay_parent);
+		let before_request = Instant::now();
+		send_dispute(&mut handle, candidate, true).await;
+		// First send should not be rate limited:
+		gum::trace!("Passed time: {:#?}", Instant::now().saturating_duration_since(before_request));
+		// This test would likely be flaky on CI:
+		//assert!(Instant::now().saturating_duration_since(before_request) < SEND_RATE_LIMIT);
+
+		let relay_parent = Hash::random();
+		let candidate = make_candidate_receipt(relay_parent);
+		send_dispute(&mut handle, candidate, false).await;
+		// Second send should be rate limited:
+		gum::trace!(
+			"Passed time for send_dispute: {:#?}",
+			Instant::now().saturating_duration_since(before_request)
+		);
+		assert!(Instant::now() - before_request >= SEND_RATE_LIMIT);
+		conclude(&mut handle).await;
+	};
+	test_harness(test);
+}
+
+/// Helper for sending a new dispute to the dispute-distribution sender and handling resulting messages.
+async fn send_dispute(
+	handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
+	candidate: CandidateReceipt,
+	needs_session_info: bool,
+) {
+	let before_request = Instant::now();
+	let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
+	gum::trace!(
+		"Passed time for making message: {:#?}",
+		Instant::now().saturating_duration_since(before_request)
+	);
+	let before_request = Instant::now();
+	handle
+		.send(FromOrchestra::Communication {
+			msg: DisputeDistributionMessage::SendDispute(message.clone()),
+		})
+		.await;
+	gum::trace!(
+		"Passed time for sending message: {:#?}",
+		Instant::now().saturating_duration_since(before_request)
+	);
+	if needs_session_info {
 		// Requests needed session info:
 		assert_matches!(
-			handle.recv().await,
-			AllMessages::RuntimeApi(
-				RuntimeApiMessage::Request(
-					hash,
-					RuntimeApiRequest::SessionInfo(session_index, tx)
+		handle.recv().await,
+		AllMessages::RuntimeApi(
+			RuntimeApiMessage::Request(
+				hash,
+				RuntimeApiRequest::SessionInfo(session_index, tx)
 				)
 			) => {
-				assert_eq!(session_index, MOCK_SESSION_INDEX);
-				assert_eq!(
-					hash,
-					message.candidate_receipt().descriptor.relay_parent
+			assert_eq!(session_index, MOCK_SESSION_INDEX);
+			assert_eq!(
+				hash,
+				message.candidate_receipt().descriptor.relay_parent
 				);
-				tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
-			}
+			tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
+		}
 		);
+	}
 
-		let expected_receivers = {
-			let info = &MOCK_SESSION_INFO;
-			info.discovery_keys
-				.clone()
-				.into_iter()
-				.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
-				.collect()
-			// All validators are also authorities in the first session, so we are
-			// done here.
-		};
-		check_sent_requests(&mut handle, expected_receivers, true).await;
-
-		conclude(&mut handle).await;
+	let expected_receivers = {
+		let info = &MOCK_SESSION_INFO;
+		info.discovery_keys
+			.clone()
+			.into_iter()
+			.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
+			.collect()
+		// All validators are also authorities in the first session, so we are
+		// done here.
 	};
-	test_harness(test);
+	check_sent_requests(handle, expected_receivers, true).await;
 }
 
+// Things to test:
+// x Request triggers import
+// x Subsequent imports get batched
+// x Batch gets flushed.
+// x Batch gets renewed.
+// x Non authority requests get dropped.
+// x Sending rate limit is honored.
+// x Receiving rate limit is honored.
+// x Duplicate requests on batch are dropped
+
 #[test]
-fn received_request_triggers_import() {
+fn received_non_authorities_are_dropped() {
 	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
 	            mut req_cfg: RequestResponseConfig| async move {
 		let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
 		let _ = handle_subsystem_startup(&mut handle, None).await;
@@ -140,110 +208,271 @@ fn received_request_triggers_import() {
 			assert_eq!(reputation_changes.len(), 1);
 		}
 	);
+		conclude(&mut handle).await;
+	};
+	test_harness(test);
+}
+
+#[test]
+fn received_request_triggers_import() {
+	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
+	            mut req_cfg: RequestResponseConfig| async move {
+		let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
+		let _ = handle_subsystem_startup(&mut handle, None).await;
+
+		let relay_parent = Hash::random();
+		let candidate = make_candidate_receipt(relay_parent);
+		let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
 
-		// Nested valid and invalid import.
-		//
-		// Nested requests from same peer should get dropped. For the invalid request even
-		// subsequent requests should get dropped.
 		nested_network_dispute_request(
 			&mut handle,
 			req_tx,
 			MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
 			message.clone().into(),
-			ImportStatementsResult::InvalidImport,
+			ImportStatementsResult::ValidImport,
 			true,
-			move |handle, req_tx, message| {
-				nested_network_dispute_request(
-					handle,
-					req_tx,
-					MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
-					message.clone().into(),
-					ImportStatementsResult::ValidImport,
-					false,
-					move |_, req_tx, message| async move {
-						// Another request from Alice should get dropped (request already in
-						// flight):
-						{
-							let rx_response = send_network_dispute_request(
-								req_tx,
-								MOCK_AUTHORITY_DISCOVERY
-									.get_peer_id_by_authority(Sr25519Keyring::Alice),
-								message.clone(),
-							)
-							.await;
-
-							assert_matches!(
-								rx_response.await,
-								Err(err) => {
-									gum::trace!(
-										target: LOG_TARGET,
-										?err,
-										"Request got dropped - other request already in flight"
-									);
-								}
-							);
-						}
-						// Another request from Bob should get dropped (request already in
-						// flight):
-						{
-							let rx_response = send_network_dispute_request(
-								req_tx,
-								MOCK_AUTHORITY_DISCOVERY
-									.get_peer_id_by_authority(Sr25519Keyring::Bob),
-								message.clone(),
-							)
-							.await;
-
-							assert_matches!(
-								rx_response.await,
-								Err(err) => {
-									gum::trace!(
-										target: LOG_TARGET,
-										?err,
-										"Request got dropped - other request already in flight"
-									);
-								}
-							);
-						}
-					},
-				)
-			},
+			move |_handle, _req_tx, _message| ready(()),
 		)
 		.await;
 
-		// Subsequent sends from Alice should fail (peer is banned):
-		{
-			let rx_response = send_network_dispute_request(
-				req_tx,
-				MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
-				message.clone().into(),
-			)
-			.await;
+		gum::trace!(target: LOG_TARGET, "Concluding.");
+		conclude(&mut handle).await;
+	};
+	test_harness(test);
+}
+
+#[test]
+fn batching_works() {
+	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
+	            mut req_cfg: RequestResponseConfig| async move {
+		let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
+		let _ = handle_subsystem_startup(&mut handle, None).await;
+
+		let relay_parent = Hash::random();
+		let candidate = make_candidate_receipt(relay_parent);
+		let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
+
+		// Initial request should get forwarded immediately:
+		nested_network_dispute_request(
+			&mut handle,
+			req_tx,
+			MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
+			message.clone().into(),
+			ImportStatementsResult::ValidImport,
+			true,
+			move |_handle, _req_tx, _message| ready(()),
+		)
+		.await;
+
+		let mut rx_responses = Vec::new();
+
+		let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await;
+		let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob);
+		rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
+
+		let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await;
+		let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Charlie);
+		rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
+		gum::trace!("Imported 3 votes into batch");
+
+		Delay::new(BATCH_COLLECTING_INTERVAL).await;
+		gum::trace!("Batch should still be alive");
+		// Batch should still be alive (2 new votes):
+		// Let's import two more votes, but full duplicates - should not extend batch lifetime.
+ gum::trace!("Importing duplicate votes"); + let mut rx_responses_duplicate = Vec::new(); + let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await; + let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob); + rx_responses_duplicate + .push(send_network_dispute_request(req_tx, peer, message.clone().into()).await); + + let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await; + let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Charlie); + rx_responses_duplicate + .push(send_network_dispute_request(req_tx, peer, message.clone().into()).await); + + for rx_response in rx_responses_duplicate { assert_matches!( rx_response.await, - Err(err) => { + Ok(resp) => { + let sc_network::config::OutgoingResponse { + result, + reputation_changes, + sent_feedback: _, + } = resp; gum::trace!( target: LOG_TARGET, - ?err, - "Request got dropped - peer is banned." + ?reputation_changes, + "Received reputation changes." + ); + // We don't punish on that. + assert_eq!(reputation_changes.len(), 0); + + assert_matches!(result, Err(())); + } + ); + } + + Delay::new(BATCH_COLLECTING_INTERVAL).await; + gum::trace!("Batch should be ready now (only duplicates have been added)"); + + let pending_confirmation = assert_matches!( + handle.recv().await, + AllMessages::DisputeCoordinator( + DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: _, + session, + statements, + pending_confirmation: Some(pending_confirmation), + } + ) => { + assert_eq!(session, MOCK_SESSION_INDEX); + assert_eq!(statements.len(), 3); + pending_confirmation + } + ); + pending_confirmation.send(ImportStatementsResult::ValidImport).unwrap(); + + for rx_response in rx_responses { + assert_matches!( + rx_response.await, + Ok(resp) => { + let sc_network::config::OutgoingResponse { + result, + reputation_changes: _, + sent_feedback, + } = resp; + + let result = result.unwrap(); + let decoded = + ::decode(&mut result.as_slice()).unwrap(); + + assert!(decoded == DisputeResponse::Confirmed); + if let Some(sent_feedback) = sent_feedback { + sent_feedback.send(()).unwrap(); + } + gum::trace!( + target: LOG_TARGET, + "Valid import happened." 
 					);
+				}
 			);
 		}
 
-		// But should work fine for Bob:
+		gum::trace!(target: LOG_TARGET, "Concluding.");
+		conclude(&mut handle).await;
+	};
+	test_harness(test);
+}
+
+#[test]
+fn receive_rate_limit_is_enforced() {
+	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>,
+	            mut req_cfg: RequestResponseConfig| async move {
+		let req_tx = req_cfg.inbound_queue.as_mut().unwrap();
+		let _ = handle_subsystem_startup(&mut handle, None).await;
+
+		let relay_parent = Hash::random();
+		let candidate = make_candidate_receipt(relay_parent);
+		let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await;
+
+		// Initial request should get forwarded immediately:
 		nested_network_dispute_request(
 			&mut handle,
 			req_tx,
-			MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
+			MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
 			message.clone().into(),
 			ImportStatementsResult::ValidImport,
-			false,
-			|_, _, _| async {},
+			true,
+			move |_handle, _req_tx, _message| ready(()),
 		)
 		.await;
 
+		let mut rx_responses = Vec::new();
+
+		let peer = MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob);
+
+		let message = make_dispute_message(candidate.clone(), BOB_INDEX, FERDIE_INDEX).await;
+		rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
+
+		let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, FERDIE_INDEX).await;
+		rx_responses.push(send_network_dispute_request(req_tx, peer, message.clone().into()).await);
+
+		gum::trace!("Import one too many:");
+
+		let message = make_dispute_message(candidate.clone(), CHARLIE_INDEX, ALICE_INDEX).await;
+		let rx_response_flood =
+			send_network_dispute_request(req_tx, peer, message.clone().into()).await;
+
+		assert_matches!(
+			rx_response_flood.await,
+			Ok(resp) => {
+				let sc_network::config::OutgoingResponse {
+					result: _,
+					reputation_changes,
+					sent_feedback: _,
+				} = resp;
+				gum::trace!(
+					target: LOG_TARGET,
+					?reputation_changes,
+					"Received reputation changes."
+				);
+				// Received punishment for flood:
+				assert_eq!(reputation_changes.len(), 1);
+			}
+		);
+		gum::trace!("Need to wait 2 batch intervals:");
+		Delay::new(BATCH_COLLECTING_INTERVAL).await;
+		Delay::new(BATCH_COLLECTING_INTERVAL).await;
+
+		gum::trace!("Batch should be ready now");
+
+		let pending_confirmation = assert_matches!(
+			handle.recv().await,
+			AllMessages::DisputeCoordinator(
+				DisputeCoordinatorMessage::ImportStatements {
+					candidate_receipt: _,
+					session,
+					statements,
+					pending_confirmation: Some(pending_confirmation),
+				}
+			) => {
+				assert_eq!(session, MOCK_SESSION_INDEX);
+				// Only 3 as fourth was flood:
+				assert_eq!(statements.len(), 3);
+				pending_confirmation
+			}
+		);
+		pending_confirmation.send(ImportStatementsResult::ValidImport).unwrap();
+
+		for rx_response in rx_responses {
+			assert_matches!(
+				rx_response.await,
+				Ok(resp) => {
+					let sc_network::config::OutgoingResponse {
+						result,
+						reputation_changes: _,
+						sent_feedback,
+					} = resp;
+
+					let result = result.unwrap();
+					let decoded =
+						<DisputeResponse as Decode>::decode(&mut result.as_slice()).unwrap();
+
+					assert!(decoded == DisputeResponse::Confirmed);
+					if let Some(sent_feedback) = sent_feedback {
+						sent_feedback.send(()).unwrap();
+					}
+					gum::trace!(
+						target: LOG_TARGET,
+						"Valid import happened."
+					);
+
+				}
+			);
+		}
+		gum::trace!(target: LOG_TARGET, "Concluding.");
 		conclude(&mut handle).await;
 	};
diff --git a/node/network/protocol/src/request_response/mod.rs b/node/network/protocol/src/request_response/mod.rs
index 5f4740279ef6..d24537e219c7 100644
--- a/node/network/protocol/src/request_response/mod.rs
+++ b/node/network/protocol/src/request_response/mod.rs
@@ -121,6 +121,10 @@ const POV_RESPONSE_SIZE: u64 = MAX_POV_SIZE as u64 + 10_000;
 /// This is `MAX_CODE_SIZE` plus some additional space for protocol overhead.
 const STATEMENT_RESPONSE_SIZE: u64 = MAX_CODE_SIZE as u64 + 10_000;
 
+/// We can have relatively large timeouts here, there is no value in hitting a
+/// timeout as we want to get statements through to each node in any case.
+pub const DISPUTE_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
+
 impl Protocol {
 	/// Get a configuration for a given Request response protocol.
 	///
@@ -194,9 +198,7 @@ impl Protocol {
 				/// Responses are just confirmation, in essence not even a bit. So 100 seems
 				/// plenty.
 				max_response_size: 100,
-				/// We can have relative large timeouts here, there is no value of hitting a
-				/// timeout as we want to get statements through to each node in any case.
-				request_timeout: Duration::from_secs(12),
+				request_timeout: DISPUTE_REQUEST_TIMEOUT,
 				inbound_queue: Some(tx),
 			},
 		};
diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md b/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
index b63ea2bdcbf0..6b8e5ec03cf4 100644
--- a/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
+++ b/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
@@ -15,6 +15,13 @@ This design should result in a protocol that is:
 
 ## Protocol
 
+Distributing disputes needs to be a reliable protocol. We would like to make as
+sure as possible that our vote got properly delivered to all concerned
+validators. For this to work, this subsystem won't be gossip based, but instead
+will use a request/response protocol for application level confirmations. The
+request will be the payload (the actual votes/statements), the response will
+be the confirmation. See [below](#wire-format).
+
 ### Input
 
 [`DisputeDistributionMessage`][DisputeDistributionMessage]
@@ -107,16 +114,7 @@ struct VotesResponse {
 }
 ```
 
-## Functionality
-
-Distributing disputes needs to be a reliable protocol. We would like to make as
-sure as possible that our vote got properly delivered to all concerned
-validators. For this to work, this subsystem won't be gossip based, but instead
-will use a request/response protocol for application level confirmations. The
-request will be the payload (the actual votes/statements), the response will
-be the confirmation. See [above][#wire-format].
-
-### Starting a Dispute
+## Starting a Dispute
 
 A dispute is initiated once a node sends the first `DisputeRequest` wire message,
 which must contain an "invalid" vote and a "valid" vote.
 
@@ -132,7 +130,7 @@ conflicting votes available, hence we have a valid dispute. Nodes will still
 need to check whether the disputing votes are somewhat current and not some
 stale ones.
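+
+To make that check concrete, the following sketch shows the shape of the
+validation a receiving node performs on the two votes of an incoming
+`DisputeRequest` before treating the dispute as genuine. The types and the
+`verify` helper are illustrative stand-ins, not the actual primitives:
+
+```rust
+/// Simplified stand-in for a signed dispute vote.
+struct Vote {
+	validator_index: u32,
+	signature: Vec<u8>,
+	/// `true` for a "valid" statement, `false` for an "invalid" one.
+	claims_valid: bool,
+}
+
+/// Simplified stand-in for the `DisputeRequest` wire message.
+struct Request {
+	candidate_hash: [u8; 32],
+	session_index: u32,
+	invalid_vote: Vote,
+	valid_vote: Vote,
+}
+
+/// A dispute is only genuine if the request carries a correctly signed
+/// "valid" and a correctly signed "invalid" vote for the same candidate and
+/// session.
+fn is_genuine_dispute(
+	req: &Request,
+	verify: impl Fn(&Vote, &[u8; 32], u32) -> bool,
+) -> bool {
+	req.valid_vote.claims_valid &&
+		!req.invalid_vote.claims_valid &&
+		verify(&req.valid_vote, &req.candidate_hash, req.session_index) &&
+		verify(&req.invalid_vote, &req.candidate_hash, req.session_index)
+}
+```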
-### Participating in a Dispute
+## Participating in a Dispute
 
 Upon receiving a `DisputeRequest` message, a dispute distribution will trigger
 the import of the received votes via the dispute coordinator
@@ -144,13 +142,13 @@ except that if the local node deemed the candidate valid, the `SendDispute`
 message will contain a valid vote signed by our node and will contain the
 initially received `Invalid` vote.
 
-Note, that we rely on the coordinator to check availability for spam protection
-(see below).
+Note that we rely on the `dispute-coordinator` to check the validity of a dispute
+for spam protection (see below).
 
-### Sending of messages
+## Sending of messages
 
 Starting and participating in a dispute are pretty similar from the perspective
-of dispute distribution. Once we receive a `SendDispute` message we try to make
+of dispute distribution. Once we receive a `SendDispute` message, we try to make
 sure to get the data out. We keep track of all the parachain validators that
 should see the message, which are all the parachain validators of the session
 where the dispute happened as they will want to participate in the dispute. In
@@ -159,114 +157,185 @@ session (which might be the same or not and may change during the dispute).
 Those authorities will not participate in the dispute, but need to see the
 statements so they can include them in blocks.
 
-We keep track of connected parachain validators and authorities and will issue
-warnings in the logs if connected nodes are less than two thirds of the
-corresponding sets. We also only consider a message transmitted, once we
-received a confirmation message. If not, we will keep retrying getting that
-message out as long as the dispute is deemed alive. To determine whether a
-dispute is still alive we will issue a
+### Reliability
+
+We only consider a message transmitted once we have received a confirmation
+message. If not, we will keep retrying getting that message out as long as the
+dispute is deemed alive. To determine whether a dispute is still alive, we will
+ask the `dispute-coordinator` for a list of all still active disputes via a
+`DisputeCoordinatorMessage::ActiveDisputes` message before each retry run. Once
+a dispute is no longer live, we will clean up the state accordingly.
 
-### Reception & Spam Considerations
-
-Because we are not forwarding foreign statements, spam is less of an issue in
-comparison to gossip based systems. Rate limiting should be implemented at the
-substrate level, see
-[#7750](https://github.com/paritytech/substrate/issues/7750). Still we should
-make sure that it is not possible via spamming to prevent a dispute concluding
-or worse from getting noticed.
-
-Considered attack vectors:
-
-1. Invalid disputes (candidate does not exist) could make us
-   run out of resources. E.g. if we recorded every statement, we could run out
-   of disk space eventually.
-2. An attacker can just flood us with notifications on any notification
-   protocol, assuming flood protection is not effective enough, our unbounded
-   buffers can fill up and we will run out of memory eventually.
-3. An attacker could participate in a valid dispute, but send its votes multiple
-   times.
-4. Attackers could spam us at a high rate with invalid disputes. Our incoming
-   queue of requests could get monopolized by those malicious requests and we
-   won't be able to import any valid disputes and we could run out of resources,
-   if we tried to process them all in parallel.
-
-For tackling 1, we make sure to not occupy resources before we don't know a
-candidate is available. So we will not record statements to disk until we
-recovered availability for the candidate or know by some other means that the
-dispute is legit.
-
-For 2, we will pick up on any dispute on restart, so assuming that any realistic
-memory filling attack will take some time, we should be able to participate in a
-dispute under such attacks.
-
-Importing/discarding redundant votes should be pretty quick, so measures with
-regards to 4 should suffice to prevent 3, from doing any real harm.
-
-For 4, full monopolization of the incoming queue should not be possible assuming
-substrate handles incoming requests in a somewhat fair way. Still we want some
-defense mechanisms, at the very least we need to make sure to not exhaust
-resources.
-
-The dispute coordinator will notify us on import about unavailable candidates or
-otherwise invalid imports and we can disconnect from such peers/decrease their
-reputation drastically. This alone should get us quite far with regards to queue
-monopolization, as availability recovery is expected to fail relatively quickly
-for unavailable data.
-
-Still if those spam messages come at a very high rate, we might still run out of
-resources if we immediately call `DisputeCoordinatorMessage::ImportStatements`
-on each one of them. Secondly with our assumption of 1/3 dishonest validators,
-getting rid of all of them will take some time, depending on reputation timeouts
-some of them might even be able to reconnect eventually.
-
-To mitigate those issues we will process dispute messages with a maximum
-parallelism `N`. We initiate import processes for up to `N` candidates in
-parallel. Once we reached `N` parallel requests we will start back pressuring on
-the incoming requests. This saves us from resource exhaustion.
-
-To reduce impact of malicious nodes further, we can keep track from which nodes the
-currently importing statements came from and will drop requests from nodes that
-already have imports in flight.
-
-Honest nodes are not expected to send dispute statements at a high rate, but
-even if they did:
-
-- we will import at least the first one and if it is valid it will trigger a
-  dispute, preventing finality.
-- Chances are good that the first sent candidate from a peer is indeed the
-  oldest one (if they differ in age at all).
-- for the dropped request any honest node will retry sending.
-- there will be other nodes notifying us about that dispute as well.
-- honest votes have a speed advantage on average. Apart from the very first
-  dispute statement for a candidate, which might cause the availability recovery
-  process, imports of honest votes will be super fast, while for spam imports
-  they will always take some time as we have to wait for availability to fail.
-
-So this general rate limit, that we drop requests from same peers if they come
-faster than we can import the statements should not cause any problems for
-honest nodes and is in their favor.
-
-Size of `N`: The larger `N` the better we can handle distributed flood attacks
-(see previous paragraph), but we also get potentially more availability recovery
-processes happening at the same time, which slows down the individual processes.
-And we rather want to have one finish quickly than lots slowly at the same time.
-On the other hand, valid disputes are expected to be rare, so if we ever exhaust
-`N` it is very likely that this is caused by spam and spam recoveries don't cost
-too much bandwidth due to empty responses.
-
-Considering that an attacker would need to attack many nodes in parallel to have
-any effect, an `N` of 10 seems to be a good compromise. For honest requests, most
-of those imports will likely concern the same candidate, and for dishonest ones
-we get to disconnect from up to ten colluding adversaries at a time.
-
-For the size of the channel for incoming requests: Due to dropping of repeated
-requests from same nodes we can make the channel relatively large without fear
-of lots of spam requests sitting there wasting our time, even after we already
-blocked a peer. For valid disputes, incoming requests can become bursty. On the
-other hand we will also be very quick in processing them. A channel size of 100
-requests seems plenty and should be able to handle bursts adequately.
+### Order
+
+We assume `SendDispute` messages are coming in an order of importance, hence
+`dispute-distribution` will make sure to send out network messages in the same
+order, even on retry.
+
+### Rate Limit
+
+For spam protection (see below), we employ an artificial rate limit on sending
+out messages, in order to not hit the rate limit at the receiving side, which
+would result in our messages getting dropped and our reputation getting reduced.
+
+## Reception
+
+As we shall see, the receiving side is mostly about handling spam and ensuring
+the dispute-coordinator learns about disputes as fast as possible.
+
+Goals for the receiving side:
+
+1. Get new disputes to the dispute-coordinator as fast as possible, so
+   prioritization can happen properly.
+2. Batch votes per dispute as much as possible for good import performance.
+3. Prevent malicious nodes from exhausting node resources by sending lots of
+   messages.
+4. Prevent malicious nodes from sending so many messages/(fake) disputes that we
+   cannot conclude good ones.
+5. Limit the ability of malicious nodes to delay the vote import via the
+   batching logic.
+
+Goals 1 and 2 seem to be conflicting, but an easy compromise is possible: When
+learning about a new dispute, we will import the votes immediately, making the
+dispute coordinator aware and also getting immediate feedback on the validity.
+Then, if valid, we can batch further incoming votes with less time constraints,
+as the dispute-coordinator already knows about the dispute.
+
+Goals 3 and 4 are obviously very related and both can easily be solved via rate
+limiting, as we shall see below. Rate limits should already be implemented at
+the substrate level, but
+[are not](https://github.com/paritytech/substrate/issues/7750) at the time of
+writing. But even if they were, the enforced substrate limits would likely not
+be configurable and thus would still be too high for our needs, as we can rely
+on the following observations:
+
+1. Each honest validator will only send one message (apart from duplicates on
+   timeout) per candidate/dispute.
+2. An honest validator needs to fully recover availability and validate the
+   candidate before casting a vote.
+
+With these two observations, we can conclude that honest validators will usually
+not send messages at a high rate. We can therefore enforce conservative rate
+limits and thus minimize the harm spamming malicious nodes can do.
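+
+Note that the sending side imposes the same limit on itself (see the `Rate
+Limit` section above), so as not to run into the receiver's limit. On the
+sending side this needs little more than a timer that is awaited before each
+send - a condensed sketch of the `RateLimit` helper added in `sender/mod.rs`
+above (the constant value is only illustrative):
+
+```rust
+use std::time::Duration;
+use futures_timer::Delay;
+
+const SEND_RATE_LIMIT: Duration = Duration::from_millis(200);
+
+struct RateLimit {
+	limit: Delay,
+}
+
+impl RateLimit {
+	/// Immediately ready - there was no previous call to wait out.
+	fn new() -> Self {
+		Self { limit: Delay::new(Duration::new(0, 0)) }
+	}
+
+	/// Wait out the remainder of the previous interval, then re-arm the timer.
+	async fn limit(&mut self) {
+		(&mut self.limit).await;
+		self.limit = Delay::new(SEND_RATE_LIMIT);
+	}
+}
+```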
+
+Before we dive into how rate limiting solves all spam issues elegantly, let's
+discuss that honest behaviour further:
+
+What about session changes? Here we might have to inform a new validator set of
+lots of already existing disputes at once.
+
+With observation 1) and a rate limit that is per peer, we are still good:
+
+Let's assume a rate limit of one message per 200ms per sender. This means 5
+messages from each validator per second. 5 messages means 5 disputes!
+Consequently, we will be able to conclude 5 disputes per second - no matter what
+malicious actors are doing. This is assuming dispute messages are sent ordered,
+but even if not perfectly ordered: On average it will be 5 disputes per second.
+
+This is good enough! All those disputes are valid ones and will result in
+slashing and disabling of validators. Assuming all of them conclude `valid` and
+we disable validators only after 100 raised and concluded valid disputes, we
+would still start disabling misbehaving validators after only 20 seconds.
+
+One could also think that, in addition, participation is expected to take
+longer, which would mean that on average we can import/conclude disputes faster
+than they are generated - regardless of dispute spam. Unfortunately this is not
+necessarily true: There might be parachains with very light load where recovery
+and validation can be accomplished very quickly - maybe faster than we can
+import those disputes.
+
+This is probably an argument for not imposing too low a rate limit, although the
+issue is more general: Even without any rate limit, if an attacker generates
+disputes at a very high rate, nodes will have trouble keeping up with
+participation, hence the problem should be mitigated at a [more fundamental
+layer](https://github.com/paritytech/polkadot/issues/5898).
+
+For nodes that have been offline for a while, the same argument as for session
+changes holds, but it matters even less: We assume 2/3 of nodes to be online, so
+even if the worst case of 1/3 offline happens and those nodes could not import
+votes fast enough (as argued above, they in fact can), it would not matter for
+consensus.
+
+### Rate Limiting
+
+As suggested previously, rate limiting allows us to mitigate all threats that
+come from malicious actors trying to overwhelm the system in order to get away
+without a slash, when it comes to dispute-distribution. In this section we will
+explain how in greater detail.
+
+The idea is to open a queue with limited size for each peer. We will process
+incoming messages as fast as we can by doing the following:
+
+1. Check that the sending peer is actually a valid authority - otherwise drop
+   the message and decrease the peer's reputation/disconnect.
+2. Put the message on the peer's queue; if the queue is full - drop the message.
+
+Every `RATE_LIMIT` seconds (or rather milliseconds), we pause processing
+incoming requests to go a full circle and process one message from each queue.
+Processing means `Batching` as explained in the next section.
+
+### Batching
+
+To achieve goal 2 we will batch incoming votes/messages together before passing
+them on as a single batch to the `dispute-coordinator`. To adhere to goal 1 as
+well, we will do the following (a sketch of the keep-alive rule follows the
+list):
+
+1. For an incoming message, we check whether we have an existing batch for that
+   candidate; if not, we import directly to the dispute-coordinator, as we have
+   to assume this concerns a new dispute.
+2. We open a batch and start collecting incoming messages for that candidate,
+   instead of immediately forwarding.
+3. We keep collecting votes in the batch until we receive fewer than
+   `MIN_KEEP_BATCH_ALIVE_VOTES` unique votes in the last
+   `BATCH_COLLECTING_INTERVAL`. This is important to accommodate goal 5 and
+   also goal 3.
+4. We send the whole batch to the dispute-coordinator.
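+
+A minimal sketch of the keep-alive rule from step 3, using the example values
+from the discussion below (the actual logic lives in
+`node/network/dispute-distribution/src/receiver/batches/batch.rs`; the names
+here are simplified):
+
+```rust
+use std::time::Duration;
+
+const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
+const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
+
+/// Simplified stand-in for a batch of votes concerning one candidate.
+struct Batch {
+	/// Unique (non-duplicate) votes that arrived since the last tick.
+	fresh_votes_since_last_tick: u32,
+}
+
+enum TickResult {
+	/// Keep the batch open for another `BATCH_COLLECTING_INTERVAL`.
+	KeepCollecting,
+	/// Flush all collected votes to the dispute-coordinator as one import.
+	Flush,
+}
+
+impl Batch {
+	/// Called once every `BATCH_COLLECTING_INTERVAL`.
+	fn tick(&mut self) -> TickResult {
+		let fresh = self.fresh_votes_since_last_tick;
+		self.fresh_votes_since_last_tick = 0;
+		if fresh >= MIN_KEEP_BATCH_ALIVE_VOTES {
+			TickResult::KeepCollecting
+		} else {
+			TickResult::Flush
+		}
+	}
+}
+```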
+
+This together with rate limiting explained above ensures we will be able to
+process valid disputes: We can limit the number of simultaneously existing
+batches to some high value, but can be rather certain that this limit will never
+be reached - hence we won't drop valid disputes:
+
+Let's assume `MIN_KEEP_BATCH_ALIVE_VOTES` is 10, `BATCH_COLLECTING_INTERVAL` is
+`500ms` and the above `RATE_LIMIT` is `100ms`. 1/3 of validators are malicious,
+so for 1000 validators this means around 330 malicious actors in the worst case.
+
+All those actors can send a message every `100ms`, that is 10 per second. This
+means at the beginning of an attack they can open up around 3300 batches, each
+containing two votes. So memory usage is still negligible. In reality it is even
+less, as we also demand 10 new votes to trickle in per batch in order to keep it
+alive, every `500ms`. Hence for the first second, each batch requires 20 votes.
+Each message is 2 votes, so this means 10 messages per batch. Hence to keep
+those batches alive, 10 attackers are needed for each batch. This reduces the
+number of opened batches by a factor of 10: So we only have 330 batches in 1
+second - each containing 20 votes.
+
+The next second: In order to further grow memory usage, attackers have to
+maintain 10 messages per batch and second. The number of batches equals the
+number of attackers, each has 10 messages per second, and all of them are needed
+to maintain the batches in memory. Therefore we have a hard cap of around 330
+(the number of malicious nodes) open batches. Each can be filled with at most
+the number of malicious actors' votes. So 330 batches with 330 votes each: Let's
+assume approximately 100 bytes per signature/vote. This results in a worst case
+memory usage of 330 * 330 * 100 ~= 10 MiB.
+
+For 10_000 validators, we are already in the Gigabyte range, which means that
+with a validator set that large we might want to be more strict with the rate
+limit or require a larger rate of incoming votes per batch to keep them alive.
+
+For a thousand validators a limit on batches of around 1000 should never be
+reached in practice. Hence due to rate limiting we have a very good chance of
+never having to drop a potentially valid dispute due to some resource limit.
+
+Further safeguards are possible: The dispute-coordinator actually
+confirms/denies imports. So once we receive a denial by the dispute-coordinator
+for the initially imported votes, we can opt into flushing the batch immediately
+and importing the votes. This swaps memory usage for more CPU usage, but if that
+import is deemed invalid again, we can immediately decrease the reputation of
+the sending peers, so this should be a net win. For the time being we punt on
+this for simplicity.
+
+Instead of filling batches to maximize memory usage, attackers could also try to
+overwhelm the dispute coordinator by only sending votes for new candidates all
+the time. This attack vector is also mitigated by the above rate limit and by
+decreasing the peers' reputation on denial of invalid imports by the
+coordinator.
 
 ### Node Startup