Skip to content

Commit

Permalink
Add group name in task metrics (paritytech#10196)
Browse files Browse the repository at this point in the history
* SpawnNamed: add new trait methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* Implement new methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* cargo fmt

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* SpawnNamed: add new trait methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* Implement new methods

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* cargo fmt

Signed-off-by: Andrei Sandu <sandu.andrei@gmail.com>

* New approach - spaw() group param

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update traits: SpawnNamed and SpawnNamed

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update TaskManager tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update test TaskExecutor

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix typo

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* grunt work: fix spawn() calls

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* remove old code

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo fmt - the right version

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Implement review feedback

- use Option group name in SpawnNamed methods
- switch to kebab case
- implement default group name
- add group name to some tasks

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
  • Loading branch information
sandreim authored and ark0f committed Feb 27, 2023
1 parent ed8a6ac commit fab4907
Show file tree
Hide file tree
Showing 19 changed files with 232 additions and 100 deletions.
7 changes: 5 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn new_partial(
let client = Arc::new(client);

let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});

Expand Down Expand Up @@ -289,7 +289,9 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>

// the AURA authoring task is considered essential, i.e. if it
// fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking("aura", aura);
task_manager
.spawn_essential_handle()
.spawn_blocking("aura", Some("block-authoring"), aura);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -329,6 +331,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
// if it fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?,
);
}
Expand Down
24 changes: 16 additions & 8 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub fn new_partial(
let client = Arc::new(client);

let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});

Expand Down Expand Up @@ -436,7 +436,11 @@ pub fn new_full_base(
};

let babe = sc_consensus_babe::start_babe(babe_config)?;
task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe);
task_manager.spawn_essential_handle().spawn_blocking(
"babe-proposer",
Some("block-authoring"),
babe,
);
}

// Spawn authority discovery module.
Expand All @@ -463,9 +467,11 @@ pub fn new_full_base(
prometheus_registry.clone(),
);

task_manager
.spawn_handle()
.spawn("authority-discovery-worker", authority_discovery_worker.run());
task_manager.spawn_handle().spawn(
"authority-discovery-worker",
Some("networking"),
authority_discovery_worker.run(),
);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -503,9 +509,11 @@ pub fn new_full_base(

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
task_manager
.spawn_essential_handle()
.spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?);
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
grandpa::run_grandpa_voter(grandpa_config)?,
);
}

network_starter.start_network();
Expand Down
14 changes: 12 additions & 2 deletions bin/node/testing/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,21 @@ impl TaskExecutor {
}

impl SpawnNamed for TaskExecutor {
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}

fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}
}
Expand Down
1 change: 1 addition & 0 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ where

spawn_handle.spawn_blocking(
"basic-authorship-proposer",
None,
Box::pin(async move {
// leave some time for evaluation and block finalization (33%)
let deadline = (self.now)() + max_duration - max_duration / 3;
Expand Down
6 changes: 5 additions & 1 deletion client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
metrics,
);

spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed());
spawner.spawn_essential_blocking(
"basic-block-import-worker",
Some("block-import"),
future.boxed(),
);

Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
}
Expand Down
1 change: 1 addition & 0 deletions client/executor/src/native_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn {
let scheduler = self.scheduler.clone();
self.scheduler.spawn(
"executor-extra-runtime-instance",
None,
Box::pin(async move {
let module = AssertUnwindSafe(module);

Expand Down
1 change: 1 addition & 0 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub async fn notification_future<Client, Block, Spawner>(
if n.is_new_best {
spawner.spawn(
"offchain-on-block",
Some("offchain-worker"),
offchain
.on_block_imported(&n.header, network_provider.clone(), is_validator)
.boxed(),
Expand Down
3 changes: 2 additions & 1 deletion client/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl SubscriptionTaskExecutor {

impl Spawn for SubscriptionTaskExecutor {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed());
self.0
.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(())
}

Expand Down
20 changes: 13 additions & 7 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ where
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
Some("offchain-worker"),
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
Expand Down Expand Up @@ -505,11 +506,13 @@ where
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
Some("transaction-pool"),
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);

spawn_handle.spawn(
"on-transaction-imported",
Some("transaction-pool"),
transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()),
);

Expand All @@ -520,6 +523,7 @@ where
let metrics = MetricsService::with_prometheus(telemetry.clone(), &registry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
None,
prometheus_endpoint::init_prometheus(port, registry).map(drop),
);

Expand All @@ -531,6 +535,7 @@ where
// Periodically updated metrics and telemetry updates.
spawn_handle.spawn(
"telemetry-periodic-send",
None,
metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()),
);

Expand Down Expand Up @@ -567,6 +572,7 @@ where
// Spawn informant task
spawn_handle.spawn(
"informant",
None,
sc_informant::build(
client.clone(),
network.clone(),
Expand Down Expand Up @@ -798,7 +804,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block_request_handler", handler.run());
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -815,7 +821,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("state_request_handler", handler.run());
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -828,7 +834,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
spawn_handle.spawn("warp_sync_request_handler", handler.run());
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
protocol_config
};
(provider, protocol_config)
Expand All @@ -842,7 +848,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
LightClientRequestHandler::new(&protocol_id, client.clone());
spawn_handle.spawn("light_client_request_handler", handler.run());
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -852,13 +858,13 @@ where
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
}))
},
transactions_handler_executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("network-transactions-handler", fut);
spawn_handle.spawn("network-transactions-handler", Some("networking"), fut);
})
},
network_config: config.network.clone(),
Expand Down Expand Up @@ -920,7 +926,7 @@ where
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", async move {
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, TaskManager};
pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

Expand Down
Loading

0 comments on commit fab4907

Please sign in to comment.