Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Undo some method extractions
Browse files Browse the repository at this point in the history
  • Loading branch information
tdimitrov committed Feb 15, 2023
1 parent 809a612 commit 2e14f46
Showing 1 changed file with 31 additions and 46 deletions.
77 changes: 31 additions & 46 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
}
}

///Represents the internal state of `MajorSyncOracle`
/// Represents the internal state of `MajorSyncOracle`
enum OracleState {
Syncing(Box<dyn SyncOracle + Send>),
Done,
Expand Down Expand Up @@ -774,13 +774,18 @@ where
self.on_head_activated(&hash, None).await;
}

// main loop
loop {
select! {
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin } => self.handle_msg_to_subsystem(msg, origin).await?,
Event::Stop => return self.handle_stop().await,
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
self.metrics.on_message_relayed();
}
Event::Stop => {
self.stop().await;
return Ok(());
},
Event::BlockImported(block) if self.sync_oracle.is_syncing() => {
// If we are syncing - don't broadcast ActiveLeaves
self.block_imported(block).await;
Expand All @@ -803,54 +808,34 @@ where
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
}
Event::ExternalRequest(request) => self.handle_external_request(request)
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
}
},
msg = self.to_orchestra_rx.select_next_some() => self.handle_orchestra_rx(msg),
res = self.running_subsystems.select_next_some() => return self.handle_running_subsystems(res).await
msg = self.to_orchestra_rx.select_next_some() => {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
res = self.running_subsystems.select_next_some() => {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res;
},
}
}
}

async fn handle_stop(self) -> Result<(), SubsystemError> {
self.stop().await;
Ok(())
}

fn handle_orchestra_rx(&mut self, msg: ToOrchestra) {
match msg {
ToOrchestra::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
},
ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
},
}
}

async fn handle_msg_to_subsystem(
&mut self,
msg: AllMessages,
origin: &'static str,
) -> Result<(), SubsystemError> {
self.route_message(msg.into(), origin).await?;
self.metrics.on_message_relayed();
Ok(())
}

async fn handle_running_subsystems(
self,
res: Result<(), SubsystemError>,
) -> Result<(), SubsystemError> {
gum::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return res
}

async fn block_imported(&mut self, block: BlockInfo) -> ActiveLeavesUpdate {
match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => entry.insert(block.number),
Expand Down

0 comments on commit 2e14f46

Please sign in to comment.