From ae29bb4d7d0d053361a4413e73746cbdca8c81e8 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 9 Feb 2023 16:55:26 +0200 Subject: [PATCH 01/19] Add sync oracle to overseer --- Cargo.lock | 1 + node/overseer/Cargo.toml | 1 + node/overseer/src/dummy.rs | 5 +++-- node/overseer/src/lib.rs | 36 ++++++++++++++++++++++++++++++++++++ node/service/src/lib.rs | 3 ++- node/service/src/overseer.rs | 8 ++++++-- 6 files changed, 49 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d310fe5b89d..4f697218466d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7280,6 +7280,7 @@ dependencies = [ "prioritized-metered-channel", "sc-client-api", "sp-api", + "sp-consensus", "sp-core", "tikv-jemalloc-ctl", "tracing-gum", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 262eddeec61e..584b74ccc26a 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -19,6 +19,7 @@ orchestra = "0.0.4" gum = { package = "tracing-gum", path = "../gum" } lru = "0.9" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } async-trait = "0.1.57" tikv-jemalloc-ctl = "0.5.0" diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index cf72774826ea..b19787d9bec6 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -15,8 +15,8 @@ // along with Polkadot. If not, see . 
use crate::{ - prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MetricsTrait, - Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue, + prometheus::Registry, HeadSupportsParachains, InitializedOverseerBuilder, MajorSyncOracle, + MetricsTrait, Overseer, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnGlue, KNOWN_LEAVES_CACHE_SIZE, }; use lru::LruCache; @@ -193,6 +193,7 @@ where .leaves(Default::default()) .spawner(SpawnGlue(spawner)) .metrics(metrics) + .sync_oracle(MajorSyncOracle::new_dummy()) .supports_parachains(supports_parachains); Ok(builder) } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 6b63235e12a1..97ea673713e8 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -83,6 +83,7 @@ use polkadot_node_subsystem_types::messages::{ DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, }; +use sp_consensus::SyncOracle; pub use polkadot_node_subsystem_types::{ errors::{SubsystemError, SubsystemResult}, @@ -330,6 +331,38 @@ pub async fn forward_events>(client: Arc

, mut hand } } +/// Used to detect if the node is in major sync. This can happen only on startup so once syncing is +/// done the node is considered up to date and `finished_syncing` will always return `true`. +pub struct MajorSyncOracle { + sync_oracle: Option>, +} + +impl MajorSyncOracle { + /// Create `MajorSyncOracle` from `SyncOracle` + pub fn new(sync_oracle: Box) -> Self { + Self { sync_oracle: Some(sync_oracle) } + } + + /// Create dummy `MajorSyncOracle` which always returns true for `finished_syncing` + pub fn new_dummy() -> Self { + Self { sync_oracle: None } + } + + /// Check if the initial major sync has finished + pub fn finished_syncing(&mut self) -> bool { + match &mut self.sync_oracle { + Some(sync_oracle) => + if !sync_oracle.is_major_syncing() { + self.sync_oracle = None; + true + } else { + false + }, + None => true, + } + } +} + /// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s. /// /// This returns the overseer along with an [`OverseerHandle`] which can @@ -629,6 +662,9 @@ pub struct Overseer { /// Various Prometheus metrics. pub metrics: OverseerMetrics, + + /// SyncOracle is used to detect when initial full node sync is complete + pub sync_oracle: MajorSyncOracle, } /// Spawn the metrics metronome task. 
diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 344f31390fc1..fdb856346cf7 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -49,7 +49,7 @@ use { polkadot_node_network_protocol::{ peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, }, - polkadot_overseer::BlockInfo, + polkadot_overseer::{BlockInfo, MajorSyncOracle}, sc_client_api::BlockBackend, sp_core::traits::SpawnNamed, sp_trie::PrefixedMemoryDB, @@ -1091,6 +1091,7 @@ where overseer_message_channel_capacity_override, req_protocol_names, peerset_protocol_names, + sync_oracle: MajorSyncOracle::new(Box::new(network.clone())), }, ) .map_err(|e| { diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 7dff86693827..10a68fa0caea 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -34,8 +34,8 @@ pub use polkadot_overseer::{ HeadSupportsParachains, }; use polkadot_overseer::{ - metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait, - Overseer, OverseerConnector, OverseerHandle, SpawnGlue, + metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MajorSyncOracle, + MetricsTrait, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, }; use polkadot_primitives::runtime_api::ParachainHost; @@ -125,6 +125,8 @@ where pub req_protocol_names: ReqProtocolNames, /// [`PeerSet`] protocol names to protocols mapping. 
pub peerset_protocol_names: PeerSetProtocolNames, + /// SyncOracle is used to detect when initial full node sync is complete + pub sync_oracle: MajorSyncOracle, } /// Obtain a prepared `OverseerBuilder`, that is initialized @@ -155,6 +157,7 @@ pub fn prepared_overseer_builder( overseer_message_channel_capacity_override, req_protocol_names, peerset_protocol_names, + sync_oracle, }: OverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -319,6 +322,7 @@ where .supports_parachains(runtime_client) .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) .metrics(metrics) + .sync_oracle(sync_oracle) .spawner(spawner); if let Some(capacity) = overseer_message_channel_capacity_override { From 70d031dd8897c05a243e4ad9fae771547544eb46 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 10 Feb 2023 13:40:06 +0200 Subject: [PATCH 02/19] Don't send `ActiveLeavesUpdate` and `BlockFinalized` until full sync Overseer won't generate any events to the subsystems until initial full sync is complete. --- node/overseer/src/lib.rs | 170 ++++++++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 57 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 97ea673713e8..03c2717fabd2 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -761,68 +761,137 @@ where let metrics = self.metrics.clone(); spawn_metronome_metrics(&mut self, metrics)?; + let initial_sync_finished = self.sync_oracle.finished_syncing(); // Notify about active leaves on startup before starting the loop for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); if let Some((span, status)) = self.on_head_activated(&hash, None).await { - let update = - ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + if initial_sync_finished { + // Initial sync is complete. 
Notify the subsystems and proceed to the main loop + let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash, + number, + status, + span, + }); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } } } + // wait for initial sync + if !initial_sync_finished { + loop { + select! { + msg = self.events_rx.select_next_some() => { + match msg { + Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, + Event::Stop => return self.handle_stop().await, + Event::BlockImported(block) => _ = self.block_imported(block).await, + Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { + self.block_finalized(&block).await; + // send initial active leaves update + for (hash, number) in self.active_leaves.clone().into_iter() { + let span = match self.span_per_active_leaf.get(&hash) { + Some(span) => span.clone(), + None => { + // thus should never happen + gum::error!(target: LOG_TARGET, ?hash, ?number, "Span for active leaf not found. This is not expected"); + let span = Arc::new(jaeger::Span::new(hash.clone(), "leaf-activated")); + self.span_per_active_leaf.insert(hash, span.clone()); + span + } + }; + + let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash, + number, + status: LeafStatus::Fresh, + span, + }); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + break + }, + Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, + Event::ExternalRequest(request) => self.handle_external_request(request) + } + }, + msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), + res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await + } + } + } + + // main loop loop { select! 
{ msg = self.events_rx.select_next_some() => { match msg { - Event::MsgToSubsystem { msg, origin } => { - self.route_message(msg.into(), origin).await?; - self.metrics.on_message_relayed(); - } - Event::Stop => { - self.stop().await; - return Ok(()); - } + Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, + Event::Stop => return self.handle_stop().await, Event::BlockImported(block) => { - self.block_imported(block).await?; - } - Event::BlockFinalized(block) => { - self.block_finalized(block).await?; - } - Event::ExternalRequest(request) => { - self.handle_external_request(request); - } - } - }, - msg = self.to_orchestra_rx.select_next_some() => { - match msg { - ToOrchestra::SpawnJob { name, subsystem, s } => { - self.spawn_job(name, subsystem, s); - } - ToOrchestra::SpawnBlockingJob { name, subsystem, s } => { - self.spawn_blocking_job(name, subsystem, s); - } + let update = self.block_imported(block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + }, + Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, + Event::ExternalRequest(request) => self.handle_external_request(request) } }, - res = self.running_subsystems.select_next_some() => { - gum::error!( - target: LOG_TARGET, - subsystem = ?res, - "subsystem finished unexpectedly", - ); - self.stop().await; - return res; - }, + msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), + res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await } } } - async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + async fn handle_stop(self) -> Result<(), SubsystemError> { + self.stop().await; + return Ok(()) + } + + fn handle_orchestra_rx(&mut self, msg: ToOrchestra) { + match msg { + ToOrchestra::SpawnJob { name, subsystem, s } => { + self.spawn_job(name, subsystem, s); + }, + ToOrchestra::SpawnBlockingJob { name, 
subsystem, s } => { + self.spawn_blocking_job(name, subsystem, s); + }, + } + } + + async fn handle_msg_to_subsystem( + &mut self, + msg: AllMessages, + origin: &'static str, + ) -> Result<(), SubsystemError> { + self.route_message(msg.into(), origin).await?; + self.metrics.on_message_relayed(); + Ok(()) + } + + async fn handle_running_subsystems( + self, + res: Result<(), SubsystemError>, + ) -> Result<(), SubsystemError> { + gum::error!( + target: LOG_TARGET, + subsystem = ?res, + "subsystem finished unexpectedly", + ); + self.stop().await; + return res + } + + async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate { match self.active_leaves.entry(block.hash) { hash_map::Entry::Vacant(entry) => entry.insert(block.number), hash_map::Entry::Occupied(entry) => { debug_assert_eq!(*entry.get(), block.number); - return Ok(()) + return ActiveLeavesUpdate::default() }, }; @@ -844,13 +913,10 @@ where self.clean_up_external_listeners(); - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - Ok(()) + update } - async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { + async fn block_finalized(&mut self, block: &BlockInfo) -> ActiveLeavesUpdate { let mut update = ActiveLeavesUpdate::default(); self.active_leaves.retain(|h, n| { @@ -868,17 +934,7 @@ where self.on_head_deactivated(deactivated) } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)) - .await?; - - // If there are no leaves being deactivated, we don't need to send an update. - // - // Our peers will be informed about our finalized block the next time we activating/deactivating some leaf. - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - Ok(()) + update } /// Handles a header activation. 
If the header's state doesn't support the parachains API, @@ -897,7 +953,7 @@ where gum::trace!( target: LOG_TARGET, relay_parent = ?hash, - "Leaf got activated, notifying exterinal listeners" + "Leaf got activated, notifying external listeners" ); for listener in listeners { // it's fine if the listener is no longer interested From 349c89f86402615584f7ef5379325b42c90c6451 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 10 Feb 2023 13:54:01 +0200 Subject: [PATCH 03/19] Comments and indentation --- node/overseer/src/lib.rs | 114 +++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 03c2717fabd2..02b294b3cd0d 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -331,8 +331,10 @@ pub async fn forward_events>(client: Arc

, mut hand } } -/// Used to detect if the node is in major sync. This can happen only on startup so once syncing is -/// done the node is considered up to date and `finished_syncing` will always return `true`. +/// Used to detect if the node is in initial major sync. +/// It's worth mentioning that this is a one way check. Once the initial full sync is complete +/// `MajorSyncOracle` will never return false. The reason is that the struct is meant to be used +/// only during initialization. pub struct MajorSyncOracle { sync_oracle: Option>, } @@ -762,7 +764,7 @@ where spawn_metronome_metrics(&mut self, metrics)?; let initial_sync_finished = self.sync_oracle.finished_syncing(); - // Notify about active leaves on startup before starting the loop + // Import the active leaves found in the database for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); if let Some((span, status)) = self.on_head_activated(&hash, None).await { @@ -779,48 +781,70 @@ where } } - // wait for initial sync - if !initial_sync_finished { - loop { - select! { - msg = self.events_rx.select_next_some() => { - match msg { - Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, - Event::Stop => return self.handle_stop().await, - Event::BlockImported(block) => _ = self.block_imported(block).await, - Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { - self.block_finalized(&block).await; - // send initial active leaves update - for (hash, number) in self.active_leaves.clone().into_iter() { - let span = match self.span_per_active_leaf.get(&hash) { - Some(span) => span.clone(), - None => { - // thus should never happen - gum::error!(target: LOG_TARGET, ?hash, ?number, "Span for active leaf not found. 
This is not expected"); - let span = Arc::new(jaeger::Span::new(hash.clone(), "leaf-activated")); - self.span_per_active_leaf.insert(hash, span.clone()); - span - } - }; - - let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash, - number, - status: LeafStatus::Fresh, - span, - }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - break - }, - Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, - Event::ExternalRequest(request) => self.handle_external_request(request) - } - }, - msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), - res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await - } + // If initial sync is not complete - wait for it. + // This loop is identical to the main one but doesn't generate `ActiveLeaves` and `BlockFinalized` events until + // the initial full sync is complete. + // This is an infinite loop which executes only when `!initial_sync_finished`. The weird syntax is only to save + // one extra layer of indentation because it's already a bit tough for the eyes. + // Think about it like: + // ``` + // if !initial_sync_finished { + // loop { + // select! { + // ... + // } + // } + // } + //``` + while !initial_sync_finished { + select! 
{ + msg = self.events_rx.select_next_some() => { + match msg { + Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, + Event::Stop => return self.handle_stop().await, + Event::BlockImported(block) => _ = self.block_imported(block).await, + Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { + // Initial sync is complete + self.block_finalized(&block).await; + // Send initial `ActiveLeaves` + for (hash, number) in self.active_leaves.clone().into_iter() { + let span = match self.span_per_active_leaf.get(&hash) { + Some(span) => span.clone(), + None => { + // This should never happen. Spans are generated in `on_head_activated` + // which is called from `block_imported`. Despite not sending a signal + // `BlockImported` events are handled so a span should exist for each + // active leaf. + gum::error!( + target: LOG_TARGET, + ?hash, + ?number, + "Span for active leaf not found. This is not expected" + ); + let span = Arc::new(jaeger::Span::new(hash.clone(), "leaf-activated")); + self.span_per_active_leaf.insert(hash, span.clone()); + span + } + }; + + let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash, + number, + status: LeafStatus::Fresh, + span, + }); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + // Send initial `BlockFinalized` + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + break + }, + Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, + Event::ExternalRequest(request) => self.handle_external_request(request) + } + }, + msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), + res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await } } From 5bae5707d13772348917e2acec628e6c79696f79 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 10 Feb 2023 14:21:05 +0200 Subject: [PATCH 04/19] Remove unnecessary `clone()` --- 
node/overseer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 02b294b3cd0d..fba95b7589e9 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -821,7 +821,7 @@ where ?number, "Span for active leaf not found. This is not expected" ); - let span = Arc::new(jaeger::Span::new(hash.clone(), "leaf-activated")); + let span = Arc::new(jaeger::Span::new(hash, "leaf-activated")); self.span_per_active_leaf.insert(hash, span.clone()); span } From fe09d1278f78114a317f2bf3a09ba829650030e0 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 10 Feb 2023 17:57:42 +0200 Subject: [PATCH 05/19] Fix `BlockFinalized` handling in main loop --- node/overseer/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index fba95b7589e9..a7da79ec2389 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -861,7 +861,13 @@ where self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } }, - Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, + Event::BlockFinalized(block) => { + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + }, Event::ExternalRequest(request) => self.handle_external_request(request) } }, From 4c4a4dbd0d3a3673af8c4cd51bcb7484ad70333e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 13 Feb 2023 11:12:15 +0200 Subject: [PATCH 06/19] Pass `SyncOracle` as a parameter to `dummy_overseer_builder` --- node/overseer/examples/minimal-example.rs | 21 ++-- node/overseer/src/dummy.rs | 12 +- node/overseer/src/tests.rs | 135 ++++++++++++++-------- 3 files changed, 110 insertions(+), 58 deletions(-) diff --git 
a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 31c38eea07e8..aa7fe206b226 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -30,7 +30,7 @@ use polkadot_overseer::{ self as overseer, dummy::dummy_overseer_builder, gen::{FromOrchestra, SpawnedSubsystem}, - HeadSupportsParachains, SubsystemError, + HeadSupportsParachains, MajorSyncOracle, SubsystemError, }; use polkadot_primitives::{CandidateReceipt, Hash}; @@ -153,13 +153,18 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); - let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None) - .unwrap() - .replace_candidate_validation(|_| Subsystem2) - .replace_candidate_backing(|orig| orig) - .replace_candidate_backing(|_orig| Subsystem1) - .build() - .unwrap(); + let (overseer, _handle) = dummy_overseer_builder( + spawner, + AlwaysSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_validation(|_| Subsystem2) + .replace_candidate_backing(|orig| orig) + .replace_candidate_backing(|_orig| Subsystem1) + .build() + .unwrap(); let overseer_fut = overseer.run().fuse(); let timer_stream = timer_stream; diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index b19787d9bec6..31a9467e466a 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -63,6 +63,7 @@ pub fn dummy_overseer_builder( spawner: Spawner, supports_parachains: SupportsParachains, registry: Option<&Registry>, + sync_oracle: MajorSyncOracle, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -96,7 +97,13 @@ where SpawnGlue: orchestra::Spawner + 'static, SupportsParachains: HeadSupportsParachains, { - one_for_all_overseer_builder(spawner, supports_parachains, DummySubsystem, registry) + one_for_all_overseer_builder( + spawner, + supports_parachains, + DummySubsystem, + registry, + sync_oracle, + ) } /// Create an overseer with all subsystem 
being `Sub`. @@ -105,6 +112,7 @@ pub fn one_for_all_overseer_builder( supports_parachains: SupportsParachains, subsystem: Sub, registry: Option<&Registry>, + sync_oracle: MajorSyncOracle, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -193,7 +201,7 @@ where .leaves(Default::default()) .spawner(SpawnGlue(spawner)) .metrics(metrics) - .sync_oracle(MajorSyncOracle::new_dummy()) + .sync_oracle(sync_oracle) .supports_parachains(supports_parachains); Ok(builder) } diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index b9f813ff1dce..056417641419 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -173,12 +173,17 @@ fn overseer_works() { let mut s1_rx = s1_rx.fuse(); let mut s2_rx = s2_rx.fuse(); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) - .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) + .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); @@ -233,12 +238,16 @@ fn overseer_metrics_work() { BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 }; let registry = prometheus::Registry::new(); - let (overseer, handle) = - dummy_overseer_builder(spawner, MockSupportsParachains, Some(&registry)) - .unwrap() - .leaves(block_info_to_pair(vec![first_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + Some(&registry), + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); let mut handle = 
Handle::new(handle); let 
overseer_fut = overseer.run_inner().fuse(); @@ -296,11 +305,16 @@ fn overseer_ends_on_subsystem_exit() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (overseer, _handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_backing(|_| ReturnOnStart) - .build() - .unwrap(); + let (overseer, _handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_backing(|_| ReturnOnStart) + .build() + .unwrap(); overseer.run_inner().await.unwrap(); }) @@ -391,13 +405,18 @@ fn overseer_start_stop_works() { let (tx_5, mut rx_5) = metered::channel(64); let (tx_6, mut rx_6) = metered::channel(64); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -490,13 +509,18 @@ fn overseer_finalize_works() { // start with two forks of different height. 
- let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block, second_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block, second_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -586,13 +610,18 @@ fn overseer_finalize_leaf_preserves_it() { // start with two forks at height 1. - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) - .leaves(block_info_to_pair(vec![first_block.clone(), second_block])) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block.clone(), second_block])) + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); @@ -676,11 +705,16 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { let (tx_5, mut rx_5) = metered::channel(64); - let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) - .unwrap() - .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner, + 
MockSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); let mut handle = Handle::new(handle); @@ -940,11 +974,16 @@ fn overseer_all_subsystems_receive_signals_and_messages() { msgs_received.clone(), ); - let (overseer, handle) = - one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None) - .unwrap() - .build() - .unwrap(); + let (overseer, handle) = one_for_all_overseer_builder( + spawner, + MockSupportsParachains, + subsystem, + None, + MajorSyncOracle::new_dummy(), + ) + .unwrap() + .build() + .unwrap(); let mut handle = Handle::new(handle); let overseer_fut = overseer.run_inner().fuse(); From cfcd09855c2f05ebc1a9b2f32cd049ce3b465435 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 13 Feb 2023 21:50:06 +0200 Subject: [PATCH 07/19] Some tests --- node/overseer/src/tests.rs | 186 +++++++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 056417641419..b363b9c50ae6 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -762,6 +762,178 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { }); } +#[test] +fn do_not_send_events_before_initial_major_sync_is_complete() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + // Two events which will be fired BEFORE initial full sync + let imported_block_before_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; + + // And two events which will be fired AFTER initial full sync + let imported_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; + + let (tx_5, mut rx_5) = metered::channel(64); + + // Prepare overseer future + let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); + let (overseer, handle) = dummy_overseer_builder( + spawner, + 
MockSupportsParachains, + None, + MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); + + // Get overseer future + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + + // Generate 'block imported' and 'block finalized' before initial sync is complete - these must be ignored + handle.block_imported(imported_block_before_sync.clone()).await; + handle.block_finalized(imported_block_before_sync.clone()).await; + + loop { + select! { + _ = overseer_fut => { + assert!(false, "Overseer should not exit"); + }, + res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => { + assert_matches!(res, None); + break; + } + } + } + + // Switch the oracle state - initial sync is completed + *is_syncing.lock().unwrap() = false; + + // Generate two more events - these must not be ignored + handle.block_finalized(imported_block_after_sync.clone()).await; + handle.block_imported(imported_block_after_sync.clone()).await; + + let expected_heartbeats = vec![ + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: imported_block_after_sync.hash, + number: imported_block_after_sync.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + })), + OverseerSignal::BlockFinalized( + imported_block_after_sync.hash, + imported_block_after_sync.number, + ), + ]; + + loop { + select! 
{ + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handle.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); +} + +#[test] +fn active_leaves_is_sent_on_stalled_finality() { + let spawner = sp_core::testing::TaskExecutor::new(); + + let (tx_5, mut rx_5) = metered::channel(64); + let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); + + // Prepare overseer + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); + + // Prepare overseer future + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + pin_mut!(overseer_fut); + + let test = async move { + // Simulate one block received via sync + let finalized_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; + + let mut ss5_results = Vec::new(); + + // Switch the oracle state - initial sync is completed + *is_syncing.lock().unwrap() = false; + + // Generate 'block imported' event which simulates end of sync + handle.block_finalized(finalized_block_after_sync.clone()).await; + + let expected_heartbeats = vec![ + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: finalized_block_after_sync.hash, + number: finalized_block_after_sync.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + })), + OverseerSignal::BlockFinalized( + finalized_block_after_sync.hash, + finalized_block_after_sync.number, + ), + ]; + + loop { + let res = 
rx_5.next().timeout(Duration::from_millis(100)).await; + assert_matches!(res, Some(res) => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + }); + + if ss5_results.len() == expected_heartbeats.len() { + handle.stop().await; + break + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }; + + let (res, _) = executor::block_on(futures::future::join(overseer_fut, test)); + assert_matches!(res, Ok(())); +} + #[derive(Clone)] struct CounterSubsystem { stop_signals_received: Arc, @@ -955,6 +1127,20 @@ fn test_chain_selection_msg() -> ChainSelectionMessage { ChainSelectionMessage::Approved(Default::default()) } +struct PuppetOracle { + pub state: std::sync::Arc>, +} + +impl SyncOracle for PuppetOracle { + fn is_major_syncing(&self) -> bool { + *self.state.lock().unwrap() + } + + fn is_offline(&self) -> bool { + false + } +} + // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. 
#[test] fn overseer_all_subsystems_receive_signals_and_messages() { From 27b0cf53f1b5de5116c851cce8c90c44cc28d7c6 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 13 Feb 2023 21:50:20 +0200 Subject: [PATCH 08/19] Fix initial sync loop --- node/overseer/src/lib.rs | 72 ++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index a7da79ec2389..89ca82396552 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -804,40 +804,46 @@ where Event::Stop => return self.handle_stop().await, Event::BlockImported(block) => _ = self.block_imported(block).await, Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { + // Send initial `ActiveLeaves` + let update = self.block_imported(block.clone()).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } else { + // In theory we can receive `BlockImported` during initial major sync. In this case the + // update will be empty. + let span = match self.span_per_active_leaf.get(&block.hash) { + Some(span) => span.clone(), + None => { + // This should never happen. + gum::warn!( + target: LOG_TARGET, + ?block.hash, + ?block.number, + "Span for active leaf not found. This is not expected" + ); + let span = Arc::new(jaeger::Span::new(block.hash, "leaf-activated")); + span + } + }; + let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: block.hash, + number: block.number, + status: LeafStatus::Fresh, + span, + }); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + + // Initial sync is complete - self.block_finalized(&block).await; - // Send initial `ActiveLeaves` - for (hash, number) in self.active_leaves.clone().into_iter() { - let span = match self.span_per_active_leaf.get(&hash) { - Some(span) => span.clone(), - None => { - // This should never happen. 
Spans are generated in `on_head_activated` - // which is called from `block_imported`. Despite not sending a signal - // `BlockImported` events are handled so a span should exist for each - // active leaf. - gum::error!( - target: LOG_TARGET, - ?hash, - ?number, - "Span for active leaf not found. This is not expected" - ); - let span = Arc::new(jaeger::Span::new(hash, "leaf-activated")); - self.span_per_active_leaf.insert(hash, span.clone()); - span - } - }; - - let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash, - number, - status: LeafStatus::Fresh, - span, - }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - // Send initial `BlockFinalized` - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - break + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + + // Send initial `BlockFinalized` + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + break }, Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, Event::ExternalRequest(request) => self.handle_external_request(request) From 2204966922fc2ce3ed44e3b110e2f455a0d8eb92 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 13 Feb 2023 22:27:24 +0200 Subject: [PATCH 09/19] More tests --- node/overseer/src/dummy.rs | 7 ++- node/overseer/src/tests.rs | 103 ++++++++++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 16 deletions(-) diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index 31a9467e466a..217ccf360270 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -21,7 +21,7 @@ use crate::{ }; use lru::LruCache; use orchestra::{FromOrchestra, SpawnedSubsystem, Subsystem, SubsystemContext}; -use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*}; +use 
polkadot_node_subsystem_types::{errors::SubsystemError, messages::*, BlockNumber, Hash}; // Generated dummy messages use crate::messages::*; @@ -64,6 +64,7 @@ pub fn dummy_overseer_builder( supports_parachains: SupportsParachains, registry: Option<&Registry>, sync_oracle: MajorSyncOracle, + initial_leaves: Vec<(Hash, BlockNumber)>, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -103,6 +104,7 @@ where DummySubsystem, registry, sync_oracle, + initial_leaves, ) } @@ -113,6 +115,7 @@ pub fn one_for_all_overseer_builder( subsystem: Sub, registry: Option<&Registry>, sync_oracle: MajorSyncOracle, + initial_leaves: Vec<(Hash, BlockNumber)>, ) -> Result< InitializedOverseerBuilder< SpawnGlue, @@ -198,7 +201,7 @@ where .span_per_active_leaf(Default::default()) .active_leaves(Default::default()) .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) - .leaves(Default::default()) + .leaves(initial_leaves) .spawner(SpawnGlue(spawner)) .metrics(metrics) .sync_oracle(sync_oracle) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index b363b9c50ae6..b7d76dae0f2c 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -178,6 +178,7 @@ fn overseer_works() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) @@ -243,6 +244,7 @@ fn overseer_metrics_work() { MockSupportsParachains, Some(&registry), MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .leaves(block_info_to_pair(vec![first_block])) @@ -310,6 +312,7 @@ fn overseer_ends_on_subsystem_exit() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_backing(|_| ReturnOnStart) @@ -384,6 +387,20 @@ where } } +struct PuppetOracle { + pub state: std::sync::Arc>, +} + +impl SyncOracle for PuppetOracle { + fn is_major_syncing(&self) -> bool { + *self.state.lock().unwrap() + } + + fn is_offline(&self) -> bool { + false + } +} + // Tests that 
starting with a defined set of leaves and receiving // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. #[test] @@ -410,6 +427,7 @@ fn overseer_start_stop_works() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) @@ -514,6 +532,7 @@ fn overseer_finalize_works() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) @@ -615,6 +634,7 @@ fn overseer_finalize_leaf_preserves_it() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) @@ -710,6 +730,7 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { MockSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) @@ -784,6 +805,7 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { MockSupportsParachains, None, MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + Vec::new(), ) .unwrap() .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) @@ -872,6 +894,7 @@ fn active_leaves_is_sent_on_stalled_finality() { MockSupportsParachains, None, MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + Vec::new(), ) .unwrap() .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) @@ -934,6 +957,71 @@ fn active_leaves_is_sent_on_stalled_finality() { assert_matches!(res, Ok(())); } +#[test] +fn initial_leaves_are_sent_if_nothing_to_sync() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let db_leaves = BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 0 }; + + let (tx_5, mut rx_5) = metered::channel(64); + + // Prepare overseer future + let is_syncing = 
std::sync::Arc::new(std::sync::Mutex::new(false)); + let (overseer, handle) = dummy_overseer_builder( + spawner, + MockSupportsParachains, + None, + MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), + vec![(db_leaves.hash, db_leaves.number)], + ) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); + + // Get overseer future + let mut handle = Handle::new(handle); + let overseer_fut = overseer.run_inner().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + + // Generate two more events - these must not be ignore + let expected_heartbeats = + vec![OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: db_leaves.hash, + number: db_leaves.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + }))]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = dbg!(res) { + ss5_results.push(res); + } + } + } + + if ss5_results.len() == expected_heartbeats.len() { + handle.stop().await; + } + } + + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + } + }); +} + #[derive(Clone)] struct CounterSubsystem { stop_signals_received: Arc, @@ -1127,20 +1215,6 @@ fn test_chain_selection_msg() -> ChainSelectionMessage { ChainSelectionMessage::Approved(Default::default()) } -struct PuppetOracle { - pub state: std::sync::Arc>, -} - -impl SyncOracle for PuppetOracle { - fn is_major_syncing(&self) -> bool { - *self.state.lock().unwrap() - } - - fn is_offline(&self) -> bool { - false - } -} - // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. 
#[test] fn overseer_all_subsystems_receive_signals_and_messages() { @@ -1166,6 +1240,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { subsystem, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .build() From 2ba4a974b84ebe2a00b4dd322c65ecfbf7ca9a2e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 15:17:11 +0200 Subject: [PATCH 10/19] Update node/overseer/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- node/overseer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 89ca82396552..a0a0cf8b9848 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -885,7 +885,7 @@ where async fn handle_stop(self) -> Result<(), SubsystemError> { self.stop().await; - return Ok(()) + Ok(()) } fn handle_orchestra_rx(&mut self, msg: ToOrchestra) { From 969f71a76f4ff63005bb926ed7d4073125866a52 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 13:51:40 +0200 Subject: [PATCH 11/19] Extract initial ActiveLeaves update logic in `prepare_initial_active_leaves` --- node/overseer/src/lib.rs | 65 ++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index a0a0cf8b9848..4c4eae38aa70 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -804,44 +804,33 @@ where Event::Stop => return self.handle_stop().await, Event::BlockImported(block) => _ = self.block_imported(block).await, Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { - // Send initial `ActiveLeaves` + // Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the + // subsystems start doing work when they receive their first `ActiveLeaves` event. We need + // to ensure such event is sent no matter what. 
+ // We can also wait for the next leaf to be activated which is safe in probably 99% of the + // cases. However if the finality is stalled for some reason and a new node is started or + // restarted its subsystems won't start without the logic here. + // + // To force an `ActiveLeaves` event first we do an artificial `block_import`. let update = self.block_imported(block.clone()).await; if !update.is_empty() { + // it might yield an `ActiveLeaves` update. If so - send it. self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } else { - // In theory we can receive `BlockImported` during initial major sync. In this case the - // update will be empty. - let span = match self.span_per_active_leaf.get(&block.hash) { - Some(span) => span.clone(), - None => { - // This should never happen. - gum::warn!( - target: LOG_TARGET, - ?block.hash, - ?block.number, - "Span for active leaf not found. This is not expected" - ); - let span = Arc::new(jaeger::Span::new(block.hash, "leaf-activated")); - span - } - }; - let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: block.hash, - number: block.number, - status: LeafStatus::Fresh, - span, - }); + // if not - prepare one manually. All the required state should be already available. + let update = self.prepare_initial_active_leaves(&block).await; self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - // Initial sync is complete + // Now process finalized event. It will probably yield an `ActiveLeaves` which will + // deactivate the heads activated by the previous event in this scope, so broadcast it. 
let update = self.block_finalized(&block).await; if !update.is_empty() { self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - // Send initial `BlockFinalized` + // And finally broadcast `BlockFinalized` self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; break }, @@ -883,6 +872,32 @@ where } } + async fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate { + // In theory we can receive `BlockImported` during initial major sync. In this case the + // update will be empty. + let span = match self.span_per_active_leaf.get(&block.hash) { + Some(span) => span.clone(), + None => { + // This should never happen. + gum::warn!( + target: LOG_TARGET, + ?block.hash, + ?block.number, + "Span for initial active leaf not found. This is not expected" + ); + let span = Arc::new(jaeger::Span::new(block.hash, "leaf-activated")); + span + }, + }; + + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: block.hash, + number: block.number, + status: LeafStatus::Fresh, + span, + }) + } + async fn handle_stop(self) -> Result<(), SubsystemError> { self.stop().await; Ok(()) From 81a64e5c46392a395cbcad476c5757589e3e9fcb Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 14:26:39 +0200 Subject: [PATCH 12/19] Update tests --- node/overseer/examples/minimal-example.rs | 1 + node/overseer/src/tests.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index aa7fe206b226..4fb64e43246e 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -158,6 +158,7 @@ fn main() { AlwaysSupportsParachains, None, MajorSyncOracle::new_dummy(), + Vec::new(), ) .unwrap() .replace_candidate_validation(|_| Subsystem2) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index b7d76dae0f2c..d6a595f00a5a 100644 --- a/node/overseer/src/tests.rs +++ 
b/node/overseer/src/tests.rs @@ -849,6 +849,9 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, })), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work( + imported_block_before_sync.hash, + )), OverseerSignal::BlockFinalized( imported_block_after_sync.hash, imported_block_after_sync.number, From 6bdb390c6ac2729ab41db8512d31c554af6d1fb0 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 15:13:44 +0200 Subject: [PATCH 13/19] Better comments in tests --- node/overseer/src/tests.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index d6a595f00a5a..31664b88928b 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -788,11 +788,11 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - // Two events which will be fired BEFORE initial full sync + // Two events which will be fired BEFORE initial full sync for this block let imported_block_before_sync = BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; - // And two events which will be fired AFTER initial full sync + // And two events which will be fired AFTER initial full sync for this block let imported_block_after_sync = BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; @@ -823,6 +823,7 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { handle.block_imported(imported_block_before_sync.clone()).await; handle.block_finalized(imported_block_before_sync.clone()).await; + // Run overseer for 500ms to check if any events are generated loop { select! 
{ _ = overseer_fut => { @@ -835,10 +836,10 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { } } - // Switch the oracle state - initial sync is completed + // Switch the oracle state - initial sync is now completed *is_syncing.lock().unwrap() = false; - // Generate two more events - these must not be ignored + // Generate two more events - these must NOT be ignored handle.block_finalized(imported_block_after_sync.clone()).await; handle.block_imported(imported_block_after_sync.clone()).await; @@ -849,6 +850,7 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, })), + // The first leaf should be 'cleaned up' by the overseer OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work( imported_block_before_sync.hash, )), @@ -887,7 +889,6 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { #[test] fn active_leaves_is_sent_on_stalled_finality() { let spawner = sp_core::testing::TaskExecutor::new(); - let (tx_5, mut rx_5) = metered::channel(64); let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); @@ -909,8 +910,12 @@ fn active_leaves_is_sent_on_stalled_finality() { let overseer_fut = overseer.run_inner().fuse(); pin_mut!(overseer_fut); + // The test logic. It will run concurrently with the overseer. + // The test simulates a node (re)started during a finality stall. Overseer starts, receives one + // `block_finalized` event. This is equivalent to starting the node and syncing state. However + // after that no `block_imported` events are received. 
Despite that overseer should generate one + // `ActiveLeaves` event for the last finalized block and broadcast it to the subsystems let test = async move { - // Simulate one block received via sync let finalized_block_after_sync = BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; From f8467d8bee7711f43107de0cb0e3a603bfc684ae Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 20:25:56 +0200 Subject: [PATCH 14/19] Remove unneeded async --- node/overseer/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 4c4eae38aa70..05f8d8baa0c3 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -818,7 +818,7 @@ where self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } else { // if not - prepare one manually. All the required state should be already available. - let update = self.prepare_initial_active_leaves(&block).await; + let update = self.prepare_initial_active_leaves(&block); self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } @@ -872,7 +872,7 @@ where } } - async fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate { + fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate { // In theory we can receive `BlockImported` during initial major sync. In this case the // update will be empty. 
let span = match self.span_per_active_leaf.get(&block.hash) { From a3840a762a3dddb0214a59b396a2a40b36182d4e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 14 Feb 2023 22:10:56 +0200 Subject: [PATCH 15/19] Fix `dummy_overseer_builder` usage --- node/subsystem-test-helpers/src/lib.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 871f1c5b6d63..4a30ecd7ad0e 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -435,7 +435,7 @@ impl Future for Yield { mod tests { use super::*; use futures::executor::block_on; - use polkadot_node_subsystem::messages::CollatorProtocolMessage; + use polkadot_node_subsystem::{messages::CollatorProtocolMessage, MajorSyncOracle}; use polkadot_overseer::{dummy::dummy_overseer_builder, Handle, HeadSupportsParachains}; use polkadot_primitives::Hash; use sp_core::traits::SpawnNamed; @@ -453,13 +453,18 @@ mod tests { fn forward_subsystem_works() { let spawner = sp_core::testing::TaskExecutor::new(); let (tx, rx) = mpsc::channel(2); - let (overseer, handle) = - dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None) - .unwrap() - .replace_collator_protocol(|_| ForwardSubsystem(tx)) - .leaves(vec![]) - .build() - .unwrap(); + let (overseer, handle) = dummy_overseer_builder( + spawner.clone(), + AlwaysSupportsParachains, + None, + MajorSyncOracle::new_dummy(), + Vec::new(), + ) + .unwrap() + .replace_collator_protocol(|_| ForwardSubsystem(tx)) + .leaves(vec![]) + .build() + .unwrap(); let mut handle = Handle::new(handle); From ade3d9d717aee351547a0091a78d620d762032c8 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 15 Feb 2023 11:05:19 +0200 Subject: [PATCH 16/19] Remove `Option>` from `MajorSyncOracle` and put enum instead --- node/overseer/src/lib.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git 
a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 05f8d8baa0c3..822d5e95d0e6 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -331,36 +331,41 @@ pub async fn forward_events>(client: Arc

, mut hand } } +///Represents the internal state of `MajorSyncOracle` +enum OracleState { + Syncing(Box), + Done, +} /// Used to detect if the node is in initial major sync. /// It's worth mentioning that this is a one way check. Once the initial full sync is complete /// `MajorSyncOracle` will never return false. The reason is that the struct is meant to be used /// only during initialization. pub struct MajorSyncOracle { - sync_oracle: Option>, + state: OracleState, } impl MajorSyncOracle { /// Create `MajorSyncOracle` from `SyncOracle` pub fn new(sync_oracle: Box) -> Self { - Self { sync_oracle: Some(sync_oracle) } + Self { state: OracleState::Syncing(sync_oracle) } } /// Create dummy `MajorSyncOracle` which always returns true for `finished_syncing` pub fn new_dummy() -> Self { - Self { sync_oracle: None } + Self { state: OracleState::Done } } /// Check if node is in major sync pub fn finished_syncing(&mut self) -> bool { - match &mut self.sync_oracle { - Some(sync_oracle) => + match &mut self.state { + OracleState::Syncing(sync_oracle) => if !sync_oracle.is_major_syncing() { - self.sync_oracle = None; + self.state = OracleState::Done; true } else { false }, - None => true, + OracleState::Done => true, } } } From ba1167bd6b45d71bc95e26e0bac126a696a80707 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 15 Feb 2023 15:17:05 +0200 Subject: [PATCH 17/19] Remove code duplication --- node/overseer/src/lib.rs | 130 +++++++++++++++++-------------------- node/overseer/src/tests.rs | 10 ++- 2 files changed, 69 insertions(+), 71 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 822d5e95d0e6..c4f6256e0e4e 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -765,6 +765,12 @@ where } async fn run_inner(mut self) -> SubsystemResult<()> { + enum MainLoopState { + Syncing, + Running, + } + let mut main_loop_state = MainLoopState::Syncing; + let metrics = self.metrics.clone(); spawn_metronome_metrics(&mut self, 
metrics)?; @@ -782,72 +788,11 @@ where span, }); self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + main_loop_state = MainLoopState::Running; } } } - // If initial sync is not complete - wait for it. - // This loop is identical to the main one but doesn't generate `ActiveLeaves` and `BlockFinalized` events until - // the initial full sync is complete. - // This is an infinite loop which executes only when `!initial_sync_finished`. The weird syntax is only to save - // one extra layer if indentation because it's already a bit tough for the eyes. - // Think about it like: - // ``` - // if !initial_sync_finished { - // loop { - // select! { - // ... - // } - // } - // } - //``` - while !initial_sync_finished { - select! { - msg = self.events_rx.select_next_some() => { - match msg { - Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, - Event::Stop => return self.handle_stop().await, - Event::BlockImported(block) => _ = self.block_imported(block).await, - Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => { - // Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the - // subsystems start doing work when they receive their first `ActiveLeaves` event. We need - // to ensure such event is sent no matter what. - // We can also wait for the next leaf to be activated which is safe in probably 99% of the - // cases. However if the finality is stalled for some reason and a new node is started or - // restarted its subsystems won't start without the logic here. - // - // To force an `ActiveLeaves` event first we do an artificial `block_import`. - let update = self.block_imported(block.clone()).await; - if !update.is_empty() { - // it might yield an `ActiveLeaves` update. If so - send it. - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } else { - // if not - prepare one manually. All the required state should be already available. 
- let update = self.prepare_initial_active_leaves(&block); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - - // Now process finalized event. It will probably yield an `ActiveLeaves` which will - // deactivate the heads activated by the previous event in this scope, so broadcast it. - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - // And finally broadcast `BlockFinalized` - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - break - }, - Event::BlockFinalized(block) => _ = self.block_finalized(&block).await, - Event::ExternalRequest(request) => self.handle_external_request(request) - } - }, - msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), - res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await - } - } - // main loop loop { select! { @@ -856,17 +801,64 @@ where Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, Event::Stop => return self.handle_stop().await, Event::BlockImported(block) => { - let update = self.block_imported(block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + match main_loop_state { + MainLoopState::Syncing => { + self.block_imported(block).await; + }, + MainLoopState::Running => { + let update = self.block_imported(block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + } } + }, Event::BlockFinalized(block) => { - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + match main_loop_state { + MainLoopState::Syncing => { + if self.sync_oracle.finished_syncing() { + // Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. 
Most of the + // subsystems start doing work when they receive their first `ActiveLeaves` event. We need + // to ensure such event is sent no matter what. + // We can also wait for the next leaf to be activated which is safe in probably 99% of the + // cases. However if the finality is stalled for some reason and a new node is started or + // restarted its subsystems won't start without the logic here. + // + // To force an `ActiveLeaves` event first we do an artificial `block_import`. + let update = self.block_imported(block.clone()).await; + if !update.is_empty() { + // it might yield an `ActiveLeaves` update. If so - send it. + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } else { + // if not - prepare one manually. All the required state should be already available. + let update = self.prepare_initial_active_leaves(&block); + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + + + // Now process finalized event. It will probably yield an `ActiveLeaves` which will + // deactivate the heads activated by the previous event in this scope, so broadcast it. 
+ let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + + // And finally broadcast `BlockFinalized` + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + main_loop_state = MainLoopState::Running; + } else { + self.block_finalized(&block).await; + } + }, + MainLoopState::Running => { + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + } } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; }, Event::ExternalRequest(request) => self.handle_external_request(request) } diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 31664b88928b..59d38cf555f0 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -754,6 +754,12 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, })), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: finalized_block.hash, + number: finalized_block.number, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + })), OverseerSignal::BlockFinalized(finalized_block.hash, 1), ]; @@ -1261,7 +1267,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { // send a signal to each subsystem handle - .block_imported(BlockInfo { + .block_finalized(BlockInfo { hash: Default::default(), parent_hash: Default::default(), number: Default::default(), @@ -1345,7 +1351,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { let res = overseer_fut.await; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + 
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS * 2); assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED); assert!(res.is_ok()); From 809a612af7f1c09d378e44bbc11f4d364ef2c06d Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 15 Feb 2023 15:58:40 +0200 Subject: [PATCH 18/19] Don't send `ActiveLeaves` on startup; do it when the first fresh leaf is imported --- node/overseer/src/lib.rs | 129 ++++++------------------------- node/overseer/src/tests.rs | 154 ++++++++----------------------------- 2 files changed, 54 insertions(+), 229 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index c4f6256e0e4e..afe6ded73df7 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -356,16 +356,16 @@ impl MajorSyncOracle { } /// Check if node is in major sync - pub fn finished_syncing(&mut self) -> bool { + pub fn is_syncing(&mut self) -> bool { match &mut self.state { OracleState::Syncing(sync_oracle) => - if !sync_oracle.is_major_syncing() { - self.state = OracleState::Done; + if sync_oracle.is_major_syncing() { true } else { + self.state = OracleState::Done; false }, - OracleState::Done => true, + OracleState::Done => false, } } } @@ -765,32 +765,13 @@ where } async fn run_inner(mut self) -> SubsystemResult<()> { - enum MainLoopState { - Syncing, - Running, - } - let mut main_loop_state = MainLoopState::Syncing; - let metrics = self.metrics.clone(); spawn_metronome_metrics(&mut self, metrics)?; - let initial_sync_finished = self.sync_oracle.finished_syncing(); - // Import the active leaves found in the database + // Import the active leaves found in the database but don't broadcast ActiveLeaves for them for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); - if let Some((span, status)) = self.on_head_activated(&hash, None).await { - if initial_sync_finished { - // Initial sync is complete. 
Notify the subsystems and proceed to the main loop - let update = ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash, - number, - status, - span, - }); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - main_loop_state = MainLoopState::Running; - } - } + self.on_head_activated(&hash, None).await; } // main loop @@ -800,66 +781,28 @@ where match msg { Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, Event::Stop => return self.handle_stop().await, + Event::BlockImported(block) if self.sync_oracle.is_syncing() => { + // If we are syncing - don't broadcast ActiveLeaves + self.block_imported(block).await; + }, Event::BlockImported(block) => { - match main_loop_state { - MainLoopState::Syncing => { - self.block_imported(block).await; - }, - MainLoopState::Running => { - let update = self.block_imported(block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - } + // Syncing is complete - broadcast ActiveLeaves + let update = self.block_imported(block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - + }, + Event::BlockFinalized(block) if self.sync_oracle.is_syncing() => { + // Still syncing - just import the block and don't broadcast anything + self.block_finalized(&block).await; }, Event::BlockFinalized(block) => { - match main_loop_state { - MainLoopState::Syncing => { - if self.sync_oracle.finished_syncing() { - // Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the - // subsystems start doing work when they receive their first `ActiveLeaves` event. We need - // to ensure such event is sent no matter what. - // We can also wait for the next leaf to be activated which is safe in probably 99% of the - // cases. However if the finality is stalled for some reason and a new node is started or - // restarted its subsystems won't start without the logic here. 
- // - // To force an `ActiveLeaves` event first we do an artificial `block_import`. - let update = self.block_imported(block.clone()).await; - if !update.is_empty() { - // it might yield an `ActiveLeaves` update. If so - send it. - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } else { - // if not - prepare one manually. All the required state should be already available. - let update = self.prepare_initial_active_leaves(&block); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - - // Now process finalized event. It will probably yield an `ActiveLeaves` which will - // deactivate the heads activated by the previous event in this scope, so broadcast it. - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - - // And finally broadcast `BlockFinalized` - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - main_loop_state = MainLoopState::Running; - } else { - self.block_finalized(&block).await; - } - }, - MainLoopState::Running => { - let update = self.block_finalized(&block).await; - if !update.is_empty() { - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - } - self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; - } + let update = self.block_finalized(&block).await; + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; } - }, + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; + } Event::ExternalRequest(request) => self.handle_external_request(request) } }, @@ -869,32 +812,6 @@ where } } - fn prepare_initial_active_leaves(&self, block: &BlockInfo) -> ActiveLeavesUpdate { - // In theory we can receive `BlockImported` during initial major sync. In this case the - // update will be empty. 
- let span = match self.span_per_active_leaf.get(&block.hash) { - Some(span) => span.clone(), - None => { - // This should never happen. - gum::warn!( - target: LOG_TARGET, - ?block.hash, - ?block.number, - "Span for initial active leaf not found. This is not expected" - ); - let span = Arc::new(jaeger::Span::new(block.hash, "leaf-activated")); - span - }, - }; - - ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: block.hash, - number: block.number, - status: LeafStatus::Fresh, - span, - }) - } - async fn handle_stop(self) -> Result<(), SubsystemError> { self.stop().await; Ok(()) diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 59d38cf555f0..3f5edc14ac13 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -447,12 +447,6 @@ fn overseer_start_stop_works() { handle.block_imported(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: Some(ActivatedLeaf { hash: second_block_hash, @@ -552,18 +546,6 @@ fn overseer_finalize_works() { handle.block_finalized(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 2, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [first_block_hash, second_block_hash].as_ref().into(), ..Default::default() @@ -655,18 +637,6 @@ fn overseer_finalize_leaf_preserves_it() { handle.block_finalized(first_block).await; let expected_heartbeats = vec![ - 
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: first_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: second_block_hash, - number: 1, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { deactivated: [second_block_hash].as_ref().into(), ..Default::default() @@ -754,12 +724,6 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, })), - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: finalized_block.hash, - number: finalized_block.number, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), OverseerSignal::BlockFinalized(finalized_block.hash, 1), ]; @@ -892,85 +856,6 @@ fn do_not_send_events_before_initial_major_sync_is_complete() { }); } -#[test] -fn active_leaves_is_sent_on_stalled_finality() { - let spawner = sp_core::testing::TaskExecutor::new(); - let (tx_5, mut rx_5) = metered::channel(64); - let is_syncing = std::sync::Arc::new(std::sync::Mutex::new(true)); - - // Prepare overseer - let (overseer, handle) = dummy_overseer_builder( - spawner, - MockSupportsParachains, - None, - MajorSyncOracle::new(Box::new(PuppetOracle { state: is_syncing.clone() })), - Vec::new(), - ) - .unwrap() - .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) - .build() - .unwrap(); - - // Prepare overseer future - let mut handle = Handle::new(handle); - let overseer_fut = overseer.run_inner().fuse(); - pin_mut!(overseer_fut); - - // The test logic. It will run concurrently with the overseer. - // The test simulates a node (re)started during a finality stall. Overseer starts, receives one - // `block_finalized` event. This is equivalent to starting the node and syncing state. 
However - // after that no `block_imported` events are received. Despite that overseer should generate one - // `ActiveLeaves` event for the last finalized block and broadcast it to the subsystems - let test = async move { - let finalized_block_after_sync = - BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 1 }; - - let mut ss5_results = Vec::new(); - - // Switch the oracle state - initial sync is completed - *is_syncing.lock().unwrap() = false; - - // Generate 'block imported' event which simulates end of sync - handle.block_finalized(finalized_block_after_sync.clone()).await; - - let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: finalized_block_after_sync.hash, - number: finalized_block_after_sync.number, - span: Arc::new(jaeger::Span::Disabled), - status: LeafStatus::Fresh, - })), - OverseerSignal::BlockFinalized( - finalized_block_after_sync.hash, - finalized_block_after_sync.number, - ), - ]; - - loop { - let res = rx_5.next().timeout(Duration::from_millis(100)).await; - assert_matches!(res, Some(res) => { - if let Some(res) = dbg!(res) { - ss5_results.push(res); - } - }); - - if ss5_results.len() == expected_heartbeats.len() { - handle.stop().await; - break - } - } - - assert_eq!(ss5_results.len(), expected_heartbeats.len()); - - for expected in expected_heartbeats { - assert!(ss5_results.contains(&expected)); - } - }; - - let (res, _) = executor::block_on(futures::future::join(overseer_fut, test)); - assert_matches!(res, Ok(())); -} - #[test] fn initial_leaves_are_sent_if_nothing_to_sync() { let spawner = sp_core::testing::TaskExecutor::new(); @@ -999,16 +884,39 @@ fn initial_leaves_are_sent_if_nothing_to_sync() { let overseer_fut = overseer.run_inner().fuse(); pin_mut!(overseer_fut); + // Run overseer for 500ms to check if any events are generated + loop { + select! 
{ + _ = overseer_fut => { + assert!(false, "Overseer should not exit"); + }, + res = rx_5.next().timeout(Duration::from_millis(500)).fuse() => { + assert_matches!(res, None); + break; + } + } + } + + // Now complete sync and generate an event just to be sure everything works + let imported_block_after_sync = + BlockInfo { hash: Hash::random(), parent_hash: Hash::random(), number: 2 }; + *is_syncing.lock().unwrap() = false; + + // Finalize one of the leaves from db and import one new + handle.block_finalized(db_leaves.clone()).await; + handle.block_imported(imported_block_after_sync.clone()).await; + let mut ss5_results = Vec::new(); - // Generate two more events - these must not be ignore - let expected_heartbeats = - vec![OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: db_leaves.hash, - number: db_leaves.number, + let expected_heartbeats = vec![ + OverseerSignal::BlockFinalized(db_leaves.hash, db_leaves.number), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: imported_block_after_sync.hash, + number: imported_block_after_sync.number, span: Arc::new(jaeger::Span::Disabled), status: LeafStatus::Fresh, - }))]; + })), + ]; loop { select! 
{ @@ -1267,7 +1175,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { // send a signal to each subsystem handle - .block_finalized(BlockInfo { + .block_imported(BlockInfo { hash: Default::default(), parent_hash: Default::default(), number: Default::default(), @@ -1351,7 +1259,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { let res = overseer_fut.await; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS * 2); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED); assert!(res.is_ok()); From 2e14f4613180a90772eda2a6212a1293d479634e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 15 Feb 2023 16:35:57 +0200 Subject: [PATCH 19/19] Undo some method extractions --- node/overseer/src/lib.rs | 77 ++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 46 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index afe6ded73df7..dd441753a53e 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -331,7 +331,7 @@ pub async fn forward_events>(client: Arc

, mut hand } } -///Represents the internal state of `MajorSyncOracle` +/// Represents the internal state of `MajorSyncOracle` enum OracleState { Syncing(Box), Done, @@ -774,13 +774,18 @@ where self.on_head_activated(&hash, None).await; } - // main loop loop { select! { msg = self.events_rx.select_next_some() => { match msg { - Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?, - Event::Stop => return self.handle_stop().await, + Event::MsgToSubsystem { msg, origin } => { + self.route_message(msg.into(), origin).await?; + self.metrics.on_message_relayed(); + } + Event::Stop => { + self.stop().await; + return Ok(()); + }, Event::BlockImported(block) if self.sync_oracle.is_syncing() => { // If we are syncing - don't broadcast ActiveLeaves self.block_imported(block).await; @@ -803,54 +808,34 @@ where } self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?; } - Event::ExternalRequest(request) => self.handle_external_request(request) + Event::ExternalRequest(request) => { + self.handle_external_request(request); + } } }, - msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg), - res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await + msg = self.to_orchestra_rx.select_next_some() => { + match msg { + ToOrchestra::SpawnJob { name, subsystem, s } => { + self.spawn_job(name, subsystem, s); + } + ToOrchestra::SpawnBlockingJob { name, subsystem, s } => { + self.spawn_blocking_job(name, subsystem, s); + } + } + }, + res = self.running_subsystems.select_next_some() => { + gum::error!( + target: LOG_TARGET, + subsystem = ?res, + "subsystem finished unexpectedly", + ); + self.stop().await; + return res; + }, } } } - async fn handle_stop(self) -> Result<(), SubsystemError> { - self.stop().await; - Ok(()) - } - - fn handle_orchestra_rx(&mut self, msg: ToOrchestra) { - match msg { - ToOrchestra::SpawnJob { name, subsystem, s } => { - 
self.spawn_job(name, subsystem, s); - }, - ToOrchestra::SpawnBlockingJob { name, subsystem, s } => { - self.spawn_blocking_job(name, subsystem, s); - }, - } - } - - async fn handle_msg_to_subsystem( - &mut self, - msg: AllMessages, - origin: &'static str, - ) -> Result<(), SubsystemError> { - self.route_message(msg.into(), origin).await?; - self.metrics.on_message_relayed(); - Ok(()) - } - - async fn handle_running_subsystems( - self, - res: Result<(), SubsystemError>, - ) -> Result<(), SubsystemError> { - gum::error!( - target: LOG_TARGET, - subsystem = ?res, - "subsystem finished unexpectedly", - ); - self.stop().await; - return res - } - async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate { match self.active_leaves.entry(block.hash) { hash_map::Entry::Vacant(entry) => entry.insert(block.number),