diff --git a/Cargo.lock b/Cargo.lock index 0d310fe5b89d..4f697218466d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7280,6 +7280,7 @@ dependencies = [ "prioritized-metered-channel", "sc-client-api", "sp-api", + "sp-consensus", "sp-core", "tikv-jemalloc-ctl", "tracing-gum", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 262eddeec61e..584b74ccc26a 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -19,6 +19,7 @@ orchestra = "0.0.4" gum = { package = "tracing-gum", path = "../gum" } lru = "0.9" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } async-trait = "0.1.57" tikv-jemalloc-ctl = "0.5.0" diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 31c38eea07e8..4fb64e43246e 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -30,7 +30,7 @@ use polkadot_overseer::{ self as overseer, dummy::dummy_overseer_builder, gen::{FromOrchestra, SpawnedSubsystem}, - HeadSupportsParachains, SubsystemError, + HeadSupportsParachains, MajorSyncOracle, SubsystemError, }; use polkadot_primitives::{CandidateReceipt, Hash}; @@ -153,13 +153,19 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); - let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None) - .unwrap() - .replace_candidate_validation(|_| Subsystem2) - .replace_candidate_backing(|orig| orig) - .replace_candidate_backing(|_orig| Subsystem1) - .build() - .unwrap(); + let (overseer, _handle) = dummy_overseer_builder( + spawner, + AlwaysSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_validation(|_| Subsystem2) + .replace_candidate_backing(|orig| orig) + .replace_candidate_backing(|_orig| Subsystem1) + .build() + .unwrap(); let overseer_fut = overseer.run().fuse(); let timer_stream = timer_stream; diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index cf72774826ea..217ccf360270 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -15,13 +15,13 @@ // along with Polkadot. If not, see . use crate::{ - prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MetricsTrait, - Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue, + prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MajorSyncOracle, + MetricsTrait, Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue, KNOWN_LEAVES_CACHE_SIZE, }; use lru::LruCache; use orchestra::{FromOrchestra, SpawnedSubsystem, Subsystem, SubsystemContext}; -use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*}; +use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*, BlockNumber, Hash}; // Generated dummy messages use crate::messages::*; @@ -63,6 +63,8 @@ pub fn dummy_overseer_builder( spawner: Spawner, supports_parachains: SupportsParachains, registry: Option<&Registry>, + sync_oracle: MajorSyncOracle, + initial_leaves: Vec<(Hash, BlockNumber)>, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -96,7 +98,14 @@ where SpawnGlue: orchestra::Spawner + 'static, SupportsParachains: HeadSupportsParachains, { - one_for_all_overseer_builder(spawner, supports_parachains, DummySubsystem, registry) + one_for_all_overseer_builder( + spawner, + supports_parachains, + DummySubsystem, + registry, + sync_oracle, + initial_leaves, + ) } /// Create an overseer with all subsystem being `Sub`. @@ -105,6 +114,8 @@ pub fn one_for_all_overseer_builder( supports_parachains: SupportsParachains, subsystem: Sub, registry: Option<&Registry>, + sync_oracle: MajorSyncOracle, + initial_leaves: Vec<(Hash, BlockNumber)>, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -190,9 +201,10 @@ where .span_per_active_leaf(Default::default()) .active_leaves(Default::default()) .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) - .leaves(Default::default()) + .leaves(initial_leaves) .spawner(SpawnGlue(spawner)) .metrics(metrics) + .sync_oracle(sync_oracle) .supports_parachains(supports_parachains); Ok(builder) } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 6b63235e12a1..dd441753a53e 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -83,6 +83,7 @@ use polkadot_node_subsystem_types::messages::{ DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, }; +use sp_consensus::SyncOracle; pub use polkadot_node_subsystem_types::{ errors::{SubsystemError, SubsystemResult}, @@ -330,6 +331,45 @@ pub async fn forward_events>(client: Arc

, mut hand } } +/// Represents the internal state of `MajorSyncOracle` +enum OracleState { + Syncing(Box), + Done, +} +/// Used to detect if the node is in initial major sync. +/// It's worth mentioning that this is a one way check. Once the initial full sync is complete +/// `MajorSyncOracle` will never return false. The reason is that the struct is meant to be used +/// only during initialization. +pub struct MajorSyncOracle { + state: OracleState, +} + +impl MajorSyncOracle { + /// Create `MajorSyncOracle` from `SyncOracle` + pub fn new(sync_oracle: Box) -> Self { + Self { state: OracleState::Syncing(sync_oracle) } + } + + /// Create dummy `MajorSyncOracle` which always returns true for `finished_syncing` + pub fn new_dummy() -> Self { + Self { state: OracleState::Done } + } + + /// Check if node is in major sync + pub fn is_syncing(&mut self) -> bool { + match &mut self.state { + OracleState::Syncing(sync_oracle) => + if sync_oracle.is_major_syncing() { + true + } else { + self.state = OracleState::Done; + false + }, + OracleState::Done => false, + } + } +} + /// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s. /// /// This returns the overseer along with an [`OverseerHandle`] which can @@ -629,6 +669,9 @@ pub struct Overseer { /// Various Prometheus metrics. pub metrics: OverseerMetrics, + + /// SyncOracle is used to detect when initial full node sync is complete + pub sync_oracle: MajorSyncOracle, } /// Spawn the metrics metronome task. @@ -725,14 +768,10 @@ where let metrics = self.metrics.clone(); spawn_metronome_metrics(&mut self, metrics)?; - // Notify about active leaves on startup before starting the loop + // Import the active leaves found in the database but don't broadcast ActiveLeaves for them for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); - if let Some((span, status)) = self.on_head_activated(&hash, None).await { - let update = - ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } + self.on_head_activated(&hash, None).await; } loop { @@ -746,12 +785,28 @@ where Event::Stop => { self.stop().await; return Ok(()); - } + }, + Event::BlockImported(block) if self.sync_oracle.is_syncing() => { + // If we are syncing - don't broadcast ActiveLeaves + self.block_imported(block).await; + }, Event::BlockImported(block) => { - self.block_imported(block).await?; - } + // Syncing is complete - broadcast ActiveLeaves + let update = self.block_imported(block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + }, + Event::BlockFinalized(block) if self.sync_oracle.is_syncing() => { + // Still syncing - just import the block and don't broadcast anything + self.block_finalized(&block).await; + }, Event::BlockFinalized(block) => { - self.block_finalized(block).await?; + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; } Event::ExternalRequest(request) => { self.handle_external_request(request); @@ -781,12 +836,12 @@ where } } - async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate { match self.active_leaves.entry(block.hash) { hash_map::Entry::Vacant(entry) => entry.insert(block.number), hash_map::Entry::Occupied(entry) => { debug_assert_eq!(*entry.get(), block.number); - return Ok(()) + return ActiveLeavesUpdate::default() }, }; @@ -808,13 +863,10 @@ where self.clean_up_external_listeners(); - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - Ok(()) + update } - async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { + async fn block_finalized(&mut self, block: &BlockInfo) -> ActiveLeavesUpdate { let mut update = ActiveLeavesUpdate::default(); self.active_leaves.retain(|h, n| { @@ -832,17 +884,7 @@ where self.on_head_deactivated(deactivated) } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)) - .await?; - - // If there are no leaves being deactivated, we don't need to send an update. - // - // Our peers will be informed about our finalized block the next time we activating/deactivating some leaf. - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - Ok(()) + update } /// Handles a header activation. If the header's state doesn't support the parachains API, @@ -861,7 +903,7 @@ where gum::trace!( target: LOG_TARGET, relay_parent = ?hash, - "Leaf got activated, notifying exterinal listeners" + "Leaf got activated, notifying external listeners" ); for listener in listeners { // it's fine if the listener is no longer interested diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index b9f813ff1dce..3f5edc14ac13 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -173,12 +173,18 @@ fn overseer_works() { let mut s1_rx = s1_rx.fuse(); let mut s2_rx = s2_rx.fuse(); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) - .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) + .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); @@ -233,12 +239,17 @@ fn overseer_metrics_work() { BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 }; let registry = prometheus::Registry::new(); - let (overseer, handle) = - dummy_overseer_builder(spawner, MockSupportsParachains, Some(®istry)) - .unwrap() - .leaves(block_info_to_pair(vec![first_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + Some(®istry), + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -296,11 +307,17 @@ fn overseer_ends_on_subsystem_exit() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (overseer, _handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_backing(|_| ReturnOnStart) - .build() - .unwrap(); + let (overseer, _handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_backing(|_| ReturnOnStart) + .build() + .unwrap(); overseer.run_inner().await.unwrap(); }) @@ -370,6 +387,20 @@ where } } +struct PuppetOracle { + pub state: std::sync::Arc>, +} + +impl SyncOracle for PuppetOracle { + fn is_major_syncing(&self) -> bool { + *self.state.lock().unwrap() + } + + fn is_offline(&self) -> bool { + false + } +} + // Tests that starting with a defined set of leaves and receiving // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. #[test] @@ -391,13 +422,19 @@ fn overseer_start_stop_works() { let (tx_5, mut rx_5) = metered::channel(64); let (tx_6, mut rx_6) = metered::channel(64); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -410,12 +447,6 @@ fn overseer_start_stop_works() { handle.block_imported(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: Some(ActivatedLeaf { hash: second_block_hash, @@ -490,13 +521,19 @@ fn overseer_finalize_works() { // start with two forks of different height. - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block, second_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block, second_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -509,18 +546,6 @@ fn overseer_finalize_works() { handle.block_finalized(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 2, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [first_block_hash, second_block_hash].as_ref().into(), ..Default::default() @@ -586,13 +611,19 @@ fn overseer_finalize_leaf_preserves_it() { // start with two forks at height 1. - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block.clone(), second_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block.clone(), second_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -606,18 +637,6 @@ fn overseer_finalize_leaf_preserves_it() { handle.block_finalized(first_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [second_block_hash].as_ref().into(), ..Default::default() @@ -676,11 +695,17 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { let (tx_5, mut rx_5) = metered::channel(64); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); let mut handle = Handle::new(handle); @@ -728,6 +753,197 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { }); } +#[test] +fn do_not_send_events_before_initial_major_sync_is_complete() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + // Two events which will be fired BEFORE initial full sync for this block + let imported_block_before_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; + + // And two events which will be fired AFTER initial full sync for this block + let imported_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; + + let (tx_5, mut rx_5) = metered::channel(64); + + // Prepare overseer future + let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + Vec::new(), + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); + + // Get overseer future + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + + // Generate 'block imported' and 'block finalized' before initial sync is complete - these must be ignored + handle.block_imported(imported_block_before_sync.clone()).await; + handle.block_finalized(imported_block_before_sync.clone()).await; + + // Run overseer for 500ms to check if any events are generated + loop { + select! { + _ = overseer_fut => { + assert!(false, "Overseer should not exit"); + }, + res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => { + assert_matches!(res, None); + break; + } + } + } + + // Switch the oracle state - initial sync is now completed + *is_syncing.lock().unwrap() = false; + + // Generate two more events - these must NOT be ignored + handle.block_finalized(imported_block_after_sync.clone()).await; + handle.block_imported(imported_block_after_sync.clone()).await; + + let expected_heartbeats = vec![ + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: imported_block_after_sync.hash, + number: imported_block_after_sync.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + })), + // The first leaf should be 'cleaned up' by the overseer + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work( + imported_block_before_sync.hash, + )), + OverseerSignal::BlockFinalized( + imported_block_after_sync.hash, + imported_block_after_sync.number, + ), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handle.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); +} + +#[test] +fn initial_leaves_are_sent_if_nothing_to_sync() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let db_leaves = BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 0 }; + + let (tx_5, mut rx_5) = metered::channel(64); + + // Prepare overseer future + let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(false)); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + vec![(db_leaves.hash, db_leaves.number)], + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); + + // Get overseer future + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + pin_mut!(overseer_fut); + + // Run overseer for 500ms to check if any events are generated + loop { + select! { + _ = overseer_fut => { + assert!(false, "Overseer should not exit"); + }, + res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => { + assert_matches!(res, None); + break; + } + } + } + + // Now complete sync and generate an event just to be sure everything works + let imported_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; + *is_syncing.lock().unwrap() = false; + + // Finalize one of the leaves from db and import one new + handle.block_finalized(db_leaves.clone()).await; + handle.block_imported(imported_block_after_sync.clone()).await; + + let mut ss5_results = Vec::new(); + + let expected_heartbeats = vec![ + OverseerSignal::BlockFinalized(db_leaves.hash, db_leaves.number), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: imported_block_after_sync.hash, + number: imported_block_after_sync.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + })), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handle.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); +} + #[derive(Clone)] struct CounterSubsystem { stop_signals_received: Arc, @@ -940,11 +1156,17 @@ fn overseer_all_subsystems_receive_signals_and_messages() { msgs_received.clone(), ); - let (overseer, handle) = - one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None) - .unwrap() - .build() - .unwrap(); + let (overseer, handle) = one_for_all_overseer_builder( + spawner, + MockSupportsParachains, + subsystem, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 344f31390fc1..fdb856346cf7 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -49,7 +49,7 @@ use { polkadot_node_network_protocol::{ peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, }, - polkadot_overseer::BlockInfo, + polkadot_overseer::{BlockInfo, MajorSyncOracle}, sc_client_api::BlockBackend, sp_core::traits::SpawnNamed, sp_trie::PrefixedMemoryDB, @@ -1091,6 +1091,7 @@ where overseer_message_channel_capacity_override, req_protocol_names, peerset_protocol_names, + sync_oracle: MajorSyncOracle::new(Box::new(network.clone())), }, ) .map_err(|e| { diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 7dff86693827..10a68fa0caea 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -34,8 +34,8 @@ pub use polkadot_overseer::{ HeadSupportsParachains, }; use polkadot_overseer::{ - metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait, - Overseer, OverseerConnector, OverseerHandle, SpawnGlue, + metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MajorSyncOracle, + MetricsTrait, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, }; use polkadot_primitives::runtime_api::ParachainHost; @@ -125,6 +125,8 @@ where pub req_protocol_names: ReqProtocolNames, /// [`PeerSet`] protocol names to protocols mapping. pub peerset_protocol_names: PeerSetProtocolNames, + /// SyncOracle is used to detect when initial full node sync is complete + pub sync_oracle: MajorSyncOracle, } /// Obtain a prepared `OverseerBuilder`, that is initialized @@ -155,6 +157,7 @@ pub fn prepared_overseer_builder( overseer_message_channel_capacity_override, req_protocol_names, peerset_protocol_names, + sync_oracle, }: OverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -319,6 +322,7 @@ where .supports_parachains(runtime_client) .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) .metrics(metrics) + .sync_oracle(sync_oracle) .spawner(spawner); if let Some(capacity) = overseer_message_channel_capacity_override { diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 871f1c5b6d63..4a30ecd7ad0e 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -435,7 +435,7 @@ impl Future for Yield { mod tests { use super::*; use futures::executor::block_on; - use polkadot_node_subsystem::messages::CollatorProtocolMessage; + use polkadot_node_subsystem::{messages::CollatorProtocolMessage, MajorSyncOracle}; use polkadot_overseer::{dummy::dummy_overseer_builder, Handle, HeadSupportsParachains}; use polkadot_primitives::Hash; use sp_core::traits::SpawnNamed; @@ -453,13 +453,18 @@ mod tests { fn forward_subsystem_works() { let spawner = sp_core::testing::TaskExecutor::new(); let (tx, rx) = mpsc::channel(2); - let (overseer, handle) = - dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None) - .unwrap() - .replace_collator_protocol(|_| ForwardSubsystem(tx)) - .leaves(vec![]) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner.clone(), + AlwaysSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_collator_protocol(|_| ForwardSubsystem(tx)) + .leaves(vec![]) + .build() + .unwrap(); let mut handle = Handle::new(handle);