Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adds start and stop work heartbeats. #1188

Merged
merged 5 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../primitives" }

[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
Expand Down
1 change: 1 addition & 0 deletions overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ fn main() {
});

let (overseer, _handler) = Overseer::new(
&[],
Box::new(Subsystem2),
Box::new(Subsystem1),
spawner,
Expand Down
199 changes: 189 additions & 10 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use std::fmt::Debug;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use std::collections::HashSet;

use futures::channel::{mpsc, oneshot};
use futures::{
Expand All @@ -70,6 +71,8 @@ use futures::{
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};

use polkadot_primitives::Hash;

/// An error type that describes faults that may happen
///
/// These are:
Expand Down Expand Up @@ -138,7 +141,10 @@ enum ToOverseer {

/// Some event from outer world.
enum Event {
BlockImport,
BlockImport {
hash: Hash,
parent_hash: Hash,
},
BlockFinalized,
MsgToSubsystem(AllMessages),
Stop,
Expand All @@ -160,8 +166,11 @@ pub struct OverseerHandler {

impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport).await?;
pub async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport{
hash,
parent_hash,
}).await?;

Ok(())
}
Expand Down Expand Up @@ -222,12 +231,12 @@ pub struct SubsystemContext<M: Debug>{
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
#[derive(PartialEq, Clone, Debug)]
pub enum OverseerSignal {
/// `Subsystem` should start working.
StartWork,
StartWork(Hash),
/// `Subsystem` should stop working.
StopWork,
StopWork(Hash),
/// Conclude the work of the `Overseer` and all `Subsystem`s.
Conclude,
}
Expand Down Expand Up @@ -366,6 +375,14 @@ pub struct Overseer<S: Spawn> {

/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,

/// A set of leaves that `Overseer` starts working with.
///
/// Drained at the beginning of `run` and never used again.
leaves: Vec<Hash>,

/// The set of the "active leaves".
active_leaves: HashSet<Hash>,
}

impl<S> Overseer<S>
Expand Down Expand Up @@ -452,6 +469,7 @@ where
/// # fn main() { executor::block_on(async move {
/// let spawner = executor::ThreadPool::new().unwrap();
/// let (overseer, _handler) = Overseer::new(
/// &[],
/// Box::new(ValidationSubsystem),
/// Box::new(CandidateBackingSubsystem),
/// spawner,
Expand All @@ -471,6 +489,7 @@ where
/// # }); }
/// ```
pub fn new(
leaves: &[Hash],
montekki marked this conversation as resolved.
Show resolved Hide resolved
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
mut s: S,
Expand Down Expand Up @@ -498,13 +517,21 @@ where
candidate_backing,
)?;

let active_leaves = HashSet::new();

let mut v = Vec::new();
v.extend_from_slice(leaves);
let leaves = v;

let this = Self {
validation_subsystem,
candidate_backing_subsystem,
s,
running_subsystems,
running_subsystems_rx,
events_rx,
leaves,
active_leaves,
};

Ok((this, handler))
Expand Down Expand Up @@ -537,6 +564,13 @@ where

/// Run the `Overseer`.
pub async fn run(mut self) -> SubsystemResult<()> {
let leaves = std::mem::replace(&mut self.leaves, vec![]);
montekki marked this conversation as resolved.
Show resolved Hide resolved

for leaf in leaves.into_iter() {
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
self.broadcast_signal(OverseerSignal::StartWork(leaf)).await?;
self.active_leaves.insert(leaf);
}

loop {
while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
match msg {
Expand All @@ -547,7 +581,10 @@ where
self.stop().await;
return Ok(());
}
_ => ()
Event::BlockImport { hash, parent_hash } => {
self.block_imported(hash, parent_hash).await?;
}
_ => {}
}
}

Expand Down Expand Up @@ -576,6 +613,31 @@ where
}
}

async fn block_imported(&mut self, hash: Hash, parent_hash: Hash) -> SubsystemResult<()> {
if let Some(parent) = self.active_leaves.take(&parent_hash) {
self.broadcast_signal(OverseerSignal::StopWork(parent)).await?;
}

if !self.active_leaves.contains(&hash) {
self.broadcast_signal(OverseerSignal::StartWork(hash)).await?;
self.active_leaves.insert(hash);
}

Ok(())
}

async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut s) = self.validation_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}

if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal)).await?;
}

Ok(())
}

async fn route_message(&mut self, msg: AllMessages) {
match msg {
AllMessages::Validation(msg) => {
Expand All @@ -591,7 +653,6 @@ where
}
}


fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
self.s.spawn(j).map_err(|_| SubsystemError)
}
Expand Down Expand Up @@ -642,7 +703,7 @@ mod tests {
i += 1;
continue;
}
Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
_ => (),
}
Expand All @@ -668,7 +729,7 @@ mod tests {
continue;
}
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
break;
}
Ok(Some(_)) => {
Expand Down Expand Up @@ -703,6 +764,7 @@ mod tests {
let (s2_tx, mut s2_rx) = mpsc::channel(64);

let (overseer, mut handler) = Overseer::new(
&[],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem2(s2_tx)),
spawner,
Expand Down Expand Up @@ -752,6 +814,7 @@ mod tests {
executor::block_on(async move {
let (s1_tx, _) = mpsc::channel(64);
let (overseer, _handle) = Overseer::new(
&[],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem4),
spawner,
Expand All @@ -765,4 +828,120 @@ mod tests {
}
})
}

struct TestSubsystem5(mpsc::Sender<OverseerSignal>);

impl Subsystem<ValidationSubsystemMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();

SpawnedSubsystem(Box::pin(async move {
loop {
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
Ok(Some(FromOverseer::Signal(s))) => {
sender.send(s).await.unwrap();
continue;
},
Ok(Some(_)) => continue,
Err(_) => return,
_ => (),
}
pending!();
}
}))
}
}

struct TestSubsystem6(mpsc::Sender<OverseerSignal>);

impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();

SpawnedSubsystem(Box::pin(async move {
loop {
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
Ok(Some(FromOverseer::Signal(s))) => {
sender.send(s).await.unwrap();
continue;
},
Ok(Some(_)) => continue,
Err(_) => return,
_ => (),
}
pending!();
}
}))
}
}

// Tests that starting with a defined set of leaves and receiving
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
fn overseer_start_stop_works() {
let spawner = executor::ThreadPool::new().unwrap();

executor::block_on(async move {
let first_block_hash = [1; 32].into();
let second_block_hash = [2; 32].into();
let third_block_hash = [3; 32].into();

let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_6, mut rx_6) = mpsc::channel(64);

let (overseer, mut handler) = Overseer::new(
&[first_block_hash],
Box::new(TestSubsystem5(tx_5)),
Box::new(TestSubsystem6(tx_6)),
spawner,
).unwrap();

let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);

let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

handler.block_imported(second_block_hash, first_block_hash).await.unwrap();
handler.block_imported(third_block_hash, second_block_hash).await.unwrap();

let expected_heartbeats = vec![
OverseerSignal::StartWork(first_block_hash),
OverseerSignal::StopWork(first_block_hash),
OverseerSignal::StartWork(second_block_hash),
OverseerSignal::StopWork(second_block_hash),
OverseerSignal::StartWork(third_block_hash),
];

loop {
select! {
res = overseer_fut => {
assert!(res.is_ok());
break;
},
res = rx_5.next() => {
if let Some(res) = res {
ss5_results.push(res);
}
}
res = rx_6.next() => {
if let Some(res) = res {
ss6_results.push(res);
}
}
complete => break,
}

if ss5_results.len() == expected_heartbeats.len() &&
ss6_results.len() == expected_heartbeats.len() {
handler.stop().await.unwrap();
}
}

assert_eq!(ss5_results, expected_heartbeats);
assert_eq!(ss6_results, expected_heartbeats);
});
}
}