Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Try to fix out of view statements (#5177)
Browse files Browse the repository at this point in the history
This issue happens when a peer sends a good but already known Seconded statement and the statement-distribution code does not update the statements_received field in the peer_knowledge structure. Subsequently, a Valid statement causes an out-of-view message to be emitted incorrectly, which results in a reputation loss.

This PR also introduces the concept of passing a specific pseudo-random generator to subsystems, which makes it easier to write deterministic tests. This functionality is not strictly necessary for this specific issue and its unit test, but it can be useful for other tests and subsystems.
  • Loading branch information
vstakhov authored Mar 24, 2022
1 parent 827792c commit 6997169
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ impl State {
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
let mut peers = entry
.known_by
.keys()
.cloned()
Expand All @@ -729,8 +729,7 @@ impl State {

let assignments = vec![(assignment, claimed_candidate_index)];
let gossip_peers = &self.gossip_peers;
let peers =
util::choose_random_subset(|e| gossip_peers.contains(e), peers, MIN_GOSSIP_PEERS);
util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS);

// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
Expand Down Expand Up @@ -943,16 +942,15 @@ impl State {
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
let mut peers = entry
.known_by
.keys()
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();

let gossip_peers = &self.gossip_peers;
let peers =
util::choose_random_subset(|e| gossip_peers.contains(e), peers, MIN_GOSSIP_PEERS);
util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS);

// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
Expand Down
6 changes: 3 additions & 3 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ async fn relay_message<Context>(

let _span = span.child("interested-peers");
// pass on the bitfield distribution to all interested peers
let interested_peers = peer_views
let mut interested_peers = peer_views
.iter()
.filter_map(|(peer, view)| {
// check interest in the peer in this message's relay parent
Expand All @@ -363,9 +363,9 @@ async fn relay_message<Context>(
}
})
.collect::<Vec<PeerId>>();
let interested_peers = util::choose_random_subset(
util::choose_random_subset(
|e| gossip_peers.contains(e),
interested_peers,
&mut interested_peers,
MIN_GOSSIP_PEERS,
);
interested_peers.iter().for_each(|peer| {
Expand Down
65 changes: 45 additions & 20 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, PeerId, UnifiedReputationChange as Rep, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement};
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS};

use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
Expand Down Expand Up @@ -115,16 +115,19 @@ const LOG_TARGET: &str = "parachain::statement-distribution";
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;

/// The statement distribution subsystem.
pub struct StatementDistributionSubsystem {
pub struct StatementDistributionSubsystem<R> {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Receiver for incoming large statement requests.
req_receiver: Option<IncomingRequestReceiver<request_v1::StatementFetchingRequest>>,
/// Prometheus metrics
metrics: Metrics,
/// Pseudo-random generator for peers selection logic
rng: R,
}

impl<Context> overseer::Subsystem<Context, SubsystemError> for StatementDistributionSubsystem
impl<Context, R: rand::Rng + Send + Sync + 'static> overseer::Subsystem<Context, SubsystemError>
for StatementDistributionSubsystem<R>
where
Context: SubsystemContext<Message = StatementDistributionMessage>,
Context: overseer::SubsystemContext<Message = StatementDistributionMessage>,
Expand All @@ -142,17 +145,6 @@ where
}
}

impl StatementDistributionSubsystem {
/// Create a new Statement Distribution Subsystem
pub fn new(
keystore: SyncCryptoStorePtr,
req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { keystore, req_receiver: Some(req_receiver), metrics }
}
}

#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
Expand Down Expand Up @@ -906,6 +898,7 @@ async fn circulate_statement_and_dependents(
statement: SignedFullStatement,
priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
Expand All @@ -932,6 +925,7 @@ async fn circulate_statement_and_dependents(
stored,
priority_peers,
metrics,
rng,
)
.await,
)),
Expand Down Expand Up @@ -1019,6 +1013,7 @@ async fn circulate_statement<'a>(
stored: StoredStatement<'a>,
mut priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();

Expand All @@ -1041,8 +1036,12 @@ async fn circulate_statement<'a>(
let priority_set: HashSet<&PeerId> = priority_peers.iter().collect();
peers_to_send.retain(|p| !priority_set.contains(p));

let mut peers_to_send =
util::choose_random_subset(|e| gossip_peers.contains(e), peers_to_send, MIN_GOSSIP_PEERS);
util::choose_random_subset_with_rng(
|e| gossip_peers.contains(e),
&mut peers_to_send,
rng,
MIN_GOSSIP_PEERS,
);
// We don't want to use less peers, than we would without any priority peers:
let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS);
// Make set full:
Expand Down Expand Up @@ -1313,6 +1312,7 @@ async fn handle_incoming_message_and_circulate<'a>(
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let handled_incoming = match peers.get_mut(&peer) {
Some(data) =>
Expand Down Expand Up @@ -1348,6 +1348,7 @@ async fn handle_incoming_message_and_circulate<'a>(
statement,
Vec::new(),
metrics,
rng,
)
.await;
}
Expand Down Expand Up @@ -1458,7 +1459,12 @@ async fn handle_incoming_message<'a>(
Ok(()) => {},
Err(DeniedStatement::NotUseful) => return None,
Err(DeniedStatement::UsefulButKnown) => {
// Note a received statement in the peer data
peer_data
.receive(&relay_parent, &fingerprint, max_message_count)
.expect("checked in `check_can_receive` above; qed");
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;

return None
},
}
Expand Down Expand Up @@ -1558,6 +1564,7 @@ async fn update_peer_view_and_maybe_send_unlocked(
active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let old_view = std::mem::replace(&mut peer_data.view, new_view);

Expand All @@ -1568,9 +1575,10 @@ async fn update_peer_view_and_maybe_send_unlocked(

let is_gossip_peer = gossip_peers.contains(&peer);
let lucky = is_gossip_peer ||
util::gen_ratio(
util::gen_ratio_rng(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
rng,
);

// Add entries for all relay-parents in the new view but not the old.
Expand All @@ -1597,6 +1605,7 @@ async fn handle_network_update(
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, role, maybe_authority) => {
Expand Down Expand Up @@ -1638,6 +1647,7 @@ async fn handle_network_update(
&*active_heads,
view,
metrics,
rng,
)
.await
}
Expand All @@ -1654,6 +1664,7 @@ async fn handle_network_update(
message,
req_sender,
metrics,
rng,
)
.await;
},
Expand All @@ -1670,6 +1681,7 @@ async fn handle_network_update(
&*active_heads,
view,
metrics,
rng,
)
.await,
None => (),
Expand All @@ -1681,7 +1693,17 @@ async fn handle_network_update(
}
}

impl StatementDistributionSubsystem {
impl<R: rand::Rng> StatementDistributionSubsystem<R> {
/// Create a new Statement Distribution Subsystem
pub fn new(
keystore: SyncCryptoStorePtr,
req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
metrics: Metrics,
rng: R,
) -> Self {
Self { keystore, req_receiver: Some(req_receiver), metrics, rng }
}

async fn run(
mut self,
mut ctx: (impl SubsystemContext<Message = StatementDistributionMessage>
Expand Down Expand Up @@ -1803,7 +1825,7 @@ impl StatementDistributionSubsystem {
}

async fn handle_requester_message(
&self,
&mut self,
ctx: &mut impl SubsystemContext,
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
Expand Down Expand Up @@ -1861,6 +1883,7 @@ impl StatementDistributionSubsystem {
message,
req_sender,
&self.metrics,
&mut self.rng,
)
.await;
}
Expand Down Expand Up @@ -1910,7 +1933,7 @@ impl StatementDistributionSubsystem {
}

async fn handle_subsystem_message(
&self,
&mut self,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
runtime: &mut RuntimeInfo,
peers: &mut HashMap<PeerId, PeerData>,
Expand Down Expand Up @@ -2022,6 +2045,7 @@ impl StatementDistributionSubsystem {
statement,
group_peers,
metrics,
&mut self.rng,
)
.await;
},
Expand All @@ -2036,6 +2060,7 @@ impl StatementDistributionSubsystem {
req_sender,
event,
metrics,
&mut self.rng,
)
.await;
},
Expand Down
Loading

0 comments on commit 6997169

Please sign in to comment.