Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add task type label to task metrics #13240

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,20 @@ impl SpawnTaskHandle {
GroupName::Default => DEFAULT_GROUP_NAME,
};

let task_type_label = match task_type {
TaskType::Blocking => "blocking",
TaskType::Async => "async",
};

// Note that we increase the started counter here and not within the future. This way,
// we could properly visualize on Prometheus situations where the spawning doesn't work.
if let Some(metrics) = &self.metrics {
metrics.tasks_spawned.with_label_values(&[name, group]).inc();
metrics.tasks_spawned.with_label_values(&[name, group, task_type_label]).inc();
// We do a dummy increase in order for the task to show up in metrics.
metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc_by(0);
metrics
.tasks_ended
.with_label_values(&[name, "finished", group, task_type_label])
.inc_by(0);
}

let future = async move {
Expand All @@ -145,8 +153,10 @@ impl SpawnTaskHandle {
if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
let poll_duration = metrics.poll_duration.with_label_values(&[name, group]);
let poll_start = metrics.poll_start.with_label_values(&[name, group]);
let poll_duration =
metrics.poll_duration.with_label_values(&[name, group, task_type_label]);
let poll_start =
metrics.poll_start.with_label_values(&[name, group, task_type_label]);
let inner =
prometheus_future::with_poll_durations(poll_duration, poll_start, task);
// The logic of `AssertUnwindSafe` here is ok considering that we throw
Expand All @@ -157,15 +167,24 @@ impl SpawnTaskHandle {

match select(on_exit, task).await {
Either::Right((Err(payload), _)) => {
metrics.tasks_ended.with_label_values(&[name, "panic", group]).inc();
metrics
.tasks_ended
.with_label_values(&[name, "panic", group, task_type_label])
.inc();
panic::resume_unwind(payload)
},
Either::Right((Ok(()), _)) => {
metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc();
metrics
.tasks_ended
.with_label_values(&[name, "finished", group, task_type_label])
.inc();
},
Either::Left(((), _)) => {
// The `on_exit` has triggered.
metrics.tasks_ended.with_label_values(&[name, "interrupted", group]).inc();
metrics
.tasks_ended
.with_label_values(&[name, "interrupted", group, task_type_label])
.inc();
},
}
} else {
Expand Down Expand Up @@ -433,28 +452,28 @@ impl Metrics {
buckets: exponential_buckets(0.001, 4.0, 9)
.expect("function parameters are constant and always valid; qed"),
},
&["task_name", "task_group"]
&["task_name", "task_group", "kind"]
)?, registry)?,
poll_start: register(CounterVec::new(
Opts::new(
"substrate_tasks_polling_started_total",
"Total number of times we started invoking Future::poll"
),
&["task_name", "task_group"]
&["task_name", "task_group", "kind"]
)?, registry)?,
tasks_spawned: register(CounterVec::new(
Opts::new(
"substrate_tasks_spawned_total",
"Total number of tasks that have been spawned on the Service"
),
&["task_name", "task_group"]
&["task_name", "task_group", "kind"]
)?, registry)?,
tasks_ended: register(CounterVec::new(
Opts::new(
"substrate_tasks_ended_total",
"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
),
&["task_name", "reason", "task_group"]
&["task_name", "reason", "task_group", "kind"]
)?, registry)?,
})
}
Expand Down