From d6a3052f0434ab43e55c222c412e0cce130a5294 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 2 Jun 2020 18:57:14 +0300 Subject: [PATCH 1/5] Adds start and stop work heartbeats. --- Cargo.lock | 1 + overseer/Cargo.toml | 1 + overseer/examples/minimal-example.rs | 1 + overseer/src/lib.rs | 199 +++++++++++++++++++++++++-- 4 files changed, 192 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bd85a4c9d72..1601d530cee2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3308,6 +3308,7 @@ dependencies = [ "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", + "polkadot-primitives", "streamunordered", ] diff --git a/overseer/Cargo.toml b/overseer/Cargo.toml index bcd0a8e9e529..0bef4442be4f 100644 --- a/overseer/Cargo.toml +++ b/overseer/Cargo.toml @@ -9,6 +9,7 @@ futures = "0.3.5" log = "0.4.8" futures-timer = "3.0.2" streamunordered = "0.5.1" +polkadot-primitives = { path = "../primitives" } [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/overseer/examples/minimal-example.rs b/overseer/examples/minimal-example.rs index 4dd37dbafea1..e5c7692305a9 100644 --- a/overseer/examples/minimal-example.rs +++ b/overseer/examples/minimal-example.rs @@ -111,6 +111,7 @@ fn main() { }); let (overseer, _handler) = Overseer::new( + &[], Box::new(Subsystem2), Box::new(Subsystem1), spawner, diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index f7ac6cac5079..06c41f19ea71 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -58,6 +58,7 @@ use std::fmt::Debug; use std::pin::Pin; use std::task::Poll; use std::time::Duration; +use std::collections::HashSet; use futures::channel::{mpsc, oneshot}; use futures::{ @@ -70,6 +71,8 @@ use futures::{ use futures_timer::Delay; use streamunordered::{StreamYield, StreamUnordered}; +use polkadot_primitives::Hash; + /// An error type that describes faults that may happen /// /// These are: @@ -138,7 +141,10 @@ enum ToOverseer { /// Some event from outer world. enum Event { - BlockImport, + BlockImport { + hash: Hash, + parent_hash: Hash, + }, BlockFinalized, MsgToSubsystem(AllMessages), Stop, @@ -160,8 +166,11 @@ pub struct OverseerHandler { impl OverseerHandler { /// Inform the `Overseer` that that some block was imported. - pub async fn block_imported(&mut self) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockImport).await?; + pub async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> { + self.events_tx.send(Event::BlockImport{ + hash, + parent_hash, + }).await?; Ok(()) } @@ -222,12 +231,12 @@ pub struct SubsystemContext{ /// /// [`Overseer`]: struct.Overseer.html /// [`Subsystem`]: trait.Subsystem.html -#[derive(Debug)] +#[derive(PartialEq, Clone, Debug)] pub enum OverseerSignal { /// `Subsystem` should start working. - StartWork, + StartWork(Hash), /// `Subsystem` should stop working. - StopWork, + StopWork(Hash), /// Conclude the work of the `Overseer` and all `Subsystem`s. Conclude, } @@ -366,6 +375,14 @@ pub struct Overseer { /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, + + /// A set of leaves that `Overseer` starts working with. + /// + /// Drained at the beginning of `run` and never used again. + leaves: Vec, + + /// The set of the "active leaves". + active_leaves: HashSet, } impl Overseer @@ -452,6 +469,7 @@ where /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); /// let (overseer, _handler) = Overseer::new( + /// &[], /// Box::new(ValidationSubsystem), /// Box::new(CandidateBackingSubsystem), /// spawner, @@ -471,6 +489,7 @@ where /// # }); } /// ``` pub fn new( + leaves: &[Hash], validation: Box + Send>, candidate_backing: Box + Send>, mut s: S, @@ -498,6 +517,12 @@ where candidate_backing, )?; + let active_leaves = HashSet::new(); + + let mut v = Vec::new(); + v.extend_from_slice(leaves); + let leaves = v; + let this = Self { validation_subsystem, candidate_backing_subsystem, @@ -505,6 +530,8 @@ where running_subsystems, running_subsystems_rx, events_rx, + leaves, + active_leaves, }; Ok((this, handler)) @@ -537,6 +564,13 @@ where /// Run the `Overseer`. pub async fn run(mut self) -> SubsystemResult<()> { + let leaves = std::mem::replace(&mut self.leaves, vec![]); + + for leaf in leaves.into_iter() { + self.broadcast_signal(OverseerSignal::StartWork(leaf)).await?; + self.active_leaves.insert(leaf); + } + loop { while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { match msg { @@ -547,7 +581,10 @@ where self.stop().await; return Ok(()); } - _ => () + Event::BlockImport { hash, parent_hash } => { + self.block_imported(hash, parent_hash).await?; + } + _ => {} } } @@ -576,6 +613,31 @@ where } } + async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> { + if let Some(parent) = self.active_leaves.take(&parent_hash) { + self.broadcast_signal(OverseerSignal::StopWork(parent)).await?; + } + + if !self.active_leaves.contains(&hash) { + self.broadcast_signal(OverseerSignal::StartWork(hash)).await?; + self.active_leaves.insert(hash); + } + + Ok(()) + } + + async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { + if let Some(ref mut s) = self.validation_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal)).await?; + } + + Ok(()) + } + async fn route_message(&mut self, msg: AllMessages) { match msg { AllMessages::Validation(msg) => { @@ -591,7 +653,6 @@ where } } - fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> { self.s.spawn(j).map_err(|_| SubsystemError) } @@ -642,7 +703,7 @@ mod tests { i += 1; continue; } - Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return, + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, Err(_) => return, _ => (), } @@ -668,7 +729,7 @@ mod tests { continue; } match ctx.try_recv().await { - Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => { break; } Ok(Some(_)) => { @@ -703,6 +764,7 @@ mod tests { let (s2_tx, mut s2_rx) = mpsc::channel(64); let (overseer, mut handler) = Overseer::new( + &[], Box::new(TestSubsystem1(s1_tx)), Box::new(TestSubsystem2(s2_tx)), spawner, @@ -752,6 +814,7 @@ mod tests { executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); let (overseer, _handle) = Overseer::new( + &[], Box::new(TestSubsystem1(s1_tx)), Box::new(TestSubsystem4), spawner, @@ -765,4 +828,120 @@ mod tests { } }) } + + struct TestSubsystem5(mpsc::Sender); + + impl Subsystem for TestSubsystem5 { + fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + let mut sender = self.0.clone(); + + SpawnedSubsystem(Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, + Ok(Some(FromOverseer::Signal(s))) => { + sender.send(s).await.unwrap(); + continue; + }, + Ok(Some(_)) => continue, + Err(_) => return, + _ => (), + } + pending!(); + } + })) + } + } + + struct TestSubsystem6(mpsc::Sender); + + impl Subsystem for TestSubsystem6 { + fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + let mut sender = self.0.clone(); + + SpawnedSubsystem(Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, + Ok(Some(FromOverseer::Signal(s))) => { + sender.send(s).await.unwrap(); + continue; + }, + Ok(Some(_)) => continue, + Err(_) => return, + _ => (), + } + pending!(); + } + })) + } + } + + // Tests that starting with a defined set of leaves and receiving + // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. + #[test] + fn overseer_start_stop_works() { + let spawner = executor::ThreadPool::new().unwrap(); + + executor::block_on(async move { + let first_block_hash = [1; 32].into(); + let second_block_hash = [2; 32].into(); + let third_block_hash = [3; 32].into(); + + let (tx_5, mut rx_5) = mpsc::channel(64); + let (tx_6, mut rx_6) = mpsc::channel(64); + + let (overseer, mut handler) = Overseer::new( + &[first_block_hash], + Box::new(TestSubsystem5(tx_5)), + Box::new(TestSubsystem6(tx_6)), + spawner, + ).unwrap(); + + let overseer_fut = overseer.run().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + let mut ss6_results = Vec::new(); + + handler.block_imported(second_block_hash, first_block_hash).await.unwrap(); + handler.block_imported(third_block_hash, second_block_hash).await.unwrap(); + + let expected_heartbeats = vec![ + OverseerSignal::StartWork(first_block_hash), + OverseerSignal::StopWork(first_block_hash), + OverseerSignal::StartWork(second_block_hash), + OverseerSignal::StopWork(second_block_hash), + OverseerSignal::StartWork(third_block_hash), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = res { + ss5_results.push(res); + } + } + res = rx_6.next() => { + if let Some(res) = res { + ss6_results.push(res); + } + } + complete => break, + } + + if ss5_results.len() == expected_heartbeats.len() && + ss6_results.len() == expected_heartbeats.len() { + handler.stop().await.unwrap(); + } + } + + assert_eq!(ss5_results, expected_heartbeats); + assert_eq!(ss6_results, expected_heartbeats); + }); + } } From 5fbfc39c85ee0941d81dcefaf598b8b70e0e6aef Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 2 Jun 2020 19:57:29 +0300 Subject: [PATCH 2/5] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- overseer/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 06c41f19ea71..5697d6ae0866 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -489,7 +489,7 @@ where /// # }); } /// ``` pub fn new( - leaves: &[Hash], + leaves: impl Into>, validation: Box + Send>, candidate_backing: Box + Send>, mut s: S, @@ -564,7 +564,7 @@ where /// Run the `Overseer`. pub async fn run(mut self) -> SubsystemResult<()> { - let leaves = std::mem::replace(&mut self.leaves, vec![]); + let leaves = std::mem::take(&mut self.leaves); for leaf in leaves.into_iter() { self.broadcast_signal(OverseerSignal::StartWork(leaf)).await?; From 4f220685068a53066e2d665efc1616278e68ceec Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 2 Jun 2020 20:02:55 +0300 Subject: [PATCH 3/5] Fix code after suggested changes --- overseer/examples/minimal-example.rs | 2 +- overseer/src/lib.rs | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/overseer/examples/minimal-example.rs b/overseer/examples/minimal-example.rs index e5c7692305a9..4f5f3bdaff88 100644 --- a/overseer/examples/minimal-example.rs +++ b/overseer/examples/minimal-example.rs @@ -111,7 +111,7 @@ fn main() { }); let (overseer, _handler) = Overseer::new( - &[], + vec![], Box::new(Subsystem2), Box::new(Subsystem1), spawner, diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 5697d6ae0866..1ad498f648b5 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -469,7 +469,7 @@ where /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); /// let (overseer, _handler) = Overseer::new( - /// &[], + /// vec![], /// Box::new(ValidationSubsystem), /// Box::new(CandidateBackingSubsystem), /// spawner, @@ -519,9 +519,7 @@ where let active_leaves = HashSet::new(); - let mut v = Vec::new(); - v.extend_from_slice(leaves); - let leaves = v; + let leaves = leaves.into(); let this = Self { validation_subsystem, @@ -764,7 +762,7 @@ mod tests { let (s2_tx, mut s2_rx) = mpsc::channel(64); let (overseer, mut handler) = Overseer::new( - &[], + vec![], Box::new(TestSubsystem1(s1_tx)), Box::new(TestSubsystem2(s2_tx)), spawner, @@ -814,7 +812,7 @@ mod tests { executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); let (overseer, _handle) = Overseer::new( - &[], + vec![], Box::new(TestSubsystem1(s1_tx)), Box::new(TestSubsystem4), spawner, @@ -892,7 +890,7 @@ mod tests { let (tx_6, mut rx_6) = mpsc::channel(64); let (overseer, mut handler) = Overseer::new( - &[first_block_hash], + vec![first_block_hash], Box::new(TestSubsystem5(tx_5)), Box::new(TestSubsystem6(tx_6)), spawner, From 5bee3d740ba3f02f5b8aa6f9711341d9715eaca3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 2 Jun 2020 23:10:11 +0300 Subject: [PATCH 4/5] Finalizing stops work on earlier lower blocks. --- overseer/src/lib.rs | 194 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 164 insertions(+), 30 deletions(-) diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 1ad498f648b5..4dd9e2d298ab 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -71,7 +71,7 @@ use futures::{ use futures_timer::Delay; use streamunordered::{StreamYield, StreamUnordered}; -use polkadot_primitives::Hash; +use polkadot_primitives::{BlockNumber, Hash}; /// An error type that describes faults that may happen /// @@ -139,13 +139,25 @@ enum ToOverseer { }, } +/// An event telling the `Overseer` on the particular block +/// that has been imported or finalized. +/// +/// This structure exists solely for the purposes of decoupling +/// `Overseer` code from the client code and the necessity to call +/// `HeaderBackend::block_number_from_id()`. +pub struct BlockInfo { + /// hash of the block. + pub hash: Hash, + /// hash of the parent block. + pub parent_hash: Hash, + /// block's number. + pub number: BlockNumber, +} + /// Some event from outer world. enum Event { - BlockImport { - hash: Hash, - parent_hash: Hash, - }, - BlockFinalized, + BlockImported(BlockInfo), + BlockFinalized(BlockInfo), MsgToSubsystem(AllMessages), Stop, } @@ -166,11 +178,8 @@ pub struct OverseerHandler { impl OverseerHandler { /// Inform the `Overseer` that that some block was imported. - pub async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockImport{ - hash, - parent_hash, - }).await?; + pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + self.events_tx.send(Event::BlockImported(block)).await?; Ok(()) } @@ -183,8 +192,8 @@ impl OverseerHandler { } /// Inform the `Overseer` that that some block was finalized. - pub async fn block_finalized(&mut self) -> SubsystemResult<()> { - self.events_tx.send(Event::BlockFinalized).await?; + pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { + self.events_tx.send(Event::BlockFinalized(block)).await?; Ok(()) } @@ -379,10 +388,10 @@ pub struct Overseer { /// A set of leaves that `Overseer` starts working with. /// /// Drained at the beginning of `run` and never used again. - leaves: Vec, + leaves: Vec<(Hash, BlockNumber)>, /// The set of the "active leaves". - active_leaves: HashSet, + active_leaves: HashSet<(Hash, BlockNumber)>, } impl Overseer @@ -489,7 +498,7 @@ where /// # }); } /// ``` pub fn new( - leaves: impl Into>, + leaves: impl Into>, validation: Box + Send>, candidate_backing: Box + Send>, mut s: S, @@ -519,7 +528,10 @@ where let active_leaves = HashSet::new(); - let leaves = leaves.into(); + let leaves = leaves.into() + .into_iter() + .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) + .collect(); let this = Self { validation_subsystem, @@ -565,7 +577,7 @@ where let leaves = std::mem::take(&mut self.leaves); for leaf in leaves.into_iter() { - self.broadcast_signal(OverseerSignal::StartWork(leaf)).await?; + self.broadcast_signal(OverseerSignal::StartWork(leaf.0)).await?; self.active_leaves.insert(leaf); } @@ -579,10 +591,12 @@ where self.stop().await; return Ok(()); } - Event::BlockImport { hash, parent_hash } => { - self.block_imported(hash, parent_hash).await?; + Event::BlockImported(block) => { + self.block_imported(block).await?; + } + Event::BlockFinalized(block) => { + self.block_finalized(block).await?; } - _ => {} } } @@ -611,14 +625,34 @@ where } } - async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> { - if let Some(parent) = self.active_leaves.take(&parent_hash) { - self.broadcast_signal(OverseerSignal::StopWork(parent)).await?; + async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) { + self.broadcast_signal(OverseerSignal::StopWork(parent.0)).await?; } - if !self.active_leaves.contains(&hash) { - self.broadcast_signal(OverseerSignal::StartWork(hash)).await?; - self.active_leaves.insert(hash); + if !self.active_leaves.contains(&(block.hash, block.number)) { + self.broadcast_signal(OverseerSignal::StartWork(block.hash)).await?; + self.active_leaves.insert((block.hash, block.number)); + } + + Ok(()) + } + + async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { + let mut stop_these = Vec::new(); + + self.active_leaves.retain(|(h, n)| { + if *n <= block.number { + stop_these.push(*h); + false + } else { + true + } + }); + + + for hash in stop_these.into_iter() { + self.broadcast_signal(OverseerSignal::StopWork(hash)).await? } Ok(()) @@ -886,11 +920,27 @@ mod tests { let second_block_hash = [2; 32].into(); let third_block_hash = [3; 32].into(); + let first_block = BlockInfo { + hash: first_block_hash, + parent_hash: [0; 32].into(), + number: 1, + }; + let second_block = BlockInfo { + hash: second_block_hash, + parent_hash: first_block_hash, + number: 2, + }; + let third_block = BlockInfo { + hash: third_block_hash, + parent_hash: second_block_hash, + number: 3, + }; + let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); let (overseer, mut handler) = Overseer::new( - vec![first_block_hash], + vec![first_block], Box::new(TestSubsystem5(tx_5)), Box::new(TestSubsystem6(tx_6)), spawner, @@ -902,8 +952,8 @@ mod tests { let mut ss5_results = Vec::new(); let mut ss6_results = Vec::new(); - handler.block_imported(second_block_hash, first_block_hash).await.unwrap(); - handler.block_imported(third_block_hash, second_block_hash).await.unwrap(); + handler.block_imported(second_block).await.unwrap(); + handler.block_imported(third_block).await.unwrap(); let expected_heartbeats = vec![ OverseerSignal::StartWork(first_block_hash), @@ -942,4 +992,88 @@ mod tests { assert_eq!(ss6_results, expected_heartbeats); }); } + + // Tests that starting with a defined set of leaves and receiving + // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. + #[test] + fn overseer_finalize_works() { + let spawner = executor::ThreadPool::new().unwrap(); + + executor::block_on(async move { + let first_block_hash = [1; 32].into(); + let second_block_hash = [2; 32].into(); + let third_block_hash = [3; 32].into(); + + let first_block = BlockInfo { + hash: first_block_hash, + parent_hash: [0; 32].into(), + number: 1, + }; + let second_block = BlockInfo { + hash: second_block_hash, + parent_hash: [42; 32].into(), + number: 2, + }; + let third_block = BlockInfo { + hash: third_block_hash, + parent_hash: second_block_hash, + number: 3, + }; + + let (tx_5, mut rx_5) = mpsc::channel(64); + let (tx_6, mut rx_6) = mpsc::channel(64); + + // start with two forks of different height. + let (overseer, mut handler) = Overseer::new( + vec![first_block, second_block], + Box::new(TestSubsystem5(tx_5)), + Box::new(TestSubsystem6(tx_6)), + spawner, + ).unwrap(); + + let overseer_fut = overseer.run().fuse(); + pin_mut!(overseer_fut); + + let mut ss5_results = Vec::new(); + let mut ss6_results = Vec::new(); + + // this should stop work on both forks we started with earlier. + handler.block_finalized(third_block).await.unwrap(); + + let expected_heartbeats = vec![ + OverseerSignal::StartWork(first_block_hash), + OverseerSignal::StartWork(second_block_hash), + OverseerSignal::StopWork(first_block_hash), + OverseerSignal::StopWork(second_block_hash), + ]; + + loop { + select! { + res = overseer_fut => { + assert!(res.is_ok()); + break; + }, + res = rx_5.next() => { + if let Some(res) = res { + ss5_results.push(res); + } + } + res = rx_6.next() => { + if let Some(res) = res { + ss6_results.push(res); + } + } + complete => break, + } + + if ss5_results.len() == expected_heartbeats.len() && + ss6_results.len() == expected_heartbeats.len() { + handler.stop().await.unwrap(); + } + } + + assert_eq!(ss5_results, expected_heartbeats); + assert_eq!(ss6_results, expected_heartbeats); + }); + } } From 26c7a12eb2445230171d58edc1607216e1e79066 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 3 Jun 2020 00:19:44 +0300 Subject: [PATCH 5/5] Fix func parameter and flaky test --- overseer/src/lib.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 4dd9e2d298ab..7f0849fcb592 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -498,7 +498,7 @@ where /// # }); } /// ``` pub fn new( - leaves: impl Into>, + leaves: impl IntoIterator, validation: Box + Send>, candidate_backing: Box + Send>, mut s: S, @@ -528,7 +528,7 @@ where let active_leaves = HashSet::new(); - let leaves = leaves.into() + let leaves = leaves .into_iter() .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) .collect(); @@ -650,7 +650,6 @@ where } }); - for hash in stop_these.into_iter() { self.broadcast_signal(OverseerSignal::StopWork(hash)).await? } @@ -1072,8 +1071,15 @@ mod tests { } } - assert_eq!(ss5_results, expected_heartbeats); - assert_eq!(ss6_results, expected_heartbeats); + assert_eq!(ss5_results.len(), expected_heartbeats.len()); + assert_eq!(ss6_results.len(), expected_heartbeats.len()); + + // Notifications on finality for multiple blocks at once + // may be received in different orders. + for expected in expected_heartbeats { + assert!(ss5_results.contains(&expected)); + assert!(ss6_results.contains(&expected)); + } }); } }