diff --git a/Cargo.lock b/Cargo.lock index df454edaf22f1..f635807f6fbc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7111,6 +7111,7 @@ dependencies = [ name = "sc-rpc-server" version = "2.0.0-rc6" dependencies = [ + "futures 0.1.29", "jsonrpc-core", "jsonrpc-http-server", "jsonrpc-ipc-server", @@ -7120,6 +7121,7 @@ dependencies = [ "serde", "serde_json", "sp-runtime", + "substrate-prometheus-endpoint", ] [[package]] diff --git a/client/rpc-servers/Cargo.toml b/client/rpc-servers/Cargo.toml index 3af5cdd039d8b..0d3589a00a65e 100644 --- a/client/rpc-servers/Cargo.toml +++ b/client/rpc-servers/Cargo.toml @@ -12,14 +12,16 @@ description = "Substrate RPC servers." targets = ["x86_64-unknown-linux-gnu"] [dependencies] +futures = "0.1.6" jsonrpc-core = "14.2.0" pubsub = { package = "jsonrpc-pubsub", version = "14.2.0" } log = "0.4.8" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"} serde = "1.0.101" serde_json = "1.0.41" sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" } [target.'cfg(not(target_os = "unknown"))'.dependencies] http = { package = "jsonrpc-http-server", version = "14.2.0" } -ws = { package = "jsonrpc-ws-server", version = "14.2.0" } ipc = { version = "14.2.0", package = "jsonrpc-ipc-server" } +ws = { package = "jsonrpc-ws-server", version = "14.2.0" } diff --git a/client/rpc-servers/src/lib.rs b/client/rpc-servers/src/lib.rs index 1e476262acea8..1f99e8bb0d242 100644 --- a/client/rpc-servers/src/lib.rs +++ b/client/rpc-servers/src/lib.rs @@ -20,8 +20,10 @@ #![warn(missing_docs)] +mod middleware; + use std::io; -use jsonrpc_core::IoHandlerExtension; +use jsonrpc_core::{IoHandlerExtension, MetaIoHandler}; use log::error; use pubsub::PubSubMetadata; @@ -32,15 +34,18 @@ const MAX_PAYLOAD: usize = 15 * 1024 * 1024; const WS_MAX_CONNECTIONS: usize = 100; /// The RPC IoHandler containing all requested APIs. -pub type RpcHandler = pubsub::PubSubHandler; +pub type RpcHandler = pubsub::PubSubHandler; pub use self::inner::*; +pub use middleware::{RpcMiddleware, RpcMetrics}; /// Construct rpc `IoHandler` pub fn rpc_handler( - extension: impl IoHandlerExtension + extension: impl IoHandlerExtension, + rpc_middleware: RpcMiddleware, ) -> RpcHandler { - let mut io = pubsub::PubSubHandler::default(); + let io_handler = MetaIoHandler::with_middleware(rpc_middleware); + let mut io = pubsub::PubSubHandler::new(io_handler); extension.augment(&mut io); // add an endpoint to list all available methods. diff --git a/client/rpc-servers/src/middleware.rs b/client/rpc-servers/src/middleware.rs new file mode 100644 index 0000000000000..74139714c8cb7 --- /dev/null +++ b/client/rpc-servers/src/middleware.rs @@ -0,0 +1,87 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Middleware for RPC requests. + +use jsonrpc_core::{ + Middleware as RequestMiddleware, Metadata, + Request, Response, FutureResponse, FutureOutput +}; +use prometheus_endpoint::{ + Registry, CounterVec, PrometheusError, + Opts, register, U64 +}; + +use futures::{future::Either, Future}; + +/// Metrics for RPC middleware +#[derive(Debug, Clone)] +pub struct RpcMetrics { + rpc_calls: CounterVec, +} + +impl RpcMetrics { + /// Create an instance of metrics + pub fn new(metrics_registry: Option<&Registry>) -> Result { + metrics_registry.and_then(|r| { + Some(RpcMetrics { + rpc_calls: register(CounterVec::new( + Opts::new( + "rpc_calls_total", + "Number of rpc calls received", + ), + &["protocol"] + ).ok()?, r).ok()?, + }) + }).ok_or(PrometheusError::Msg("Cannot register metric".to_string())) + } +} + +/// Middleware for RPC calls +pub struct RpcMiddleware { + metrics: Option, + transport_label: String, +} + +impl RpcMiddleware { + /// Create an instance of middleware with provided metrics + /// transport_label is used as a label for Prometheus collector + pub fn new(metrics: Option, transport_label: &str) -> Self { + RpcMiddleware { + metrics, + transport_label: String::from(transport_label), + } + } +} + +impl RequestMiddleware for RpcMiddleware { + type Future = FutureResponse; + type CallFuture = FutureOutput; + + fn on_request(&self, request: Request, meta: M, next: F) -> Either + where + F: Fn(Request, M) -> X + Send + Sync, + X: Future, Error = ()> + Send + 'static, + { + if let Some(ref metrics) = self.metrics { + metrics.rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc(); + } + + Either::B(next(request, meta)) + } +} diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 25abfdffed845..410198af26da3 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -545,14 +545,22 @@ pub fn spawn_tasks( ); // RPC - let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| gen_handler( - deny_unsafe, &config, task_manager.spawn_handle(), client.clone(), transaction_pool.clone(), - keystore.clone(), on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder, + let gen_handler = | + deny_unsafe: sc_rpc::DenyUnsafe, + rpc_middleware: sc_rpc_server::RpcMiddleware + | gen_handler( + deny_unsafe, rpc_middleware, &config, task_manager.spawn_handle(), + client.clone(), transaction_pool.clone(), keystore.clone(), + on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder, backend.offchain_storage(), system_rpc_tx.clone() ); - let rpc = start_rpc_servers(&config, gen_handler)?; + let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry()).ok(); + let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.as_ref())?; // This is used internally, so don't restrict access to unsafe RPC - let rpc_handlers = RpcHandlers(Arc::new(gen_handler(sc_rpc::DenyUnsafe::No).into())); + let rpc_handlers = RpcHandlers(Arc::new(gen_handler( + sc_rpc::DenyUnsafe::No, + sc_rpc_server::RpcMiddleware::new(rpc_metrics.as_ref().cloned(), "inbrowser") + ).into())); // Telemetry let telemetry = config.telemetry_endpoints.clone().and_then(|endpoints| { @@ -660,6 +668,7 @@ fn build_telemetry( fn gen_handler( deny_unsafe: sc_rpc::DenyUnsafe, + rpc_middleware: sc_rpc_server::RpcMiddleware, config: &Configuration, spawn_handle: SpawnTaskHandle, client: Arc, @@ -670,7 +679,7 @@ fn gen_handler( rpc_extensions_builder: &(dyn RpcExtensionBuilder + Send), offchain_storage: Option<>::OffchainStorage>, system_rpc_tx: TracingUnboundedSender> -) -> jsonrpc_pubsub::PubSubHandler +) -> sc_rpc_server::RpcHandler where TBl: BlockT, TCl: ProvideRuntimeApi + BlockchainEvents + HeaderBackend + @@ -735,15 +744,18 @@ fn gen_handler( offchain::OffchainApi::to_delegate(offchain) }); - sc_rpc_server::rpc_handler(( - state::StateApi::to_delegate(state), - state::ChildStateApi::to_delegate(child_state), - chain::ChainApi::to_delegate(chain), - maybe_offchain_rpc, - author::AuthorApi::to_delegate(author), - system::SystemApi::to_delegate(system), - rpc_extensions_builder.build(deny_unsafe, task_executor), - )) + sc_rpc_server::rpc_handler( + ( + state::StateApi::to_delegate(state), + state::ChildStateApi::to_delegate(child_state), + chain::ChainApi::to_delegate(chain), + maybe_offchain_rpc, + author::AuthorApi::to_delegate(author), + system::SystemApi::to_delegate(system), + rpc_extensions_builder.build(deny_unsafe, task_executor), + ), + rpc_middleware + ) } /// Parameters to pass into `build_network`. diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index d5d503d22d171..39f1dff289a1a 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -97,7 +97,7 @@ impl MallocSizeOfWasm for T {} /// RPC handlers that can perform RPC queries. #[derive(Clone)] -pub struct RpcHandlers(Arc>); +pub struct RpcHandlers(Arc>); impl RpcHandlers { /// Starts an RPC query. @@ -118,7 +118,8 @@ impl RpcHandlers { } /// Provides access to the underlying `MetaIoHandler` - pub fn io_handler(&self) -> Arc> { + pub fn io_handler(&self) + -> Arc> { self.0.clone() } } @@ -382,9 +383,13 @@ mod waiting { /// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive. #[cfg(not(target_os = "unknown"))] -fn start_rpc_servers sc_rpc_server::RpcHandler>( +fn start_rpc_servers< + H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware) + -> sc_rpc_server::RpcHandler +>( config: &Configuration, - mut gen_handler: H + mut gen_handler: H, + rpc_metrics: Option<&sc_rpc_server::RpcMetrics> ) -> Result, error::Error> { fn maybe_start_server(address: Option, mut start: F) -> Result, io::Error> where F: FnMut(&SocketAddr) -> Result, @@ -414,13 +419,21 @@ fn start_rpc_servers sc_rpc_server::RpcHandler sc_rpc_server::RpcHandler sc_rpc_server::RpcHandler sc_rpc_server::RpcHandler>( +fn start_rpc_servers< + H: FnMut(sc_rpc::DenyUnsafe, sc_rpc_server::RpcMiddleware) + -> sc_rpc_server::RpcHandler +>( _: &Configuration, - _: H + _: H, + _: Option<&sc_rpc_server::RpcMetrics> ) -> Result, error::Error> { Ok(Box::new(())) }