Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add more prometheus metrics to network::Protocol. (#5145)
Browse files Browse the repository at this point in the history
  • Loading branch information
gavofyork authored Mar 5, 2020
1 parent 870436e commit 027756e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 3 deletions.
145 changes: 144 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::Message as GenericMessage;
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use prometheus_endpoint::{Registry, Gauge, register, PrometheusError, U64};
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
Expand Down Expand Up @@ -135,6 +136,105 @@ mod rep {
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

struct Metrics {
handshaking_peers: Gauge<U64>,
obsolete_requests: Gauge<U64>,
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
finality_proofs_pending: Gauge<U64>,
finality_proofs_active: Gauge<U64>,
finality_proofs_failed: Gauge<U64>,
finality_proofs_importing: Gauge<U64>,
justifications_pending: Gauge<U64>,
justifications_active: Gauge<U64>,
justifications_failed: Gauge<U64>,
justifications_importing: Gauge<U64>
}

impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
handshaking_peers: {
let g = Gauge::new("sync_handshaking_peers", "number of newly connected peers")?;
register(g, r)?
},
obsolete_requests: {
let g = Gauge::new("sync_obsolete_requests", "total number of obsolete requests")?;
register(g, r)?
},
peers: {
let g = Gauge::new("sync_peers", "number of peers we sync with")?;
register(g, r)?
},
queued_blocks: {
let g = Gauge::new("sync_queued_blocks", "number of blocks in import queue")?;
register(g, r)?
},
fork_targets: {
let g = Gauge::new("sync_fork_targets", "fork sync targets")?;
register(g, r)?
},
justifications_pending: {
let g = Gauge::new(
"sync_extra_justifications_pending",
"number of pending extra justifications requests"
)?;
register(g, r)?
},
justifications_active: {
let g = Gauge::new(
"sync_extra_justifications_active",
"number of active extra justifications requests"
)?;
register(g, r)?
},
justifications_failed: {
let g = Gauge::new(
"sync_extra_justifications_failed",
"number of failed extra justifications requests"
)?;
register(g, r)?
},
justifications_importing: {
let g = Gauge::new(
"sync_extra_justifications_importing",
"number of importing extra justifications requests"
)?;
register(g, r)?
},
finality_proofs_pending: {
let g = Gauge::new(
"sync_extra_finality_proofs_pending",
"number of pending extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_active: {
let g = Gauge::new(
"sync_extra_finality_proofs_active",
"number of active extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_failed: {
let g = Gauge::new(
"sync_extra_finality_proofs_failed",
"number of failed extra finality proof requests"
)?;
register(g, r)?
},
finality_proofs_importing: {
let g = Gauge::new(
"sync_extra_finality_proofs_importing",
"number of importing extra finality proof requests"
)?;
register(g, r)?
},
})
}
}

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
Expand Down Expand Up @@ -163,6 +263,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
/// For each protocol name, the legacy gossiping engine ID.
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
/// Prometheus metrics.
metrics: Option<Metrics>,
}

#[derive(Default)]
Expand Down Expand Up @@ -371,7 +473,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
protocol_id: ProtocolId,
peerset_config: sc_peerset::PeersetConfig,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
metrics_registry: Option<&Registry>
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(
Expand Down Expand Up @@ -416,6 +519,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
behaviour,
protocol_name_by_engine: HashMap::new(),
protocol_engine_by_name: HashMap::new(),
metrics: if let Some(r) = metrics_registry {
Some(Metrics::register(r)?)
} else {
None
}
};

Ok((protocol, peerset_handle))
Expand Down Expand Up @@ -859,6 +967,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
});
self.report_metrics()
}

fn maintain_peers(&mut self) {
Expand Down Expand Up @@ -1767,6 +1876,40 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
out
}

fn report_metrics(&self) {
use std::convert::TryInto;

if let Some(metrics) = &self.metrics {
let mut obsolete_requests: u64 = 0;
for peer in self.context_data.peers.values() {
let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
obsolete_requests = obsolete_requests.saturating_add(n);
}
metrics.obsolete_requests.set(obsolete_requests);

let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.handshaking_peers.set(n);

let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);

let m = self.sync.metrics();

metrics.fork_targets.set(m.fork_targets.into());
metrics.queued_blocks.set(m.queued_blocks.into());

metrics.justifications_pending.set(m.justifications.pending_requests.into());
metrics.justifications_active.set(m.justifications.active_requests.into());
metrics.justifications_failed.set(m.justifications.failed_requests.into());
metrics.justifications_importing.set(m.justifications.importing_requests.into());

metrics.finality_proofs_pending.set(m.finality_proofs.pending_requests.into());
metrics.finality_proofs_active.set(m.finality_proofs.active_requests.into());
metrics.finality_proofs_failed.set(m.finality_proofs.failed_requests.into());
metrics.finality_proofs_importing.set(m.finality_proofs.importing_requests.into());
}
}
}

/// Outcome of an incoming custom message.
Expand Down
21 changes: 21 additions & 0 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,27 @@ impl<B: BlockT> ChainSync<B> {
fn is_already_downloading(&self, hash: &B::Hash) -> bool {
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
}

/// Return some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
use std::convert::TryInto;
Metrics {
queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX),
fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX),
finality_proofs: self.extra_finality_proofs.metrics(),
justifications: self.extra_justifications.metrics(),
_priv: ()
}
}
}

#[derive(Debug)]
pub(crate) struct Metrics {
pub(crate) queued_blocks: u32,
pub(crate) fork_targets: u32,
pub(crate) finality_proofs: extra_requests::Metrics,
pub(crate) justifications: extra_requests::Metrics,
_priv: ()
}

/// Request the ancestry for a block. Sends a request for header and justification for the given
Expand Down
21 changes: 21 additions & 0 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ pub(crate) struct ExtraRequests<B: BlockT> {
request_type_name: &'static str,
}

#[derive(Debug)]
pub(crate) struct Metrics {
pub(crate) pending_requests: u32,
pub(crate) active_requests: u32,
pub(crate) importing_requests: u32,
pub(crate) failed_requests: u32,
_priv: ()
}

impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn new(request_type_name: &'static str) -> Self {
ExtraRequests {
Expand Down Expand Up @@ -240,6 +249,18 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
self.pending_requests.iter()
}

/// Get some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
use std::convert::TryInto;
Metrics {
pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX),
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
_priv: ()
}
}
}

/// Matches peers with pending extra requests.
Expand Down
5 changes: 3 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.finality_proof_request_builder,
params.protocol_id.clone(),
peerset_config,
params.block_announce_validator
params.block_announce_validator,
params.metrics_registry.as_ref()
)?;

// Build the swarm.
Expand Down Expand Up @@ -858,7 +859,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
};

this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);

if let Some(metrics) = this.metrics.as_ref() {
metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.peers_count.set(num_connected_peers as u64);
Expand Down

0 comments on commit 027756e

Please sign in to comment.