Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Remove code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
tdimitrov committed Feb 15, 2023
1 parent ade3d9d commit ba1167b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 71 deletions.
130 changes: 61 additions & 69 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ where
}

async fn run_inner(mut self) -> SubsystemResult<()> {
enum MainLoopState {
Syncing,
Running,
}
let mut main_loop_state = MainLoopState::Syncing;

let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

Expand All @@ -782,72 +788,11 @@ where
span,
});
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
main_loop_state = MainLoopState::Running;
}
}
}

// If initial sync is not complete - wait for it.
// This loop is identical to the main one but doesn't generate `ActiveLeaves` and `BlockFinalized` events until
// the initial full sync is complete.
// This is an infinite loop which executes only when `!initial_sync_finished`. The weird syntax is only to save
// one extra layer if indentation because it's already a bit tough for the eyes.
// Think about it like:
// ```
// if !initial_sync_finished {
// loop {
// select! {
// ...
// }
// }
// }
//```
while !initial_sync_finished {
select! {
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Event::BlockImported(block) => _ = self.block_imported(block).await,
Event::BlockFinalized(block) if self.sync_oracle.finished_syncing() => {
// Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the
// subsystems start doing work when they receive their first `ActiveLeaves` event. We need
// to ensure such event is sent no matter what.
// We can also wait for the next leaf to be activated which is safe in probably 99% of the
// cases. However if the finality is stalled for some reason and a new node is started or
// restarted its subsystems won't start without the logic here.
//
// To force an `ActiveLeaves` event first we do an artificial `block_import`.
let update = self.block_imported(block.clone()).await;
if !update.is_empty() {
// it might yield an `ActiveLeaves` update. If so - send it.
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
} else {
// if not - prepare one manually. All the required state should be already available.
let update = self.prepare_initial_active_leaves(&block);
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}


// Now process finalized event. It will probably yield an `ActiveLeaves` which will
// deactivate the heads activated by the previous event in this scope, so broadcast it.
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

// And finally broadcast `BlockFinalized`
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
break
},
Event::BlockFinalized(block) => _ = self.block_finalized(&block).await,
Event::ExternalRequest(request) => self.handle_external_request(request)
}
},
msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg),
res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await
}
}

// main loop
loop {
select! {
Expand All @@ -856,17 +801,64 @@ where
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Event::BlockImported(block) => {
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
match main_loop_state {
MainLoopState::Syncing => {
self.block_imported(block).await;
},
MainLoopState::Running => {
let update = self.block_imported(block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}
}

},
Event::BlockFinalized(block) => {
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
match main_loop_state {
MainLoopState::Syncing => {
if self.sync_oracle.finished_syncing() {
// Initial major sync is complete so an `ActiveLeaves` needs to be broadcasted. Most of the
// subsystems start doing work when they receive their first `ActiveLeaves` event. We need
// to ensure such event is sent no matter what.
// We can also wait for the next leaf to be activated which is safe in probably 99% of the
// cases. However if the finality is stalled for some reason and a new node is started or
// restarted its subsystems won't start without the logic here.
//
// To force an `ActiveLeaves` event first we do an artificial `block_import`.
let update = self.block_imported(block.clone()).await;
if !update.is_empty() {
// it might yield an `ActiveLeaves` update. If so - send it.
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
} else {
// if not - prepare one manually. All the required state should be already available.
let update = self.prepare_initial_active_leaves(&block);
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}


// Now process finalized event. It will probably yield an `ActiveLeaves` which will
// deactivate the heads activated by the previous event in this scope, so broadcast it.
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}

// And finally broadcast `BlockFinalized`
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
main_loop_state = MainLoopState::Running;
} else {
self.block_finalized(&block).await;
}
},
MainLoopState::Running => {
let update = self.block_finalized(&block).await;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
}
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
},
Event::ExternalRequest(request) => self.handle_external_request(request)
}
Expand Down
10 changes: 8 additions & 2 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,12 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: finalized_block.hash,
number: finalized_block.number,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::BlockFinalized(finalized_block.hash, 1),
];

Expand Down Expand Up @@ -1261,7 +1267,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

// send a signal to each subsystem
handle
.block_imported(BlockInfo {
.block_finalized(BlockInfo {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
Expand Down Expand Up @@ -1345,7 +1351,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

let res = overseer_fut.await;
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS * 2);
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED);

assert!(res.is_ok());
Expand Down

0 comments on commit ba1167b

Please sign in to comment.