Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add more prometheus metrics to network::Protocol. #5145

Merged
merged 1 commit into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 144 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::Message as GenericMessage;
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use prometheus_endpoint::{Registry, Gauge, register, PrometheusError, U64};
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
Expand Down Expand Up @@ -135,6 +136,105 @@ mod rep {
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

struct Metrics {
handshaking_peers: Gauge<U64>,
obsolete_requests: Gauge<U64>,
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
finality_proofs_pending: Gauge<U64>,
finality_proofs_active: Gauge<U64>,
finality_proofs_failed: Gauge<U64>,
finality_proofs_importing: Gauge<U64>,
justifications_pending: Gauge<U64>,
justifications_active: Gauge<U64>,
justifications_failed: Gauge<U64>,
justifications_importing: Gauge<U64>
}

impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
handshaking_peers: {
let g = Gauge::new("sync_handshaking_peers", "number of newly connected peers")?;
register(g, r)?
},
obsolete_requests: {
let g = Gauge::new("sync_obsolete_requests", "total number of obsolete requests")?;
register(g, r)?
},
peers: {
let g = Gauge::new("sync_peers", "number of peers we sync with")?;
register(g, r)?
},
queued_blocks: {
let g = Gauge::new("sync_queued_blocks", "number of blocks in import queue")?;
register(g, r)?
},
fork_targets: {
let g = Gauge::new("sync_fork_targets", "fork sync targets")?;
register(g, r)?
},
justifications_pending: {
let g = Gauge::new(
"sync_extra_justifications_pending",
"number of pending extra justifications requests"
)?;
register(g, r)?
},
justifications_active: {
let g = Gauge::new(
"sync_extra_justifications_active",
"number of active extra justifications requests"
)?;
register(g, r)?
},
justifications_failed: {
let g = Gauge::new(
"sync_extra_justifications_failed",
"number of failed extra justifications requests"
)?;
register(g, r)?
},
justifications_importing: {
let g = Gauge::new(
"sync_extra_justifications_importing",
"number of importing extra justifications requests"
)?;
register(g, r)?
},
finality_proofs_pending: {
let g = Gauge::new(
"sync_extra_finality_proofs_pending",
"number of pending extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_active: {
let g = Gauge::new(
"sync_extra_finality_proofs_active",
"number of active extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_failed: {
let g = Gauge::new(
"sync_extra_finality_proofs_failed",
"number of failed extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_importing: {
let g = Gauge::new(
"sync_extra_finality_proofs_importing",
"number of importing extra finality proof requests"
)?;
register(g, r)?
},
})
}
}

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
Expand Down Expand Up @@ -163,6 +263,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
/// For each protocol name, the legacy gossiping engine ID.
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
/// Prometheus metrics.
metrics: Option<Metrics>,
}

#[derive(Default)]
Expand Down Expand Up @@ -371,7 +473,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
protocol_id: ProtocolId,
peerset_config: sc_peerset::PeersetConfig,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
metrics_registry: Option<&Registry>
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(
Expand Down Expand Up @@ -416,6 +519,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
behaviour,
protocol_name_by_engine: HashMap::new(),
protocol_engine_by_name: HashMap::new(),
metrics: if let Some(r) = metrics_registry {
Some(Metrics::register(r)?)
} else {
None
}
};

Ok((protocol, peerset_handle))
Expand Down Expand Up @@ -859,6 +967,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
});
self.report_metrics()
}

fn maintain_peers(&mut self) {
Expand Down Expand Up @@ -1764,6 +1873,40 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
out
}

fn report_metrics(&self) {
use std::convert::TryInto;

if let Some(metrics) = &self.metrics {
let mut obsolete_requests: u64 = 0;
for peer in self.context_data.peers.values() {
let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
obsolete_requests = obsolete_requests.saturating_add(n);
}
metrics.obsolete_requests.set(obsolete_requests);

let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.handshaking_peers.set(n);

let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);

let m = self.sync.metrics();

metrics.fork_targets.set(m.fork_targets.into());
metrics.queued_blocks.set(m.queued_blocks.into());

metrics.justifications_pending.set(m.justifications.pending_requests.into());
metrics.justifications_active.set(m.justifications.active_requests.into());
metrics.justifications_failed.set(m.justifications.failed_requests.into());
metrics.justifications_importing.set(m.justifications.importing_requests.into());

metrics.finality_proofs_pending.set(m.finality_proofs.pending_requests.into());
metrics.finality_proofs_active.set(m.finality_proofs.active_requests.into());
metrics.finality_proofs_failed.set(m.finality_proofs.failed_requests.into());
metrics.finality_proofs_importing.set(m.finality_proofs.importing_requests.into());
}
}
}

/// Outcome of an incoming custom message.
Expand Down
21 changes: 21 additions & 0 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,27 @@ impl<B: BlockT> ChainSync<B> {
fn is_already_downloading(&self, hash: &B::Hash) -> bool {
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
}

/// Return some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
use std::convert::TryInto;
Metrics {
queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX),
fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX),
finality_proofs: self.extra_finality_proofs.metrics(),
justifications: self.extra_justifications.metrics(),
_priv: ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twittner am I understanding correctly that _priv is used to tell its consumer that the given structure might be extended in the future and that such an extension would not be a breaking change (see https://github.com/rust-lang/rfcs/blob/master/text/2008-non-exhaustive.md)?

Is this really necessary given that this is not exposed across crate boundaries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added to ensure that any Metrics value anywhere outside this module was created here. #[non_exchaustive] only works for downstream crates. It is probably not really necessary so if you feel strongly about it free to remove it.

}
}
}

#[derive(Debug)]
pub(crate) struct Metrics {
pub(crate) queued_blocks: u32,
pub(crate) fork_targets: u32,
pub(crate) finality_proofs: extra_requests::Metrics,
pub(crate) justifications: extra_requests::Metrics,
_priv: ()
}

/// Request the ancestry for a block. Sends a request for header and justification for the given
Expand Down
21 changes: 21 additions & 0 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ pub(crate) struct ExtraRequests<B: BlockT> {
request_type_name: &'static str,
}

#[derive(Debug)]
pub(crate) struct Metrics {
pub(crate) pending_requests: u32,
pub(crate) active_requests: u32,
pub(crate) importing_requests: u32,
pub(crate) failed_requests: u32,
_priv: ()
}

impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn new(request_type_name: &'static str) -> Self {
ExtraRequests {
Expand Down Expand Up @@ -240,6 +249,18 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
self.pending_requests.iter()
}

/// Get some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
use std::convert::TryInto;
Metrics {
pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX),
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
_priv: ()
}
}
}

/// Matches peers with pending extra requests.
Expand Down
5 changes: 3 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.finality_proof_request_builder,
params.protocol_id.clone(),
peerset_config,
params.block_announce_validator
params.block_announce_validator,
params.metrics_registry.as_ref()
)?;

// Build the swarm.
Expand Down Expand Up @@ -858,7 +859,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
};

this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);

if let Some(metrics) = this.metrics.as_ref() {
metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.peers_count.set(num_connected_peers as u64);
Expand Down