Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report tracing_unbounded channel size to prometheus #1489

Merged
merged 3 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions substrate/client/utils/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use prometheus::{
Error as PrometheusError, Registry,
};

use prometheus::{core::GenericCounterVec, Opts};
use prometheus::{
core::{GenericCounterVec, GenericGaugeVec},
Opts,
};

lazy_static! {
pub static ref TOKIO_THREADS_TOTAL: GenericCounter<AtomicU64> =
Expand All @@ -36,18 +39,32 @@ lazy_static! {
}

lazy_static! {
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"),
&["entity", "action"] // 'name of channel, send|received|dropped
pub static ref UNBOUNDED_CHANNELS_COUNTER: GenericCounterVec<AtomicU64> = GenericCounterVec::new(
Opts::new(
"substrate_unbounded_channel_len",
"Items sent/received/dropped on each mpsc::unbounded instance"
),
&["entity", "action"], // name of channel, send|received|dropped
).expect("Creating of statics doesn't fail. qed");
pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure that a single gauge will provide enough data here. Since a gauge is a simple counter cell, Prometheus will get just a single value on the next collection. However, we will not track any spikes of this value between the collection intervals. I would suggest to use histogram here, as it includes both Gauge functionality and buckets that will allow to track anomalities and peaks easily. On the other hand, it is more expensive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll go with a simple gauge here, because it's hard to estimate the performance impact of using a histogram, considering we use channels extensively. So far this is merely a fix for the instant channel size calculation, which was done on the CI side previously using metrics not intended for this. But thanks for the suggestion anyway.

Opts::new(
"substrate_unbounded_channel_size",
"Size (number of messages to be processed) of each mpsc::unbounded instance",
),
&["entity"], // name of channel
).expect("Creating of statics doesn't fail. qed");

}

pub static SENT_LABEL: &'static str = "send";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub static SENT_LABEL: &'static str = "send";
pub static SENT_LABEL: &'static str = "sent";

Copy link
Contributor Author

@dmitry-markin dmitry-markin Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original label name was "send", and I'm not sure if it's a good idea to break the compatibility — there are probably a lot of automation scripts using this.

pub static RECEIVED_LABEL: &'static str = "received";
pub static DROPPED_LABEL: &'static str = "dropped";

/// Register the statics to report to registry
pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?;

Ok(())
}
27 changes: 20 additions & 7 deletions substrate/client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

pub use async_channel::{TryRecvError, TrySendError};

use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use crate::metrics::{
DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
};
use async_channel::{Receiver, Sender};
use futures::{
stream::{FusedStream, Stream},
Expand Down Expand Up @@ -102,7 +104,10 @@ impl<T> TracingUnboundedSender<T> {
/// Proxy function to `async_channel::Sender::try_send`.
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(msg).map(|s| {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[self.name])
.set(self.inner.len().saturated_into());

if self.inner.len() >= self.queue_size_warning &&
self.warning_fired
Expand Down Expand Up @@ -140,7 +145,10 @@ impl<T> TracingUnboundedReceiver<T> {
/// that discounts the messages taken out.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.inner.try_recv().map(|s| {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[self.name])
.set(self.inner.len().saturated_into());
s
})
}
Expand All @@ -155,14 +163,16 @@ impl<T> Drop for TracingUnboundedReceiver<T> {
fn drop(&mut self) {
// Close the channel to prevent any further messages to be sent into the channel
self.close();
// the number of messages about to be dropped
// The number of messages about to be dropped
let count = self.inner.len();
// discount the messages
// Discount the messages
if count > 0 {
UNBOUNDED_CHANNELS_COUNTER
.with_label_values(&[self.name, "dropped"])
.with_label_values(&[self.name, DROPPED_LABEL])
.inc_by(count.saturated_into());
}
// Reset the size metric to 0
UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
// Drain all the pending messages in the channel since they can never be accessed,
// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
// resolved
Expand All @@ -180,7 +190,10 @@ impl<T> Stream for TracingUnboundedReceiver<T> {
match Pin::new(&mut s.inner).poll_next(cx) {
Poll::Ready(msg) => {
if msg.is_some() {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[s.name])
.set(s.inner.len().saturated_into());
}
Poll::Ready(msg)
},
Expand Down