diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 4252de837eb8..758a978cc908 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -321,15 +321,14 @@ impl State { /// Distribute a collation. /// -/// Figure out the core our para is assigned to and the relevant validators. -/// Issue a connection request to these validators. -/// If the para is not scheduled or next up on any core, at the relay-parent, -/// or the relay-parent isn't in the active-leaves set, we ignore the message -/// as it must be invalid in that case - although this indicates a logic error -/// elsewhere in the node. +/// If the para is not scheduled on any core, at the relay parent, +/// or the relay parent isn't in our view or we already collated on the relay parent, +/// we ignore the message as it must be invalid in that case - +/// although this indicates a logic error elsewhere in the node. +/// +/// Otherwise, start advertising the collation to interested peers. async fn distribute_collation( ctx: &mut Context, - runtime: &mut RuntimeInfo, state: &mut State, id: ParaId, receipt: CandidateReceipt, @@ -358,32 +357,8 @@ where return Ok(()) } - // Determine which core the para collated-on is assigned to. - // If it is not scheduled then ignore the message. - let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? { - Some(core) => core, - None => { - tracing::warn!( - target: LOG_TARGET, - para_id = %id, - ?relay_parent, - "looks like no core is assigned to {} at {}", id, relay_parent, - ); - - return Ok(()) - }, - }; - - // Determine the group on that core. 
- let current_validators = - determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?; - - if current_validators.validators.is_empty() { - tracing::warn!( - target: LOG_TARGET, - core = ?our_core, - "there are no validators assigned to core", - ); + if !state.our_validators_groups.contains_key(&relay_parent) { + tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core."); return Ok(()) } @@ -394,16 +369,9 @@ where relay_parent = %relay_parent, candidate_hash = ?receipt.hash(), pov_hash = ?pov.hash(), - core = ?our_core, - ?current_validators, - "Accepted collation, connecting to validators." + "Accepted collation", ); - // Issue a discovery request for the validators of the current group: - connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await; - - state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); - if let Some(result_sender) = result_sender { state.collation_result_senders.insert(receipt.hash(), result_sender); } @@ -522,7 +490,7 @@ where Context: overseer::SubsystemContext, { // ignore address resolution failure - // will reissue a new request on new collation + // will reissue a new request on new relay parent let (failed, _) = oneshot::channel(); ctx.send_message(NetworkBridgeMessage::ConnectToValidators { validator_ids, @@ -633,8 +601,7 @@ where ); }, Some(id) => { - distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender) - .await?; + distribute_collation(ctx, state, id, receipt, pov, result_sender).await?; }, None => { tracing::warn!( @@ -919,7 +886,7 @@ where }, OurViewChange(view) => { tracing::trace!(target: LOG_TARGET, ?view, "Own view change"); - handle_our_view_change(state, view).await?; + handle_our_view_change(ctx, runtime, state, view).await?; }, PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; @@ -933,7 +900,16 @@ where } /// Handles our view changes. 
-async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> { +async fn handle_our_view_change( + ctx: &mut Context, + runtime: &mut RuntimeInfo, + state: &mut State, + view: OurView, +) -> Result<()> +where + Context: SubsystemContext, + Context: overseer::SubsystemContext, +{ for removed in state.view.difference(&view) { tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed."); @@ -967,6 +943,60 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> } state.view = view; + if state.view.is_empty() { + return Ok(()) + } + + let id = match state.collating_on { + Some(id) => id, + None => return Ok(()), + }; + + // all validators assigned to the core + // across all active leaves + // this is typically our current group + // but can also include the previous group at + // rotation boundaries and considering forks + let mut group_validators = HashSet::new(); + + for relay_parent in state.view.iter().cloned() { + tracing::debug!( + target: LOG_TARGET, + ?relay_parent, + para_id = ?id, + "Processing relay parent.", + ); + + // Determine our assigned core. + // If it is not scheduled then ignore the relay parent. + let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? { + Some(core) => core, + None => continue, + }; + + // Determine the group on that core. 
+ let current_validators = + determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?; + + let validators = current_validators.validators; + group_validators.extend(validators); + + state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new()); + } + + let validators: Vec<_> = group_validators.into_iter().collect(); + let no_one_is_assigned = validators.is_empty(); + if no_one_is_assigned { + tracing::warn!(target: LOG_TARGET, "No validators assigned to our core.",); + return Ok(()) + } + tracing::debug!( + target: LOG_TARGET, + ?validators, + para_id = ?id, + "Connecting to validators.", + ); + connect_to_validators(ctx, validators).await; Ok(()) } diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index 526cfab04e19..86d5639ad610 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -29,7 +29,7 @@ use sp_core::crypto::Pair; use sp_keyring::Sr25519Keyring; use sp_runtime::traits::AppVerify; -use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view}; +use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view, OurView}; use polkadot_node_primitives::BlockData; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ @@ -172,13 +172,7 @@ impl TestState { our_view![self.relay_parent] }; - overseer_send( - virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange( - our_view, - )), - ) - .await; + set_our_view(virtual_overseer, &self, our_view).await; } } @@ -278,13 +272,83 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS ) .await; + set_our_view(virtual_overseer, test_state, our_view![test_state.relay_parent]).await; +} + +/// Check our view change triggers the right messages +async fn set_our_view( + 
virtual_overseer: &mut VirtualOverseer,
+	test_state: &TestState,
+	our_view: OurView,
+) {
 	overseer_send(
 		virtual_overseer,
 		CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
-			our_view![test_state.relay_parent],
+			our_view.clone(),
 		)),
 	)
 	.await;
+
+	for parent in our_view.iter().cloned() {
+		// obtain the availability cores.
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::AvailabilityCores(tx)
+			)) => {
+				assert_eq!(relay_parent, parent);
+				tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+			}
+		);
+
+		// We don't know precisely what is going to come as session info might be cached:
+		loop {
+			match overseer_recv(virtual_overseer).await {
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, parent);
+					tx.send(Ok(test_state.current_session_index())).unwrap();
+				},
+
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(index, tx),
+				)) => {
+					assert_eq!(relay_parent, parent);
+					assert_eq!(index, test_state.current_session_index());
+
+					tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
+				},
+
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::ValidatorGroups(tx),
+				)) => {
+					assert_eq!(relay_parent, parent);
+					tx.send(Ok((
+						test_state.session_info.validator_groups.clone(),
+						test_state.group_rotation_info.clone(),
+					)))
+					.unwrap();
+					// This call is mandatory - we are done:
+					break
+				},
+				other => panic!("Unexpected message received: {:?}", other),
+			}
+		}
+	}
+
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::NetworkBridge(
+			NetworkBridgeMessage::ConnectToValidators {
+				..
+ } + ) => {} + ); } /// Result of [`distribute_collation`] @@ -297,8 +361,6 @@ struct DistributeCollation { async fn distribute_collation( virtual_overseer: &mut VirtualOverseer, test_state: &TestState, - // whether or not we expect a connection request or not. - should_connect: bool, ) -> DistributeCollation { // Now we want to distribute a `PoVBlock` let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) }; @@ -319,67 +381,6 @@ async fn distribute_collation( ) .await; - // obtain the availability cores. - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx) - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap(); - } - ); - - // We don't know precisely what is going to come as session info might be cached: - loop { - match overseer_recv(virtual_overseer).await { - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(test_state.current_session_index())).unwrap(); - }, - - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(index, tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(index, test_state.current_session_index()); - - tx.send(Ok(Some(test_state.session_info.clone()))).unwrap(); - }, - - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::ValidatorGroups(tx), - )) => { - assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(( - test_state.session_info.validator_groups.clone(), - test_state.group_rotation_info.clone(), - ))) - .unwrap(); - // This call is mandatory - we are done: - break - }, - other => panic!("Unexpected message received: {:?}", other), - } - } - - if should_connect { - 
assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - .. - } - ) => {} - ); - } - DistributeCollation { candidate, pov_block } } @@ -508,7 +509,7 @@ fn advertise_and_send_collation() { setup_system(&mut virtual_overseer, &test_state).await; let DistributeCollation { candidate, pov_block } = - distribute_collation(&mut virtual_overseer, &test_state, true).await; + distribute_collation(&mut virtual_overseer, &test_state).await; for (val, peer) in test_state .current_group_validator_authority_ids() @@ -625,7 +626,7 @@ fn advertise_and_send_collation() { assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); - distribute_collation(&mut virtual_overseer, &test_state, true).await; + distribute_collation(&mut virtual_overseer, &test_state).await; // Send info about peer's view. overseer_send( @@ -713,7 +714,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { // And let it tell us that it is has the same view. send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - distribute_collation(virtual_overseer, &test_state, true).await; + distribute_collation(virtual_overseer, &test_state).await; expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent).await; @@ -752,14 +753,14 @@ fn collate_on_two_different_relay_chain_blocks() { expect_declare_msg(virtual_overseer, &test_state, &peer).await; expect_declare_msg(virtual_overseer, &test_state, &peer2).await; - distribute_collation(virtual_overseer, &test_state, true).await; + distribute_collation(virtual_overseer, &test_state).await; let old_relay_parent = test_state.relay_parent; // Advance to a new round, while informing the subsystem that the old and the new relay parent are active. 
test_state.advance_to_new_round(virtual_overseer, true).await; - distribute_collation(virtual_overseer, &test_state, true).await; + distribute_collation(virtual_overseer, &test_state).await; send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await; expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent).await; @@ -789,7 +790,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() { connect_peer(virtual_overseer, peer.clone(), Some(validator_id.clone())).await; expect_declare_msg(virtual_overseer, &test_state, &peer).await; - distribute_collation(virtual_overseer, &test_state, true).await; + distribute_collation(virtual_overseer, &test_state).await; send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await; expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent).await; @@ -874,7 +875,7 @@ where setup_system(virtual_overseer, &test_state).await; let DistributeCollation { candidate, pov_block } = - distribute_collation(virtual_overseer, &test_state, true).await; + distribute_collation(virtual_overseer, &test_state).await; for (val, peer) in test_state .current_group_validator_authority_ids() diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 0aa53156e759..769b1448690b 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy { impl Default for CollatorEvictionPolicy { fn default() -> Self { CollatorEvictionPolicy { - inactive_collator: Duration::from_secs(24), + inactive_collator: Duration::from_secs(5), undeclared: Duration::from_secs(1), } } diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs index 3d2f133163f6..7856e3fd9f96 100644 --- a/node/network/protocol/src/peer_set.rs +++ b/node/network/protocol/src/peer_set.rs @@ -75,7 +75,7 @@ impl PeerSet { max_notification_size, set_config: 
SetConfig { // Non-authority nodes don't need to accept incoming connections on this peer set: - in_peers: if is_authority == IsAuthority::Yes { 25 } else { 0 }, + in_peers: if is_authority == IsAuthority::Yes { 100 } else { 0 }, out_peers: 0, reserved_nodes: Vec::new(), non_reserved_mode: if is_authority == IsAuthority::Yes {