Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add group name in task metrics #10196

Merged
merged 18 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn new_partial(
let client = Arc::new(client);

let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});

Expand Down Expand Up @@ -289,7 +289,9 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>

// the AURA authoring task is considered essential, i.e. if it
// fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking("aura", aura);
task_manager
.spawn_essential_handle()
.spawn_blocking("aura", Some("block-authoring"), aura);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -329,6 +331,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
// if it fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?,
);
}
Expand Down
24 changes: 16 additions & 8 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub fn new_partial(
let client = Arc::new(client);

let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});

Expand Down Expand Up @@ -434,7 +434,11 @@ pub fn new_full_base(
};

let babe = sc_consensus_babe::start_babe(babe_config)?;
task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe);
task_manager.spawn_essential_handle().spawn_blocking(
"babe-proposer",
Some("block-authoring"),
babe,
);
}

// Spawn authority discovery module.
Expand All @@ -461,9 +465,11 @@ pub fn new_full_base(
prometheus_registry.clone(),
);

task_manager
.spawn_handle()
.spawn("authority-discovery-worker", authority_discovery_worker.run());
task_manager.spawn_handle().spawn(
"authority-discovery-worker",
Some("networking"),
authority_discovery_worker.run(),
);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -501,9 +507,11 @@ pub fn new_full_base(

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
task_manager
.spawn_essential_handle()
.spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?);
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
grandpa::run_grandpa_voter(grandpa_config)?,
);
}

network_starter.start_network();
Expand Down
14 changes: 12 additions & 2 deletions bin/node/testing/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,21 @@ impl TaskExecutor {
}

impl SpawnNamed for TaskExecutor {
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}

fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_: &'static str,
_: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.pool.spawn_ok(future);
}
}
Expand Down
1 change: 1 addition & 0 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ where

spawn_handle.spawn_blocking(
"basic-authorship-proposer",
None,
Box::pin(async move {
// leave some time for evaluation and block finalization (33%)
let deadline = (self.now)() + max_duration - max_duration / 3;
Expand Down
6 changes: 5 additions & 1 deletion client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
metrics,
);

spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed());
spawner.spawn_essential_blocking(
"basic-block-import-worker",
Some("block-import"),
future.boxed(),
);

Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
}
Expand Down
1 change: 1 addition & 0 deletions client/executor/src/native_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn {
let scheduler = self.scheduler.clone();
self.scheduler.spawn(
"executor-extra-runtime-instance",
None,
Box::pin(async move {
let module = AssertUnwindSafe(module);

Expand Down
1 change: 1 addition & 0 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub async fn notification_future<Client, Block, Spawner>(
if n.is_new_best {
spawner.spawn(
"offchain-on-block",
Some("offchain-worker"),
offchain
.on_block_imported(&n.header, network_provider.clone(), is_validator)
.boxed(),
Expand Down
3 changes: 2 additions & 1 deletion client/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl SubscriptionTaskExecutor {

impl Spawn for SubscriptionTaskExecutor {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed());
self.0
.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(())
}

Expand Down
20 changes: 13 additions & 7 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ where
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
Some("offchain-worker"),
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
Expand Down Expand Up @@ -505,11 +506,13 @@ where
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
Some("transaction-pool"),
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);

spawn_handle.spawn(
"on-transaction-imported",
Some("transaction-pool"),
transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()),
);

Expand All @@ -520,6 +523,7 @@ where
let metrics = MetricsService::with_prometheus(telemetry.clone(), &registry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
None,
prometheus_endpoint::init_prometheus(port, registry).map(drop),
);

Expand All @@ -531,6 +535,7 @@ where
// Periodically updated metrics and telemetry updates.
spawn_handle.spawn(
"telemetry-periodic-send",
None,
metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()),
);

Expand Down Expand Up @@ -567,6 +572,7 @@ where
// Spawn informant task
spawn_handle.spawn(
"informant",
None,
sc_informant::build(
client.clone(),
network.clone(),
Expand Down Expand Up @@ -798,7 +804,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block_request_handler", handler.run());
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -815,7 +821,7 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("state_request_handler", handler.run());
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -828,7 +834,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
spawn_handle.spawn("warp_sync_request_handler", handler.run());
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
protocol_config
};
(provider, protocol_config)
Expand All @@ -842,7 +848,7 @@ where
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
LightClientRequestHandler::new(&protocol_id, client.clone());
spawn_handle.spawn("light_client_request_handler", handler.run());
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
protocol_config
}
};
Expand All @@ -852,13 +858,13 @@ where
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
}))
},
transactions_handler_executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("network-transactions-handler", fut);
spawn_handle.spawn("network-transactions-handler", Some("networking"), fut);
})
},
network_config: config.network.clone(),
Expand Down Expand Up @@ -920,7 +926,7 @@ where
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", async move {
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, TaskManager};
pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

Expand Down
Loading