Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Prometheus metrics for RPC calls (#7088)
Browse files Browse the repository at this point in the history
* WS and HTTP middlewares added

* Prometheus endpoint added

* Counters renamed

* Proper style for inc

* Metrics initialization re-written

* Rework to handler middleware

* Introduce transport prefix for metrics

* String shortened

* Commented code removed, new line inserted

* One more string shortened

* Wasm build fixed

* Wasm build fixed once again

* Rework to shared metrics

* Added collectors label

* Tilde removed from cargo

* Switch to owned metrics in parameters
  • Loading branch information
grbIzl authored and bkchr committed Sep 18, 2020
1 parent 6b1d600 commit 7cc397f
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = "0.1.6"
jsonrpc-core = "14.2.0"
pubsub = { package = "jsonrpc-pubsub", version = "14.2.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"}
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "14.2.0" }
ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
ipc = { version = "14.2.0", package = "jsonrpc-ipc-server" }
ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
13 changes: 9 additions & 4 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

#![warn(missing_docs)]

mod middleware;

use std::io;
use jsonrpc_core::IoHandlerExtension;
use jsonrpc_core::{IoHandlerExtension, MetaIoHandler};
use log::error;
use pubsub::PubSubMetadata;

Expand All @@ -32,15 +34,18 @@ const MAX_PAYLOAD: usize = 15 * 1024 * 1024;
const WS_MAX_CONNECTIONS: usize = 100;

/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T>;
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

pub use self::inner::*;
pub use middleware::{RpcMiddleware, RpcMetrics};

/// Construct rpc `IoHandler`
pub fn rpc_handler<M: PubSubMetadata>(
extension: impl IoHandlerExtension<M>
extension: impl IoHandlerExtension<M>,
rpc_middleware: RpcMiddleware,
) -> RpcHandler<M> {
let mut io = pubsub::PubSubHandler::default();
let io_handler = MetaIoHandler::with_middleware(rpc_middleware);
let mut io = pubsub::PubSubHandler::new(io_handler);
extension.augment(&mut io);

// add an endpoint to list all available methods.
Expand Down
87 changes: 87 additions & 0 deletions client/rpc-servers/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// This file is part of Substrate.

// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Middleware for RPC requests.

use jsonrpc_core::{
Middleware as RequestMiddleware, Metadata,
Request, Response, FutureResponse, FutureOutput
};
use prometheus_endpoint::{
Registry, CounterVec, PrometheusError,
Opts, register, U64
};

use futures::{future::Either, Future};

/// Metrics for RPC middleware
#[derive(Debug, Clone)]
pub struct RpcMetrics {
rpc_calls: CounterVec<U64>,
}

impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
metrics_registry.and_then(|r| {
Some(RpcMetrics {
rpc_calls: register(CounterVec::new(
Opts::new(
"rpc_calls_total",
"Number of rpc calls received",
),
&["protocol"]
).ok()?, r).ok()?,
})
}).ok_or(PrometheusError::Msg("Cannot register metric".to_string()))
}
}

/// Middleware for RPC calls
pub struct RpcMiddleware {
metrics: Option<RpcMetrics>,
transport_label: String,
}

impl RpcMiddleware {
/// Create an instance of middleware with provided metrics
/// transport_label is used as a label for Prometheus collector
pub fn new(metrics: Option<RpcMetrics>, transport_label: &str) -> Self {
RpcMiddleware {
metrics,
transport_label: String::from(transport_label),
}
}
}

impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
type Future = FutureResponse;
type CallFuture = FutureOutput;

fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X>
where
F: Fn(Request, M) -> X + Send + Sync,
X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
{
if let Some(ref metrics) = self.metrics {
metrics.rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
}

Either::B(next(request, meta))
}
}
42 changes: 27 additions & 15 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,14 +545,22 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
);

// RPC
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| gen_handler(
deny_unsafe, &config, task_manager.spawn_handle(), client.clone(), transaction_pool.clone(),
keystore.clone(), on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
let gen_handler = |
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware
| gen_handler(
deny_unsafe, rpc_middleware, &config, task_manager.spawn_handle(),
client.clone(), transaction_pool.clone(), keystore.clone(),
on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc = start_rpc_servers(&config, gen_handler)?;
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry()).ok();
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.as_ref())?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(sc_rpc::DenyUnsafe::No).into()));
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.as_ref().cloned(), "inbrowser")
).into()));

// Telemetry
let telemetry = config.telemetry_endpoints.clone().and_then(|endpoints| {
Expand Down Expand Up @@ -660,6 +668,7 @@ fn build_telemetry<TBl: BlockT>(

fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
Expand All @@ -670,7 +679,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>
) -> jsonrpc_pubsub::PubSubHandler<sc_rpc::Metadata>
) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + HeaderBackend<TBl> +
Expand Down Expand Up @@ -735,15 +744,18 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
offchain::OffchainApi::to_delegate(offchain)
});

sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
))
sc_rpc_server::rpc_handler(
(
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
),
rpc_middleware
)
}

/// Parameters to pass into `build_network`.
Expand Down
38 changes: 29 additions & 9 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<T> MallocSizeOfWasm for T {}

/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>>);
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>>);

impl RpcHandlers {
/// Starts an RPC query.
Expand All @@ -118,7 +118,8 @@ impl RpcHandlers {
}

/// Provides access to the underlying `MetaIoHandler`
pub fn io_handler(&self) -> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>> {
pub fn io_handler(&self)
-> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>> {
self.0.clone()
}
}
Expand Down Expand Up @@ -382,9 +383,13 @@ mod waiting {

/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>>(
fn start_rpc_servers<
H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware)
-> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
>(
config: &Configuration,
mut gen_handler: H
mut gen_handler: H,
rpc_metrics: Option<&sc_rpc_server::RpcMetrics>
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error>
where F: FnMut(&SocketAddr) -> Result<T, io::Error>,
Expand Down Expand Up @@ -414,13 +419,21 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
}

Ok(Box::new((
config.rpc_ipc.as_ref().map(|path| sc_rpc_server::start_ipc(&*path, gen_handler(sc_rpc::DenyUnsafe::No))),
config.rpc_ipc.as_ref().map(|path| sc_rpc_server::start_ipc(
&*path, gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "ipc")
)
)),
maybe_start_server(
config.rpc_http,
|address| sc_rpc_server::start_http(
address,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "http")
),
),
)?.map(|s| waiting::HttpServer(Some(s))),
maybe_start_server(
Expand All @@ -429,17 +442,24 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
address,
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "ws")
),
),
)?.map(|s| waiting::WsServer(Some(s))),
)))
}

/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>>(
fn start_rpc_servers<
H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware)
-> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
>(
_: &Configuration,
_: H
_: H,
_: Option<&sc_rpc_server::RpcMetrics>
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
}
Expand Down

0 comments on commit 7cc397f

Please sign in to comment.