From 5f694a424e5d8af9e177713d273f77714d54bcc7 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Thu, 8 Feb 2024 12:41:03 +0200 Subject: [PATCH 1/8] fixup! approval-distribution: Update topology if authorities are discovered later (#2981) The previous fix was actually incomplete because we update the authorities only in the situation where we decided to reconnect because we had a low connectivity issue. Now the problem is that update_authority_ids uses the list of connected peers, so on restart that does not contain anything, so calling immediately after issue_connection_request won't detect all authorities, so we need to also check every block as the comment said, but that did not match the code. --- .../node/network/gossip-support/src/lib.rs | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index e9cb8a4de1c4..822039939a32 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -271,8 +271,30 @@ where ) .await?; } - // authority_discovery is just a cache so let's try every leaf to detect if there - // are new authorities there. + // authority_discovery is just a cache so let's try every time we try to re-connect + // if new authorities are present. + self.update_authority_ids(sender, session_info.discovery_keys).await; + } else if let Some(session_index) = self + .last_session_index + .filter(|last_session_index| *last_session_index >= current_index) + { + // authority_discovery is just a cache so let's try every leaf from the current + // session if there are new authorities detected and inform the needed subsystems. 
+ let session_info = + util::request_session_info(leaf, session_index, sender).await.await??; + + let session_info = match session_info { + Some(s) => s, + None => { + gum::warn!( + relay_parent = ?leaf, + session_index = self.last_session_index, + "Failed to get session info.", + ); + + continue + }, + }; self.update_authority_ids(sender, session_info.discovery_keys).await; } } From 28561d5f0973debb9d4df196174a422dee270927 Mon Sep 17 00:00:00 2001 From: Alexander Samusev <41779041+alvicsam@users.noreply.github.com> Date: Thu, 8 Feb 2024 10:30:48 +0100 Subject: [PATCH 2/8] [ci] Remove path from check-workspace GHA trigger (#3255) In order to make the action `Required` it should run always. cc @ggwpez --- .github/workflows/check-workspace.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/check-workspace.yml b/.github/workflows/check-workspace.yml index 3dd812d7d9b3..81ec311ccce8 100644 --- a/.github/workflows/check-workspace.yml +++ b/.github/workflows/check-workspace.yml @@ -2,8 +2,6 @@ name: Check workspace on: pull_request: - paths: - - "*.toml" merge_group: jobs: @@ -19,5 +17,5 @@ jobs: run: > python3 .github/scripts/check-workspace.py . 
--exclude - "substrate/frame/contracts/fixtures/build" + "substrate/frame/contracts/fixtures/build" "substrate/frame/contracts/fixtures/contracts/common" From 0e383940bd68a236d90c8cb5b07a168b26906816 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Thu, 8 Feb 2024 16:46:47 +0200 Subject: [PATCH 3/8] Add unitest for update_authorities Signed-off-by: Alexandru Gheorghe --- Cargo.lock | 1 + .../node/network/gossip-support/Cargo.toml | 1 + .../node/network/gossip-support/src/lib.rs | 2 +- .../node/network/gossip-support/src/tests.rs | 615 +++++++++++++----- 4 files changed, 440 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27a1efc38098..0300ed93e6dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12232,6 +12232,7 @@ dependencies = [ "futures", "futures-timer", "lazy_static", + "parking_lot 0.12.1", "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index c17f39b019de..8d0edc206d72 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -38,5 +38,6 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" async-trait = "0.1.74" +parking_lot = "0.12.1" lazy_static = "1.4.0" quickcheck = "1.0.3" diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 822039939a32..f60d9e906b44 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -276,7 +276,7 @@ where self.update_authority_ids(sender, session_info.discovery_keys).await; } else if let Some(session_index) = self .last_session_index - .filter(|last_session_index| *last_session_index >= current_index) + .filter(|last_session_index| *last_session_index == current_index) { // authority_discovery is just 
a cache so let's try every leaf from the current // session if there are new authorities detected and inform the needed subsystems. diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index e5ee101c31d8..412dd77b96f9 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -25,13 +25,19 @@ use lazy_static::lazy_static; use quickcheck::quickcheck; use rand::seq::SliceRandom as _; +use parking_lot::Mutex; use sc_network::multiaddr::Protocol; use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch}; use sp_core::crypto::Pair as PairT; use sp_keyring::Sr25519Keyring; +use std::sync::Arc; -use polkadot_node_network_protocol::grid_topology::{SessionGridTopology, TopologyPeerInfo}; +use polkadot_node_network_protocol::{ + grid_topology::{SessionGridTopology, TopologyPeerInfo}, + peer_set::ValidationVersion, + ObservedRole, +}; use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; @@ -51,7 +57,6 @@ const AUTHORITY_KEYRINGS: &[Sr25519Keyring] = &[ ]; lazy_static! { - static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(); static ref AUTHORITIES: Vec = AUTHORITY_KEYRINGS.iter().map(|k| k.public().into()).collect(); @@ -67,6 +72,7 @@ lazy_static! 
{ .chain(AUTHORITIES.clone()) .collect() }; + // static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); // [2 6] // [4 5] @@ -89,17 +95,14 @@ type VirtualOverseer = test_helpers::TestSubsystemContextHandle>, - authorities: HashMap>, + addrs: Arc>>>, + authorities: Arc>>>, } impl MockAuthorityDiscovery { - fn new() -> Self { - let authorities: HashMap<_, _> = PAST_PRESENT_FUTURE_AUTHORITIES - .clone() - .into_iter() - .map(|a| (PeerId::random(), a)) - .collect(); + fn new(authorities: Vec) -> Self { + let authorities: HashMap<_, _> = + authorities.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); let addrs = authorities .clone() .into_iter() @@ -109,10 +112,37 @@ impl MockAuthorityDiscovery { }) .collect(); Self { - addrs, - authorities: authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(), + addrs: Arc::new(Mutex::new(addrs)), + authorities: Arc::new(Mutex::new( + authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(), + )), } } + + fn authorities(&self) -> HashMap> { + self.authorities.lock().clone() + } + + fn add_more_authorties( + &mut self, + new_known: Vec, + ) -> HashMap> { + let authorities: HashMap<_, _> = + new_known.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); + let addrs: HashMap> = authorities + .clone() + .into_iter() + .map(|(p, a)| { + let multiaddr = Multiaddr::empty().with(Protocol::P2p(p.into())); + (a, HashSet::from([multiaddr])) + }) + .collect(); + let authorities: HashMap> = + authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(); + self.addrs.as_ref().lock().extend(addrs); + self.authorities.as_ref().lock().extend(authorities.clone()); + authorities + } } #[async_trait] @@ -121,19 +151,23 @@ impl AuthorityDiscovery for MockAuthorityDiscovery { &mut self, authority: polkadot_primitives::AuthorityDiscoveryId, ) -> Option> { - self.addrs.get(&authority).cloned() + 
self.addrs.lock().get(&authority).cloned() } + async fn get_authority_ids_by_peer_id( &mut self, peer_id: polkadot_node_network_protocol::PeerId, ) -> Option> { - self.authorities.get(&peer_id).cloned() + self.authorities.as_ref().lock().get(&peer_id).cloned() } } -async fn get_multiaddrs(authorities: Vec) -> Vec> { +async fn get_multiaddrs( + authorities: Vec, + mock_authority_discovery: MockAuthorityDiscovery, +) -> Vec> { let mut addrs = Vec::with_capacity(authorities.len()); - let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + let mut discovery = mock_authority_discovery.clone(); for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority).await { addrs.push(addr); @@ -144,9 +178,10 @@ async fn get_multiaddrs(authorities: Vec) -> Vec, + mock_authority_discovery: MockAuthorityDiscovery, ) -> HashMap> { let mut addrs = HashMap::with_capacity(authorities.len()); - let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + let mut discovery = mock_authority_discovery.clone(); for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority.clone()).await { addrs.insert(authority, addr); @@ -155,12 +190,10 @@ async fn get_address_map( addrs } -fn make_subsystem() -> GossipSupport { - GossipSupport::new( - make_ferdie_keystore(), - MOCK_AUTHORITY_DISCOVERY.clone(), - Metrics::new_dummy(), - ) +fn make_subsystem_with_authority_discovery( + mock: MockAuthorityDiscovery, +) -> GossipSupport { + GossipSupport::new(make_ferdie_keystore(), mock, Metrics::new_dummy()) } fn test_harness, AD: AuthorityDiscovery>( @@ -291,59 +324,65 @@ async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: Sessio #[test] fn issues_a_connection_request_on_new_session() { + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + let mock_authority_discovery_clone = mock_authority_discovery.clone(); let hash = 
Hash::repeat_byte(0xAA); - let state = test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); - } - ); + let state = test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(AUTHORITIES.clone())).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + 
tx.send(Ok(AUTHORITIES.clone())).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); - assert_eq!(peer_set, PeerSet::Validation); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery_clone).await); + assert_eq!(peer_set, PeerSet::Validation); + } + ); - test_neighbors(overseer, 1).await; + test_neighbors(overseer, 1).await; - virtual_overseer - }); + virtual_overseer + }, + ); assert_eq!(state.last_session_index, Some(1)); assert!(state.last_failure.is_none()); @@ -363,6 +402,17 @@ fn issues_a_connection_request_on_new_session() { tx.send(Ok(1)).unwrap(); } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); virtual_overseer }); @@ -414,7 +464,7 @@ fn issues_a_connection_request_on_new_session() { validator_addrs, peer_set, }) => { - assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery.clone()).await); assert_eq!(peer_set, PeerSet::Validation); } ); @@ -430,125 +480,332 @@ fn issues_a_connection_request_on_new_session() { #[test] fn issues_connection_request_to_past_present_future() { let hash = Hash::repeat_byte(0xAA); - test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - 
overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); - } - ); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + 
tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES - .iter() - .cloned() - .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) - .collect(); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); - let addrs = get_multiaddrs(all_without_ferdie).await; + let addrs = get_multiaddrs(all_without_ferdie, mock_authority_discovery.clone()).await; - assert_eq!(validator_addrs, addrs); - assert_eq!(peer_set, PeerSet::Validation); - } - ); + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); - // Ensure neighbors are unaffected - test_neighbors(overseer, 1).await; + // Ensure neighbors are unaffected + test_neighbors(overseer, 1).await; - virtual_overseer - }); + virtual_overseer + }, + ); } +// Test we notify peer about learning of the authority ID at session boundary #[test] -fn disconnect_when_not_in_past_present_future() { - sp_tracing::try_init_simple(); +fn issues_update_authorities_after_session() { let hash = Hash::repeat_byte(0xAA); - test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); + + let mut authorities = 
PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + let unknown_at_session = authorities.split_off(authorities.len() - 20); + let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities); + + test_harness( + make_subsystem_with_authority_discovery(authority_discovery_mock.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // 1. Initialize with the first leaf in the session. + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, authority_discovery_mock.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + test_neighbors(overseer, 1).await; + + // 2. Connect all authorities that are known so far. 
+ let known_authorities = authority_discovery_mock.authorities(); + for (peer_id, _id) in known_authorities.iter() { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer + .send(FromOrchestra::Communication { msg }) + .timeout(Duration::from_secs(4)) + .await + .expect("msg send timeout"); } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - let mut heute_leider_nicht = make_session_info(); - heute_leider_nicht.discovery_keys = AUTHORITIES_WITHOUT_US.clone(); - tx.send(Ok(Some(heute_leider_nicht))).unwrap(); + // 3. Send a new leaf and check UpdateAuthority is emitted for all known connected + // peers. + let hash = Hash::repeat_byte(0xBB); + overseer_signal_active_leaves(overseer, hash).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + for _ in 0..known_authorities.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap_or_default(), 
authority_ids); + } + ); } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(AUTHORITIES_WITHOUT_US.clone())).unwrap(); + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + + // 4. Connect a few more authorities. + let newly_added = authority_discovery_mock.add_more_authorties(unknown_at_session); + for (peer_id, _) in newly_added.iter() { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer + .send(FromOrchestra::Communication { msg }) + .timeout(Duration::from_secs(4)) + .await + .expect("msg send timeout"); } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - assert!(validator_addrs.is_empty()); - assert_eq!(peer_set, PeerSet::Validation); + // 5. Send a new leaf and check UpdateAuthority is emitted only for newly connected + // peers. 
+ let hash = Hash::repeat_byte(0xCC); + overseer_signal_active_leaves(overseer, hash).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + } + ); + + for _ in 0..newly_added.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(newly_added.get(&peer_id).cloned().unwrap_or_default(), authority_ids); + } + ); } - ); + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + virtual_overseer + }, + ); +} - virtual_overseer - }); +#[test] +fn disconnect_when_not_in_past_present_future() { + sp_tracing::try_init_simple(); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + let hash = Hash::repeat_byte(0xAA); + test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + 
relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut heute_leider_nicht = make_session_info(); + heute_leider_nicht.discovery_keys = AUTHORITIES_WITHOUT_US.clone(); + tx.send(Ok(Some(heute_leider_nicht))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(AUTHORITIES_WITHOUT_US.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + assert!(validator_addrs.is_empty()); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + virtual_overseer + }, + ); } #[test] @@ -579,13 +836,15 @@ fn test_log_output() { #[test] fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { let hash = Hash::repeat_byte(0xAA); - let mut state = make_subsystem(); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + let state = make_subsystem_with_authority_discovery(mock_authority_discovery.clone()); // There will be two lookup failures: let alice = Sr25519Keyring::Alice.public().into(); let bob = Sr25519Keyring::Bob.public().into(); - let alice_addr = state.authority_discovery.addrs.remove(&alice); - state.authority_discovery.addrs.remove(&bob); - + let alice_addr = state.authority_discovery.addrs.lock().remove(&alice); + state.authority_discovery.addrs.lock().remove(&bob); + let mock_authority_discovery_clone = mock_authority_discovery.clone(); let mut state = { let alice = alice.clone(); let bob = bob.clone(); @@ -633,7 +892,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = 
get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery_clone.clone()).await; expected.remove(&alice); expected.remove(&bob); let expected: HashSet = expected.into_values().flat_map(|v| v.into_iter()).collect(); @@ -652,7 +911,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert!(state.last_failure.is_some()); state.last_failure = state.last_failure.and_then(|i| i.checked_sub(BACKOFF_DURATION)); // One error less: - state.authority_discovery.addrs.insert(alice, alice_addr.unwrap()); + state.authority_discovery.addrs.lock().insert(alice, alice_addr.unwrap()); let hash = Hash::repeat_byte(0xBB); let state = test_harness(state, |mut virtual_overseer| async move { @@ -698,7 +957,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery.clone()).await; expected.remove(&bob); let expected: HashSet = expected.into_values().flat_map(|v| v.into_iter()).collect(); assert_eq!(validator_addrs.into_iter().flat_map(|v| v.into_iter()).collect::>(), expected); From b473dd83717b54fd6244ab9eefb1d9054e4ac094 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Thu, 8 Feb 2024 17:03:13 +0200 Subject: [PATCH 4/8] Fixup comment Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/gossip-support/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index 412dd77b96f9..eaf27690e759 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -548,7 +548,7 @@ fn issues_connection_request_to_past_present_future() { ); } -// Test we notify peer about 
learning of the authority ID at session boundary +// Test we notify peer about learning of the authority ID after session boundary #[test] fn issues_update_authorities_after_session() { let hash = Hash::repeat_byte(0xAA); From 6fe3f8ccb4558b35fe458a955d62d4311dd45657 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Fri, 9 Feb 2024 09:46:38 +0200 Subject: [PATCH 5/8] Remove unneeded fix Signed-off-by: Alexandru Gheorghe --- .../node/network/gossip-support/src/lib.rs | 22 ---- .../node/network/gossip-support/src/tests.rs | 115 ++++++++++++++---- 2 files changed, 93 insertions(+), 44 deletions(-) diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index f60d9e906b44..6a1c34f71375 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -274,28 +274,6 @@ where // authority_discovery is just a cache so let's try every time we try to re-connect // if new authorities are present. self.update_authority_ids(sender, session_info.discovery_keys).await; - } else if let Some(session_index) = self - .last_session_index - .filter(|last_session_index| *last_session_index == current_index) - { - // authority_discovery is just a cache so let's try every leaf from the current - // session if there are new authorities detected and inform the needed subsystems. 
- let session_info = - util::request_session_info(leaf, session_index, sender).await.await??; - - let session_info = match session_info { - Some(s) => s, - None => { - gum::warn!( - relay_parent = ?leaf, - session_index = self.last_session_index, - "Failed to get session info.", - ); - - continue - }, - }; - self.update_authority_ids(sender, session_info.discovery_keys).await; } } Ok(()) diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index eaf27690e759..2803fd08749f 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -402,17 +402,7 @@ fn issues_a_connection_request_on_new_session() { tx.send(Ok(1)).unwrap(); } ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); - } - ); + virtual_overseer }); @@ -548,13 +538,14 @@ fn issues_connection_request_to_past_present_future() { ); } -// Test we notify peer about learning of the authority ID after session boundary +// Test we notify peer about learning of the authority ID after session boundary, when we couldn't +// connect to more than 1/3 of the authorities. 
#[test] fn issues_update_authorities_after_session() { let hash = Hash::repeat_byte(0xAA); let mut authorities = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); - let unknown_at_session = authorities.split_off(authorities.len() - 20); + let unknown_at_session = authorities.split_off(authorities.len() / 3 - 1); let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities); test_harness( @@ -582,7 +573,9 @@ fn issues_update_authorities_after_session() { )) => { assert_eq!(relay_parent, hash); assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); } ); @@ -617,7 +610,37 @@ fn issues_update_authorities_after_session() { ); // Ensure neighbors are unaffected - test_neighbors(overseer, 1).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::CurrentBabeEpoch(tx), + )) => { + let _ = tx.send(Ok(BabeEpoch { + epoch_index: 2 as _, + start_slot: 0.into(), + duration: 200, + authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)], + randomness: [0u8; 32], + config: BabeEpochConfiguration { + c: (1, 4), + allowed_slots: AllowedSlots::PrimarySlots, + }, + })).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { + session: _, + local_index: _, + canonical_shuffling: _, + shuffled_indices: _, + }) => { + + } + ); // 2. Connect all authorities that are known so far. let known_authorities = authority_discovery_mock.authorities(); @@ -636,8 +659,9 @@ fn issues_update_authorities_after_session() { .expect("msg send timeout"); } - // 3. Send a new leaf and check UpdateAuthority is emitted for all known connected - // peers. + Delay::new(BACKOFF_DURATION).await; + // 3. 
Send a new leaf after BACKOFF_DURATION and check UpdateAuthority is emitted for + // all known connected peers. let hash = Hash::repeat_byte(0xBB); overseer_signal_active_leaves(overseer, hash).await; @@ -667,6 +691,26 @@ fn issues_update_authorities_after_session() { } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs: _, + peer_set: _, + }) => { + } + ); + for _ in 0..known_authorities.len() { assert_matches!( overseer_recv(overseer).await, @@ -680,10 +724,14 @@ fn issues_update_authorities_after_session() { } assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); - - // 4. Connect a few more authorities. + // 4. Connect more authorities except one let newly_added = authority_discovery_mock.add_more_authorties(unknown_at_session); - for (peer_id, _) in newly_added.iter() { + let mut newly_added_iter = newly_added.iter(); + let unconnected_at_last_retry = newly_added_iter + .next() + .map(|(peer_id, authority_id)| (*peer_id, authority_id.clone())) + .unwrap(); + for (peer_id, _) in newly_added_iter { let msg = GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( *peer_id, @@ -698,9 +746,10 @@ fn issues_update_authorities_after_session() { .expect("msg send timeout"); } - // 5. Send a new leaf and check UpdateAuthority is emitted only for newly connected + // 5. Send a new leaf and check UpdateAuthority is emitted only for the newly connected // peers. 
let hash = Hash::repeat_byte(0xCC); + Delay::new(BACKOFF_DURATION).await; overseer_signal_active_leaves(overseer, hash).await; assert_matches!( @@ -728,17 +777,39 @@ fn issues_update_authorities_after_session() { } ); - for _ in 0..newly_added.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs: _, + peer_set: _, + }) => { + } + ); + + for _ in 1..newly_added.len() { assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids, }) => { + assert_ne!(peer_id, unconnected_at_last_retry.0); assert_eq!(newly_added.get(&peer_id).cloned().unwrap_or_default(), authority_ids); } ); } + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); virtual_overseer }, From e4f3ccf026bfb6b6df0268fc51531c83f4e63af3 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 12 Feb 2024 12:58:43 +0200 Subject: [PATCH 6/8] Fixup review comments Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/gossip-support/src/lib.rs | 4 ++++ polkadot/node/network/gossip-support/src/tests.rs | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 6a1c34f71375..4dfdd1f7208f 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -63,8 +63,12 @@ use metrics::Metrics; const LOG_TARGET: &str = "parachain::gossip-support"; // How much time should we wait to reissue a connection request // since the last authority discovery resolution failure. 
+#[cfg(not(test))] const BACKOFF_DURATION: Duration = Duration::from_secs(5); +#[cfg(test)] +const BACKOFF_DURATION: Duration = Duration::from_millis(500); + /// Duration after which we consider low connectivity a problem. /// /// Especially at startup low connectivity is expected (authority discovery cache needs to be diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index 2803fd08749f..da8e9ea9b65d 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -124,7 +124,7 @@ impl MockAuthorityDiscovery { } fn add_more_authorties( - &mut self, + &self, new_known: Vec, ) -> HashMap> { let authorities: HashMap<_, _> = @@ -654,7 +654,7 @@ fn issues_update_authorities_after_session() { )); overseer .send(FromOrchestra::Communication { msg }) - .timeout(Duration::from_secs(4)) + .timeout(TIMEOUT) .await .expect("msg send timeout"); } @@ -741,7 +741,7 @@ fn issues_update_authorities_after_session() { )); overseer .send(FromOrchestra::Communication { msg }) - .timeout(Duration::from_secs(4)) + .timeout(TIMEOUT) .await .expect("msg send timeout"); } From 3cc7fa769e30083349b3f837fabf6ec2ca945802 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 12 Feb 2024 13:03:08 +0200 Subject: [PATCH 7/8] More cleanups Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/gossip-support/src/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index da8e9ea9b65d..e15e079e413d 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -72,7 +72,6 @@ lazy_static! 
{ .chain(AUTHORITIES.clone()) .collect() }; - // static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); // [2 6] // [4 5] From ce8d19d426d8219a51fd2b44ca4844f19876f435 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 12 Feb 2024 13:47:53 +0200 Subject: [PATCH 8/8] Remove timeout Signed-off-by: Alexandru Gheorghe --- polkadot/node/network/gossip-support/src/tests.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index e15e079e413d..6817c85f98d8 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -651,11 +651,7 @@ fn issues_update_authorities_after_session() { ValidationVersion::V3.into(), None, )); - overseer - .send(FromOrchestra::Communication { msg }) - .timeout(TIMEOUT) - .await - .expect("msg send timeout"); + overseer.send(FromOrchestra::Communication { msg }).await } Delay::new(BACKOFF_DURATION).await; @@ -738,11 +734,7 @@ fn issues_update_authorities_after_session() { ValidationVersion::V3.into(), None, )); - overseer - .send(FromOrchestra::Communication { msg }) - .timeout(TIMEOUT) - .await - .expect("msg send timeout"); + overseer.send(FromOrchestra::Communication { msg }).await } // 5. Send a new leaf and check UpdateAuthority is emitted only for the newly connected