diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 398c26793fd41..ededccd5e3233 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -34,37 +34,48 @@ use crate::event::Event; use futures::{prelude::*, ready, stream::FusedStream}; -use log::error; -use parking_lot::Mutex; +use log::{debug, error}; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use std::{ backtrace::Backtrace, cell::RefCell, fmt, pin::Pin, - sync::Arc, task::{Context, Poll}, }; +/// Log target for this file. +pub const LOG_TARGET: &str = "sub-libp2p::out_events"; + /// Creates a new channel that can be associated to a [`OutChannels`]. /// /// The name is used in Prometheus reports, the queue size threshold is used /// to warn if there are too many unprocessed events in the channel. pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { let (tx, rx) = async_channel::unbounded(); - let metrics = Arc::new(Mutex::new(None)); let tx = Sender { inner: tx, name, queue_size_warning, - warning_fired: false, + warning_fired: SenderWarningState::NotFired, creation_backtrace: Backtrace::force_capture(), - metrics: metrics.clone(), + metrics: None, }; - let rx = Receiver { inner: rx, name, metrics }; + let rx = Receiver { inner: rx, name, metrics: None }; (tx, rx) } +/// A state of a sender warning that is used to avoid spamming the logs. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SenderWarningState { + /// The warning has not been fired yet. + NotFired, + /// The warning has been fired, and the channel is full + FiredFull, + /// The warning has been fired and the channel is not full anymore. + FiredFree, +} + /// Sending side of a channel. /// /// Must be associated with an [`OutChannels`] before anything can be sent on it @@ -78,13 +89,14 @@ pub struct Sender { name: &'static str, /// Threshold queue size to generate an error message in the logs. queue_size_warning: usize, - /// We generate the error message only once to not spam the logs. - warning_fired: bool, + /// We generate the error message only once to not spam the logs after the first error. + /// Subsequently we indicate channel fullness on debug level. + warning_fired: SenderWarningState, /// Backtrace of a place where the channel was created. creation_backtrace: Backtrace, /// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to /// [`OutChannels`] with `OutChannels::push()`. - metrics: Arc>>>>, + metrics: Option, } impl fmt::Debug for Sender { @@ -95,8 +107,7 @@ impl fmt::Debug for Sender { impl Drop for Sender { fn drop(&mut self) { - let metrics = self.metrics.lock(); - if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) { + if let Some(metrics) = self.metrics.as_ref() { metrics.num_channels.with_label_values(&[self.name]).dec(); } } @@ -108,7 +119,7 @@ pub struct Receiver { name: &'static str, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. - metrics: Arc>>>>, + metrics: Option, } impl Stream for Receiver { @@ -116,13 +127,8 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { - let metrics = self.metrics.lock().clone(); - match metrics.as_ref().map(|m| m.as_ref()) { - Some(Some(metrics)) => metrics.event_out(&ev, self.name), - Some(None) => (), // no registry - None => log::warn!( - "Inconsistency in out_events: event happened before sender associated" - ), + if let Some(metrics) = &self.metrics { + metrics.event_out(&ev, self.name); } Poll::Ready(Some(ev)) } else { @@ -151,7 +157,7 @@ pub struct OutChannels { event_streams: Vec, /// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this /// object. - metrics: Arc>, + metrics: Option, } impl OutChannels { @@ -160,17 +166,15 @@ impl OutChannels { let metrics = if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None }; - Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) }) + Ok(Self { event_streams: Vec::new(), metrics }) } /// Adds a new [`Sender`] to the collection. - pub fn push(&mut self, sender: Sender) { - let mut metrics = sender.metrics.lock(); - debug_assert!(metrics.is_none()); - *metrics = Some(self.metrics.clone()); - drop(metrics); + pub fn push(&mut self, mut sender: Sender) { + debug_assert!(sender.metrics.is_none()); + sender.metrics = self.metrics.clone(); - if let Some(metrics) = &*self.metrics { + if let Some(metrics) = &self.metrics { metrics.num_channels.with_label_values(&[sender.name]).inc(); } @@ -180,22 +184,42 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired { - sender.warning_fired = true; - error!( - "The number of unprocessed events in channel `{}` exceeded {}.\n\ - The channel was created at:\n{:}\n - The last event was sent from:\n{:}", - sender.name, - sender.queue_size_warning, - sender.creation_backtrace, - Backtrace::force_capture(), + let current_pending = sender.inner.len(); + if current_pending >= sender.queue_size_warning { + if sender.warning_fired == SenderWarningState::NotFired { + error!( + "The number of unprocessed events in channel `{}` exceeded {}.\n\ + The channel was created at:\n{:}\n + The last event was sent from:\n{:}", + sender.name, + sender.queue_size_warning, + sender.creation_backtrace, + Backtrace::force_capture(), + ); + } else if sender.warning_fired == SenderWarningState::FiredFree { + // We don't want to spam the logs, so we only log on debug level + debug!( + target: LOG_TARGET, + "Channel `{}` is overflowed again. Number of events: {}", + sender.name, current_pending + ); + } + sender.warning_fired = SenderWarningState::FiredFull; + } else if sender.warning_fired == SenderWarningState::FiredFull && + current_pending < sender.queue_size_warning.wrapping_div(2) + { + sender.warning_fired = SenderWarningState::FiredFree; + debug!( + target: LOG_TARGET, + "Channel `{}` is no longer overflowed. Number of events: {}", + sender.name, current_pending ); } + sender.inner.try_send(event.clone()).is_ok() }); - if let Some(metrics) = &*self.metrics { + if let Some(metrics) = &self.metrics { for ev in &self.event_streams { metrics.event_in(&event, ev.name); } @@ -211,6 +235,7 @@ impl fmt::Debug for OutChannels { } } +#[derive(Clone)] struct Metrics { // This list is ordered alphabetically events_total: CounterVec,