Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prometheus metrics for RPC calls #7088

Merged
merged 17 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = "~0.1.6"
grbIzl marked this conversation as resolved.
Show resolved Hide resolved
jsonrpc-core = "14.2.0"
pubsub = { package = "jsonrpc-pubsub", version = "14.2.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"}
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "14.2.0" }
ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
ipc = { version = "14.2.0", package = "jsonrpc-ipc-server" }
ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
13 changes: 9 additions & 4 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

#![warn(missing_docs)]

mod middleware;

use std::io;
use jsonrpc_core::IoHandlerExtension;
use jsonrpc_core::{IoHandlerExtension, MetaIoHandler};
use log::error;
use pubsub::PubSubMetadata;

Expand All @@ -32,15 +34,18 @@ const MAX_PAYLOAD: usize = 15 * 1024 * 1024;
const WS_MAX_CONNECTIONS: usize = 100;

/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T>;
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

pub use self::inner::*;
pub use middleware::{RpcMiddleware, RpcMetrics};

/// Construct rpc `IoHandler`
pub fn rpc_handler<M: PubSubMetadata>(
extension: impl IoHandlerExtension<M>
extension: impl IoHandlerExtension<M>,
rpc_middleware: RpcMiddleware,
) -> RpcHandler<M> {
let mut io = pubsub::PubSubHandler::default();
let io_handler = MetaIoHandler::with_middleware(rpc_middleware);
let mut io = pubsub::PubSubHandler::new(io_handler);
extension.augment(&mut io);

// add an endpoint to list all available methods.
Expand Down
87 changes: 87 additions & 0 deletions client/rpc-servers/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// This file is part of Substrate.

// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Middleware for RPC requests.

use jsonrpc_core::{
Middleware as RequestMiddleware, Metadata,
Request, Response, FutureResponse, FutureOutput
};
use prometheus_endpoint::{
Registry, CounterVec, PrometheusError,
Opts, register, U64
};

use futures::{future::Either, Future};

/// Metrics for RPC middleware
#[derive(Debug, Clone)]
pub struct RpcMetrics {
rpc_calls: CounterVec<U64>,
}

impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
metrics_registry.and_then(|r| {
Some(RpcMetrics {
rpc_calls: register(CounterVec::new(
Opts::new(
"rpc_calls_total",
"Number of rpc calls received",
),
&["protocol"]
).ok()?, r).ok()?,
})
}).ok_or(PrometheusError::Msg("Cannot register metric".to_string()))
}
}

/// Middleware for RPC calls
pub struct RpcMiddleware {
metrics: Option<RpcMetrics>,
transport_label: String,
}

impl RpcMiddleware {
/// Create an instance of middleware with provided metrics
/// transport_label is used as a label for Prometheus collector
pub fn new(metrics: Option<&RpcMetrics>, transport_label: &str) -> Self {
RpcMiddleware {
metrics: metrics.cloned(),
grbIzl marked this conversation as resolved.
Show resolved Hide resolved
transport_label: String::from(transport_label),
}
}
}

impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
type Future = FutureResponse;
type CallFuture = FutureOutput;

fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X>
where
F: Fn(Request, M) -> X + Send + Sync,
X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
{
if let Some(ref metrics) = self.metrics {
metrics.rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
}

Either::B(next(request, meta))
}
}
42 changes: 27 additions & 15 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,14 +545,22 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
);

// RPC
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| gen_handler(
deny_unsafe, &config, task_manager.spawn_handle(), client.clone(), transaction_pool.clone(),
keystore.clone(), on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
let gen_handler = |
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware
| gen_handler(
deny_unsafe, rpc_middleware, &config, task_manager.spawn_handle(),
client.clone(), transaction_pool.clone(), keystore.clone(),
on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc = start_rpc_servers(&config, gen_handler)?;
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry()).ok();
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.as_ref())?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(sc_rpc::DenyUnsafe::No).into()));
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.as_ref(), "inbrowser")
).into()));

// Telemetry
let telemetry = config.telemetry_endpoints.clone().and_then(|endpoints| {
Expand Down Expand Up @@ -671,6 +679,7 @@ fn build_telemetry<TBl: BlockT>(

fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
Expand All @@ -681,7 +690,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>
) -> jsonrpc_pubsub::PubSubHandler<sc_rpc::Metadata>
) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + HeaderBackend<TBl> +
Expand Down Expand Up @@ -746,15 +755,18 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
offchain::OffchainApi::to_delegate(offchain)
});

sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
))
sc_rpc_server::rpc_handler(
(
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
),
rpc_middleware
)
}

/// Parameters to pass into `build_network`.
Expand Down
38 changes: 29 additions & 9 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<T> MallocSizeOfWasm for T {}

/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>>);
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>>);

impl RpcHandlers {
/// Starts an RPC query.
Expand All @@ -118,7 +118,8 @@ impl RpcHandlers {
}

/// Provides access to the underlying `MetaIoHandler`
pub fn io_handler(&self) -> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>> {
pub fn io_handler(&self)
-> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>> {
self.0.clone()
}
}
Expand Down Expand Up @@ -382,9 +383,13 @@ mod waiting {

/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>>(
fn start_rpc_servers<
H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware)
-> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
>(
config: &Configuration,
mut gen_handler: H
mut gen_handler: H,
rpc_metrics: Option<&sc_rpc_server::RpcMetrics>
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error>
where F: FnMut(&SocketAddr) -> Result<T, io::Error>,
Expand Down Expand Up @@ -414,13 +419,21 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
}

Ok(Box::new((
config.rpc_ipc.as_ref().map(|path| sc_rpc_server::start_ipc(&*path, gen_handler(sc_rpc::DenyUnsafe::No))),
config.rpc_ipc.as_ref().map(|path| sc_rpc_server::start_ipc(
&*path, gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "ipc")
)
)),
maybe_start_server(
config.rpc_http,
|address| sc_rpc_server::start_http(
address,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "http")
),
),
)?.map(|s| waiting::HttpServer(Some(s))),
maybe_start_server(
Expand All @@ -429,17 +442,24 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
address,
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "ws")
),
),
)?.map(|s| waiting::WsServer(Some(s))),
)))
}

/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>>(
fn start_rpc_servers<
H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware)
-> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
>(
_: &Configuration,
_: H
_: H,
_: Option<&sc_rpc_server::RpcMetrics>
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
}
Expand Down