Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Batch vote import in dispute-distribution #5894

Merged
merged 45 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b202cc8
Start work on batching in dispute-distribution.
eskimor Aug 16, 2022
192d73b
Guide work.
eskimor Aug 17, 2022
a2f8fde
More guide changes. Still very much WIP.
eskimor Aug 18, 2022
5e2f4a5
Finish guide changes.
eskimor Aug 22, 2022
585ec6d
Clarification
eskimor Aug 22, 2022
7e02910
Adjust argument about slashing.
eskimor Aug 23, 2022
4e170e2
WIP: Add constants to receiver.
eskimor Aug 23, 2022
362ebae
Maintain order of disputes.
eskimor Aug 24, 2022
68c7073
dispute-distribution sender Rate limit.
eskimor Aug 24, 2022
56519a7
Cleanup
eskimor Aug 25, 2022
b3ab280
WIP: dispute-distribution receiver.
eskimor Aug 25, 2022
4b4df00
WIP: Batching.
eskimor Aug 25, 2022
5816c8d
fmt
eskimor Aug 25, 2022
a821efc
Update `PeerQueues` to maintain more invariants.
eskimor Aug 26, 2022
3b71817
WIP: Batching.
eskimor Aug 26, 2022
909569b
Small cleanup
eskimor Aug 26, 2022
fa14c43
Batching logic.
eskimor Aug 30, 2022
89d622d
Some integration work.
eskimor Aug 30, 2022
401ffb8
Finish.
eskimor Aug 31, 2022
3dd2041
Typo.
eskimor Sep 1, 2022
f9fc6c8
Docs.
eskimor Sep 1, 2022
43d946d
Report missing metric.
eskimor Sep 1, 2022
3e5ceaa
Doc pass.
eskimor Sep 1, 2022
0e11f4f
Tests for waiting_queue.
eskimor Sep 2, 2022
2fcebad
Speed up some crypto by 10x.
eskimor Sep 2, 2022
3d444ae
Fix redundant import.
eskimor Sep 2, 2022
e7923a7
Add some tracing.
eskimor Sep 2, 2022
8b8f722
Better sender rate limit
eskimor Sep 2, 2022
a79a96d
Some tests.
eskimor Sep 2, 2022
c1ed615
Tests
eskimor Sep 5, 2022
f4c530c
Add logging to rate limiter
eskimor Sep 5, 2022
795c6c1
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
d46be76
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
9b8787d
Update node/network/dispute-distribution/src/receiver/mod.rs
eskimor Sep 8, 2022
9795098
Review feedback.
eskimor Sep 9, 2022
465f266
Also log peer in log messages.
eskimor Sep 21, 2022
88c61d2
Fix indentation.
eskimor Sep 28, 2022
78066ac
waker -> timer
eskimor Sep 28, 2022
3e43b0a
Guide improvement.
eskimor Sep 28, 2022
f0e5001
Remove obsolete comment.
eskimor Sep 28, 2022
30d10f4
waker -> timer
eskimor Oct 4, 2022
8ef4a24
Fix spell complaints.
eskimor Oct 4, 2022
5b5d82d
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
f0a054b
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
35d698c
Fix Cargo.lock
eskimor Oct 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ maintenance = { status = "actively-developed" }
#
# This list is ordered alphabetically.
[profile.dev.package]
blake2b_simd = { opt-level = 3 }
blake2 = { opt-level = 3 }
blake2-rfc = { opt-level = 3 }
blake2b_simd = { opt-level = 3 }
chacha20poly1305 = { opt-level = 3 }
cranelift-codegen = { opt-level = 3 }
cranelift-wasm = { opt-level = 3 }
Expand All @@ -138,8 +138,8 @@ curve25519-dalek = { opt-level = 3 }
ed25519-dalek = { opt-level = 3 }
flate2 = { opt-level = 3 }
futures-channel = { opt-level = 3 }
hashbrown = { opt-level = 3 }
hash-db = { opt-level = 3 }
hashbrown = { opt-level = 3 }
hmac = { opt-level = 3 }
httparse = { opt-level = 3 }
integer-sqrt = { opt-level = 3 }
Expand All @@ -151,8 +151,8 @@ libz-sys = { opt-level = 3 }
mio = { opt-level = 3 }
nalgebra = { opt-level = 3 }
num-bigint = { opt-level = 3 }
parking_lot_core = { opt-level = 3 }
parking_lot = { opt-level = 3 }
parking_lot_core = { opt-level = 3 }
percent-encoding = { opt-level = 3 }
primitive-types = { opt-level = 3 }
reed-solomon-novelpoly = { opt-level = 3 }
Expand All @@ -162,6 +162,7 @@ sha2 = { opt-level = 3 }
sha3 = { opt-level = 3 }
smallvec = { opt-level = 3 }
snow = { opt-level = 3 }
substrate-bip39 = {opt-level = 3}
twox-hash = { opt-level = 3 }
uint = { opt-level = 3 }
wasmi = { opt-level = 3 }
Expand Down
2 changes: 2 additions & 0 deletions node/network/dispute-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
derive_more = "0.99.17"
parity-scale-codec = { version = "3.1.5", features = ["std"] }
Expand All @@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
thiserror = "1.0.31"
fatality = "0.0.6"
lru = "0.8.0"
indexmap = "1.9.1"

[dev-dependencies]
async-trait = "0.1.57"
Expand Down
38 changes: 31 additions & 7 deletions node/network/dispute-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].

use std::num::NonZeroUsize;
use std::{num::NonZeroUsize, time::Duration};

use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

Expand Down Expand Up @@ -66,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
/// the following:
/// batch up imports as well as possible for efficient imports while maintaining timely dispute
/// resolution and handling of spamming validators:
///
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
/// - Drop messages from a node, if we are already importing a message from that node (flood).
/// - Drop messages from nodes, that provided us messages where the statement import failed.
/// - Drop messages from a node, if it sends at a too high rate.
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
/// do, while at the same time making it aware of new disputes as fast as possible.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
/// receiver.
Expand All @@ -95,6 +98,20 @@ pub use metrics::Metrics;

const LOG_TARGET: &'static str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer come in at a higher rate than every `RECEIVE_RATE_LIMIT` on average,
/// we start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate as
/// well.
///
/// We add 50ms extra, just to have some safety margin to the `RECEIVE_RATE_LIMIT`.
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));

/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
/// Easy and efficient runtime access for this subsystem.
Expand Down Expand Up @@ -175,6 +192,12 @@ where
ctx.spawn("disputes-receiver", receiver.run().boxed())
.map_err(FatalError::SpawnTask)?;

// Process messages for sending side.
//
// Note: We want the sender to be rate limited and we are currently taking advantage of the
// fact that the root task of this subsystem is only concerned with sending: Functions of
// `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
// loop. If this fact ever changes, we will likely need another task.
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
Expand Down Expand Up @@ -250,9 +273,10 @@ impl MuxedMessage {
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
futures::select!(
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
// We select biased to make sure we finish up loose ends, before starting new work.
futures::select_biased!(
eskimor marked this conversation as resolved.
Show resolved Hide resolved
msg = from_sender.next() => MuxedMessage::Sender(msg),
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
)
}
}
Expand Down
7 changes: 5 additions & 2 deletions node/network/dispute-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ impl Metrics {
}

/// Statements have been imported.
pub fn on_imported(&self, label: &'static str) {
pub fn on_imported(&self, label: &'static str, num_requests: usize) {
if let Some(metrics) = &self.0 {
metrics.imported_requests.with_label_values(&[label]).inc()
metrics
.imported_requests
.with_label_values(&[label])
.inc_by(num_requests as u64)
}
}

Expand Down
209 changes: 209 additions & 0 deletions node/network/dispute-distribution/src/receiver/batches/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashMap, time::Instant};

use gum::CandidateHash;
use polkadot_node_network_protocol::{
request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
PeerId,
};
use polkadot_node_primitives::SignedDisputeStatement;
use polkadot_primitives::v2::{CandidateReceipt, ValidatorIndex};

use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};

use super::MAX_BATCH_LIFETIME;

/// A batch of votes to be imported into the `dispute-coordinator`.
///
/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
/// coordinator.
///
/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
/// it will "flush" in case the incoming rate dropped too low, preparing the import.
pub struct Batch {
    /// The actual candidate this batch is concerned with.
    candidate_receipt: CandidateReceipt,

    /// Cache of `CandidateHash` (`candidate_receipt.hash()`), so we don't recompute it on each use.
    candidate_hash: CandidateHash,

    /// All valid votes received in this batch so far.
    ///
    /// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
    /// while still allowing validators to equivocate.
    ///
    /// Detecting and rejecting duplicates is crucial in order to effectively enforce
    /// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
    /// here, the mechanism would be broken.
    valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

    /// All invalid votes received in this batch so far.
    invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

    /// How many votes have been batched since the last tick/creation.
    votes_batched_since_last_tick: u32,

    /// Expiry time for the batch.
    ///
    /// By this time, at the latest, this batch will get flushed.
    best_before: Instant,

    /// Requesters waiting for a response (answered once the batch gets imported).
    requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
pub(super) enum TickResult {
    /// Batch is still alive, please call `tick` again at the given `Instant`.
    Alive(Batch, Instant),
    /// Batch is done (incoming rate dropped or lifetime exceeded), ready for import!
    Done(PreparedImport),
}

/// Ready for import.
pub struct PreparedImport {
    /// The candidate the gathered statements are about.
    pub candidate_receipt: CandidateReceipt,
    /// All unique valid and invalid statements collected by the batch.
    pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
    /// Information about original requesters.
    pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

impl From<Batch> for PreparedImport {
    /// Flush a finished `Batch` into the plain data needed for an actual import.
    ///
    /// Valid and invalid votes are merged into a single statement list; the
    /// candidate receipt and the waiting requesters are passed through as-is.
    fn from(batch: Batch) -> Self {
        let Batch { candidate_receipt, valid_votes, invalid_votes, requesters, .. } = batch;

        let mut statements = Vec::with_capacity(valid_votes.len() + invalid_votes.len());
        for (index, statement) in valid_votes.into_iter().chain(invalid_votes) {
            statements.push((statement, index));
        }

        Self { candidate_receipt, statements, requesters }
    }
}

impl Batch {
    /// Create a new empty batch based on the given `CandidateReceipt`.
    ///
    /// To create a `Batch` use `Batches::find_batch`.
    ///
    /// Arguments:
    ///
    /// * `candidate_receipt` - The candidate this batch is meant to track votes for.
    /// * `now` - current time stamp for calculating the first tick.
    ///
    /// Returns: A batch and the first `Instant` you are supposed to call `tick`.
    pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
        let s = Self {
            candidate_hash: candidate_receipt.hash(),
            candidate_receipt,
            valid_votes: HashMap::new(),
            invalid_votes: HashMap::new(),
            votes_batched_since_last_tick: 0,
            // Base the expiry on the passed-in `now` (not a fresh `Instant::now()`), so it is
            // consistent with the tick schedule derived from the very same time stamp below.
            best_before: now + MAX_BATCH_LIFETIME,
            requesters: Vec::new(),
        };
        let next_tick = s.calculate_next_tick(now);
        (s, next_tick)
    }

    /// Receipt of the candidate this batch is batching votes for.
    pub fn candidate_receipt(&self) -> &CandidateReceipt {
        &self.candidate_receipt
    }

    /// Add votes from a validator into the batch.
    ///
    /// The statements are supposed to be the valid and invalid statements received in a
    /// `DisputeRequest`.
    ///
    /// The given `pending_response` is the corresponding response sender for responding to `peer`.
    /// If at least one of the votes is new as far as this batch is concerned we record the
    /// `pending_response`, for later use. In case both votes are known already, we return the
    /// response sender as an `Err` value.
    pub fn add_votes(
        &mut self,
        valid_vote: (SignedDisputeStatement, ValidatorIndex),
        invalid_vote: (SignedDisputeStatement, ValidatorIndex),
        peer: PeerId,
        pending_response: OutgoingResponseSender<DisputeRequest>,
    ) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
        // Both votes of a request must concern the candidate this batch tracks:
        debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
        debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);

        let mut duplicate = true;

        // Only votes from not-yet-seen validators keep the batch alive (see
        // `votes_batched_since_last_tick` docs on why duplicates must not count):
        if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
            self.votes_batched_since_last_tick += 1;
            duplicate = false;
        }
        if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
            self.votes_batched_since_last_tick += 1;
            duplicate = false;
        }

        if duplicate {
            Err(pending_response)
        } else {
            self.requesters.push((peer, pending_response));
            Ok(())
        }
    }

    /// Check batch for liveness.
    ///
    /// This function is supposed to be called at instants given at construction and as returned as
    /// part of `TickResult`.
    pub(super) fn tick(mut self, now: Instant) -> TickResult {
        if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
            now < self.best_before
        {
            // Still good:
            let next_tick = self.calculate_next_tick(now);
            // Reset counter:
            self.votes_batched_since_last_tick = 0;
            TickResult::Alive(self, next_tick)
        } else {
            // Incoming rate dropped below the threshold (or the batch lived long enough) -
            // flush for import:
            TickResult::Done(PreparedImport::from(self))
        }
    }

    /// Calculate when the next tick should happen.
    ///
    /// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this
    /// batch would exceed `MAX_BATCH_LIFETIME`.
    ///
    /// # Arguments
    ///
    /// * `now` - The current time.
    fn calculate_next_tick(&self, now: Instant) -> Instant {
        // Never schedule a tick past the hard expiry of the batch:
        let next_tick = now + BATCH_COLLECTING_INTERVAL;
        if next_tick < self.best_before {
            next_tick
        } else {
            self.best_before
        }
    }
}
Loading