Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Batch vote import in dispute-distribution #5894

Merged
merged 45 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b202cc8
Start work on batching in dispute-distribution.
eskimor Aug 16, 2022
192d73b
Guide work.
eskimor Aug 17, 2022
a2f8fde
More guide changes. Still very much WIP.
eskimor Aug 18, 2022
5e2f4a5
Finish guide changes.
eskimor Aug 22, 2022
585ec6d
Clarification
eskimor Aug 22, 2022
7e02910
Adjust argument about slashing.
eskimor Aug 23, 2022
4e170e2
WIP: Add constants to receiver.
eskimor Aug 23, 2022
362ebae
Maintain order of disputes.
eskimor Aug 24, 2022
68c7073
dispute-distribuion sender Rate limit.
eskimor Aug 24, 2022
56519a7
Cleanup
eskimor Aug 25, 2022
b3ab280
WIP: dispute-distribution receiver.
eskimor Aug 25, 2022
4b4df00
WIP: Batching.
eskimor Aug 25, 2022
5816c8d
fmt
eskimor Aug 25, 2022
a821efc
Update `PeerQueues` to maintain more invariants.
eskimor Aug 26, 2022
3b71817
WIP: Batching.
eskimor Aug 26, 2022
909569b
Small cleanup
eskimor Aug 26, 2022
fa14c43
Batching logic.
eskimor Aug 30, 2022
89d622d
Some integration work.
eskimor Aug 30, 2022
401ffb8
Finish.
eskimor Aug 31, 2022
3dd2041
Typo.
eskimor Sep 1, 2022
f9fc6c8
Docs.
eskimor Sep 1, 2022
43d946d
Report missing metric.
eskimor Sep 1, 2022
3e5ceaa
Doc pass.
eskimor Sep 1, 2022
0e11f4f
Tests for waiting_queue.
eskimor Sep 2, 2022
2fcebad
Speed up some crypto by 10x.
eskimor Sep 2, 2022
3d444ae
Fix redundant import.
eskimor Sep 2, 2022
e7923a7
Add some tracing.
eskimor Sep 2, 2022
8b8f722
Better sender rate limit
eskimor Sep 2, 2022
a79a96d
Some tests.
eskimor Sep 2, 2022
c1ed615
Tests
eskimor Sep 5, 2022
f4c530c
Add logging to rate limiter
eskimor Sep 5, 2022
795c6c1
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
d46be76
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
9b8787d
Update node/network/dispute-distribution/src/receiver/mod.rs
eskimor Sep 8, 2022
9795098
Review feedback.
eskimor Sep 9, 2022
465f266
Also log peer in log messages.
eskimor Sep 21, 2022
88c61d2
Fix indentation.
eskimor Sep 28, 2022
78066ac
waker -> timer
eskimor Sep 28, 2022
3e43b0a
Guide improvement.
eskimor Sep 28, 2022
f0e5001
Remove obsolete comment.
eskimor Sep 28, 2022
30d10f4
waker -> timer
eskimor Oct 4, 2022
8ef4a24
Fix spell complaints.
eskimor Oct 4, 2022
5b5d82d
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
f0a054b
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
35d698c
Fix Cargo.lock
eskimor Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node/network/dispute-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub type Result<T> = std::result::Result<T, Error>;

pub type FatalResult<T> = std::result::Result<T, FatalError>;

pub type JfyiResult<T> = std::result::Result<T, JfyiError>;

/// Utility for eating top level errors and logging them.
///
/// We basically always want to try and continue on error. This utility function is meant to
Expand Down
6 changes: 5 additions & 1 deletion node/network/dispute-distribution/src/receiver/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use fatality::Nested;

use polkadot_node_network_protocol::{request_response::incoming, PeerId};
use polkadot_node_subsystem_util::runtime;
use polkadot_primitives::v2::AuthorityDiscoveryId;

use crate::LOG_TARGET;

Expand Down Expand Up @@ -49,11 +50,14 @@ pub enum Error {

#[error("Peer {0} attempted to participate in dispute and is not a validator.")]
NotAValidator(PeerId),

#[error("Authority {0} sent messages at a too high rate.")]
AuthorityFlooding(AuthorityDiscoveryId),
}

pub type Result<T> = std::result::Result<T, Error>;

pub type JfyiErrorResult<T> = std::result::Result<T, JfyiError>;
pub type JfyiResult<T> = std::result::Result<T, JfyiError>;

/// Utility for eating top level errors and logging them.
///
Expand Down
185 changes: 127 additions & 58 deletions node/network/dispute-distribution/src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
collections::HashSet,
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{
channel::oneshot,
future::{poll_fn, BoxFuture},
pin_mut,
future::{poll_fn, BoxFuture, Fuse},
pin_mut, select_biased,
stream::{FusedStream, FuturesUnordered, StreamExt},
Future, FutureExt, Stream,
};
use futures_timer::Delay;
use lru::LruCache;

use polkadot_node_network_protocol::{
Expand All @@ -44,19 +46,32 @@ use polkadot_node_subsystem::{
overseer,
};
use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
use polkadot_primitives::v2::AuthorityDiscoveryId;

use crate::{
metrics::{FAILED, SUCCEEDED},
Metrics, LOG_TARGET,
Metrics, LOG_TARGET, RECEIVE_RATE_LIMIT,
};

mod error;
use self::error::{log_error, JfyiError, JfyiErrorResult, Result};

/// Queues for incoming requests by peers.
mod peer_queues;

use self::{
error::{log_error, JfyiError, JfyiResult, Result},
peer_queues::PeerQueues,
};

/// Reputation change applied when a received message could not be decoded.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
/// Reputation change applied when the signatures in a request were invalid.
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
/// Reputation change applied when the reported candidate was not available.
const COST_INVALID_CANDIDATE: Rep = Rep::Malicious("Reported candidate was not available.");
/// Reputation change applied when the reporting peer was not a validator.
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
/// Mildly punish peers exceeding their rate limit.
///
/// For honest peers this should rarely happen, but if it happens we would not want to disconnect
/// too quickly. Minor cost should suffice for disconnecting any real flooder.
const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Peer exceeded the rate limit.");

/// How many votes must have arrived in the last `BATCH_COLLECTING_INTERVAL`
///
Expand Down Expand Up @@ -84,19 +99,18 @@ pub struct DisputesReceiver<Sender, AD> {
/// Channel to retrieve incoming requests from.
receiver: IncomingRequestReceiver<DisputeRequest>,

/// Rate limiting queue for each peer (only authorities).
peer_queues: PeerQueues,

/// Delay timer for establishing the rate limit.
rate_limit: Fuse<Delay>,

/// Authority discovery service:
authority_discovery: AD,

/// Imports currently being processed.
pending_imports: PendingImports,

/// We keep record of the last banned peers.
///
/// This is needed because once we ban a peer, we will very likely still have pending requests
/// in the incoming channel - we should not waste time recovering availability for those, as we
/// already know the peer is malicious.
banned_peers: LruCache<PeerId, ()>,

/// Log received requests.
metrics: Metrics,
}
Expand All @@ -110,7 +124,7 @@ enum MuxedMessage {
/// - We need to make sure responses are actually sent (therefore we need to await futures
/// promptly).
/// - We need to update `banned_peers` according to the result.
ConfirmedImport(JfyiErrorResult<(PeerId, ImportStatementsResult)>),
ConfirmedImport(JfyiResult<(PeerId, ImportStatementsResult)>),

/// A new request has arrived and should be handled.
NewRequest(IncomingRequest<DisputeRequest>),
Expand All @@ -122,6 +136,11 @@ impl MuxedMessage {
pending_requests: &mut IncomingRequestReceiver<DisputeRequest>,
) -> Result<MuxedMessage> {
poll_fn(|ctx| {
// In case of Ready(None), we want to wait for pending requests:
if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
return Poll::Ready(Ok(Self::ConfirmedImport(v)))
}

let next_req = pending_requests.recv(|| vec![COST_INVALID_REQUEST]);
pin_mut!(next_req);
if let Poll::Ready(r) = next_req.poll(ctx) {
Expand All @@ -130,11 +149,6 @@ impl MuxedMessage {
Ok(v) => Poll::Ready(Ok(Self::NewRequest(v))),
}
}
// In case of Ready(None) return `Pending` below - we want to wait for the next request
// in that case.
if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
return Poll::Ready(Ok(Self::ConfirmedImport(v)))
}
Poll::Pending
})
.await
Expand All @@ -161,11 +175,12 @@ where
runtime,
sender,
receiver,
peer_queues: PeerQueues::new(),
rate_limit: Delay::new(RECEIVE_RATE_LIMIT).fuse(),
authority_discovery,
pending_imports: PendingImports::new(),
// Size of MAX_PARALLEL_IMPORTS ensures we are going to immediately get rid of any
// malicious requests still pending in the incoming queue.
banned_peers: LruCache::new(MAX_PARALLEL_IMPORTS),
metrics,
}
}
Expand All @@ -191,45 +206,35 @@ where

/// Actual work happening here.
async fn run_inner(&mut self) -> Result<()> {
let msg = MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).await?;

let incoming = match msg {
// We need to clean up futures, to make sure responses are sent:
MuxedMessage::ConfirmedImport(m_bad) => {
self.ban_bad_peer(m_bad)?;
return Ok(())
},
MuxedMessage::NewRequest(req) => req,
let msg = if self.peer_queues.is_empty() {
// No point to wake on timeout:
Some(MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).await?)
} else {
self.wait_for_message_or_timeout().await?
};

self.metrics.on_received_request();

let peer = incoming.peer;

// Only accept messages from validators:
if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
incoming
.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_NOT_A_VALIDATOR],
sent_feedback: None,
})
.map_err(|_| JfyiError::SendResponse(peer))?;

return Err(JfyiError::NotAValidator(peer).into())
}
if let Some(msg) = msg {
let incoming = match msg {
// We need to clean up futures, to make sure responses are sent:
MuxedMessage::ConfirmedImport(m_bad) => {
eskimor marked this conversation as resolved.
Show resolved Hide resolved
self.ban_bad_peer(m_bad)?;
return Ok(())
},
MuxedMessage::NewRequest(req) => req,
};

// Immediately drop requests from peers that already have requests in flight or have
// been banned recently (flood protection):
if self.pending_imports.peer_is_pending(&peer) || self.banned_peers.contains(&peer) {
gum::trace!(
target: LOG_TARGET,
?peer,
"Dropping message from peer (banned/pending import)"
);
self.metrics.on_received_request();
self.dispatch_to_queues(incoming).await?;
// Wait for more messages:
return Ok(())
}

// Let's actually process messages, that made it through the rate limit:
//
// Batch:
// - Collect votes - get rid of duplicates.
// - Keep track of import rate.
// - Flush if import rate is not matched
// Wait for a free slot:
if self.pending_imports.len() >= MAX_PARALLEL_IMPORTS as usize {
// Wait for one to finish:
Expand All @@ -241,6 +246,71 @@ where
self.start_import(incoming).await
}

/// Race the next incoming message against the `rate_limit` timer.
///
/// The timer is polled first (`select_biased!`), so an elapsed rate limit wins over a message
/// that is ready at the same time.
///
/// Returns:
/// - `Ok(Some(msg))` if a message arrived before the timer fired. The timer is left untouched.
/// - `Ok(None)` if the timer fired. `rate_limit` is re-armed with a fresh `RECEIVE_RATE_LIMIT`
///   delay before returning.
/// - `Err(_)` if receiving the message failed.
async fn wait_for_message_or_timeout(&mut self) -> Result<Option<MuxedMessage>> {
    let next_message =
        MuxedMessage::receive(&mut self.pending_imports, &mut self.receiver).fuse();
    pin_mut!(next_message);
    let mut rate_limit_timer = Pin::new(&mut self.rate_limit);

    let received = select_biased!(
        () = rate_limit_timer => None,
        msg = next_message => Some(msg?),
    );

    match received {
        Some(msg) => Ok(Some(msg)),
        None => {
            // Timeout hit - we need a new `Delay`. It is started right away, so the processing
            // that follows does not further decrease the allowed rate (assuming processing
            // takes less than `RECEIVE_RATE_LIMIT`).
            self.rate_limit = Delay::new(RECEIVE_RATE_LIMIT).fuse();
            Ok(None)
        },
    }
}

/// Process an incoming request by routing it into the rate-limiting queue of its sender.
///
/// - Check the sender is an authority; otherwise answer with an error response carrying
///   `COST_NOT_A_VALIDATOR`.
/// - Dispatch the request to the corresponding queue in `peer_queues`.
/// - If `peer_queues` rejects the request (queue is full), drop it and answer with an error
///   response carrying `COST_APPARENT_FLOOD`.
///
/// # Errors
///
/// - `JfyiError::NotAValidator` if no authority id is known for the sending peer.
/// - `JfyiError::AuthorityFlooding` if the request could not be queued.
/// - `JfyiError::SendResponse` if sending either of the error responses failed.
async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
    let peer = req.peer;
    // Only accept messages from validators. In case there are multiple `AuthorityId`s, we
    // just take the first one. On session boundaries this might allow validators to double
    // their rate limit for a short period of time, which seems acceptable.
    let authority_id = match self
        .authority_discovery
        .get_authority_ids_by_peer_id(peer)
        .await
        .and_then(|s| s.into_iter().next())
    {
        None => {
            req.send_outgoing_response(OutgoingResponse {
                result: Err(()),
                reputation_changes: vec![COST_NOT_A_VALIDATOR],
                sent_feedback: None,
            })
            .map_err(|_| JfyiError::SendResponse(peer))?;
            // The return type is already `JfyiResult`, so no conversion is needed.
            return Err(JfyiError::NotAValidator(peer))
        },
        Some(auth_id) => auth_id,
    };

    // Queue request. On rejection `push_req` hands both the authority id and the request
    // back, so we can still respond to the sender:
    if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
        req.send_outgoing_response(OutgoingResponse {
            result: Err(()),
            reputation_changes: vec![COST_APPARENT_FLOOD],
            sent_feedback: None,
        })
        .map_err(|_| JfyiError::SendResponse(peer))?;
        return Err(JfyiError::AuthorityFlooding(authority_id))
    }
    Ok(())
}

/// Start importing votes for the given request.
async fn start_import(&mut self, incoming: IncomingRequest<DisputeRequest>) -> Result<()> {
let IncomingRequest { peer, payload, pending_response } = incoming;
Expand Down Expand Up @@ -291,8 +361,8 @@ where
/// In addition we report import metrics.
fn ban_bad_peer(
&mut self,
result: JfyiErrorResult<(PeerId, ImportStatementsResult)>,
) -> JfyiErrorResult<()> {
result: JfyiResult<(PeerId, ImportStatementsResult)>,
) -> JfyiResult<()> {
match result? {
(_, ImportStatementsResult::ValidImport) => {
self.metrics.on_imported(SUCCEEDED);
Expand All @@ -309,8 +379,7 @@ where
/// Manage pending imports in a way that preserves invariants.
struct PendingImports {
/// Futures in flight.
futures:
FuturesUnordered<BoxFuture<'static, (PeerId, JfyiErrorResult<ImportStatementsResult>)>>,
futures: FuturesUnordered<BoxFuture<'static, (PeerId, JfyiResult<ImportStatementsResult>)>>,
/// Peers whose requests are currently in flight.
peers: HashSet<PeerId>,
}
Expand Down Expand Up @@ -348,7 +417,7 @@ impl PendingImports {
}

impl Stream for PendingImports {
type Item = JfyiErrorResult<(PeerId, ImportStatementsResult)>;
type Item = JfyiResult<(PeerId, ImportStatementsResult)>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.futures).poll_next(ctx) {
Poll::Pending => Poll::Pending,
Expand All @@ -375,7 +444,7 @@ async fn respond_to_request(
peer: PeerId,
handled: oneshot::Receiver<ImportStatementsResult>,
pending_response: OutgoingResponseSender<DisputeRequest>,
) -> JfyiErrorResult<ImportStatementsResult> {
) -> JfyiResult<ImportStatementsResult> {
let result = handled.await.map_err(|_| JfyiError::ImportCanceled(peer))?;

let response = match result {
Expand Down
Loading