diff --git a/Cargo.lock b/Cargo.lock index 239e467923ac..09114e9cc835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6840,9 +6840,9 @@ checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" [[package]] name = "jsonrpsee" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0" +checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", @@ -6856,9 +6856,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83" +checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131" dependencies = [ "futures-util", "http", @@ -6877,9 +6877,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868" +checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1" dependencies = [ "anyhow", "async-lock 3.3.0", @@ -6903,9 +6903,9 @@ dependencies = [ [[package]] name = "jsonrpsee-http-client" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a06ef0de060005fddf772d54597bb6a8b0413da47dcffd304b0306147b9678" +checksum = "9e7a95e346f55df84fb167b7e06470e196e7d5b9488a21d69c5d9732043ba7ba" dependencies = [ "async-trait", "hyper", @@ -6923,22 +6923,22 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69fc56131589f82e57805f7338b87023db4aafef813555708b159787e34ad6bc" +checksum = "30ca066e73dd70294aebc5c2675d8ffae43be944af027c857ce0d4c51785f014" dependencies = [ "heck 0.4.1", "proc-macro-crate 3.0.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.53", ] [[package]] name = "jsonrpsee-server" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85be77fe5b2a94589e3164fb780017f7aff7d646b49278c0d0346af16975c8e" +checksum = "0e29c1bd1f9bba83c864977c73404e505f74f730fa0db89dd490ec174e36d7f0" dependencies = [ "futures-util", "http", @@ -6960,9 +6960,9 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a" +checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368" dependencies = [ "anyhow", "beef", @@ -6973,9 +6973,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181" +checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9" dependencies = [ "http", "jsonrpsee-client-transport", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index c62b3e789d38..937e5c6b626a 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -44,6 +44,7 @@ futures-util = { version = "0.3.30", default-features = false } rand = "0.8.5" [dev-dependencies] +jsonrpsee = { version = "0.22", features = ["server", "ws-client"] } serde_json = { workspace = true, default-features = true } tokio = { version = "1.22.0", features = ["macros"] } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index 00000e1fb277..3851adac2644 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -27,7 +27,7 @@ use crate::{ common::events::StorageQuery, }; use jsonrpsee::{proc_macros::rpc, server::ResponsePayload}; -use sp_rpc::list::ListOrValue; +pub use sp_rpc::list::ListOrValue; #[rpc(client, server)] pub trait ChainHeadApi { @@ -54,8 +54,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_body", blocking)] - fn chain_head_unstable_body( + #[method(name = "chainHead_unstable_body", raw_method)] + async fn chain_head_unstable_body( &self, follow_subscription: String, hash: Hash, @@ -73,8 +73,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_header", blocking)] - fn chain_head_unstable_header( + #[method(name = "chainHead_unstable_header", raw_method)] + async fn chain_head_unstable_header( &self, follow_subscription: String, hash: Hash, @@ -85,8 +85,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_storage", blocking)] - fn chain_head_unstable_storage( + #[method(name = "chainHead_unstable_storage", raw_method)] + async fn chain_head_unstable_storage( &self, follow_subscription: String, hash: Hash, @@ -99,8 +99,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_call", blocking)] - fn chain_head_unstable_call( + #[method(name = "chainHead_unstable_call", raw_method)] + async fn chain_head_unstable_call( &self, follow_subscription: String, hash: Hash, @@ -118,8 +118,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_unpin", blocking)] - fn chain_head_unstable_unpin( + #[method(name = "chainHead_unstable_unpin", raw_method)] + async fn chain_head_unstable_unpin( &self, follow_subscription: String, hash_or_hashes: ListOrValue, @@ -131,8 +131,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_continue", blocking)] - fn chain_head_unstable_continue( + #[method(name = "chainHead_unstable_continue", raw_method)] + async fn chain_head_unstable_continue( &self, follow_subscription: String, operation_id: String, @@ -145,8 +145,8 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_stopOperation", blocking)] - fn chain_head_unstable_stop_operation( + #[method(name = "chainHead_unstable_stopOperation", raw_method)] + async fn chain_head_unstable_stop_operation( &self, follow_subscription: String, operation_id: String, diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 2bda22b45239..975abbca4b68 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -34,10 +34,10 @@ use crate::{ hex_string, SubscriptionTaskExecutor, }; use codec::Encode; -use futures::future::FutureExt; +use futures::{channel::oneshot, future::FutureExt}; use jsonrpsee::{ - core::async_trait, server::ResponsePayload, types::SubscriptionId, MethodResponseFuture, - PendingSubscriptionSink, SubscriptionSink, + core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionDetails, + MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink, }; use log::debug; use sc_client_api::{ @@ -65,6 +65,8 @@ pub struct ChainHeadConfig { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, + /// The maximum number of `chainHead_follow` subscriptions per connection. + pub max_follow_subscriptions_per_connection: usize, } /// Maximum pinned blocks across all connections. @@ -86,6 +88,9 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; +/// The maximum number of `chainHead_follow` subscriptions per connection. +const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; + impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { @@ -93,6 +98,7 @@ impl Default for ChainHeadConfig { subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, } } } @@ -106,7 +112,7 @@ pub struct ChainHead, Block: BlockT, Client> { /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, /// Keep track of the pinned blocks for each subscription. - subscriptions: Arc>, + subscriptions: SubscriptionManagement, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, @@ -126,12 +132,13 @@ impl, Block: BlockT, Client> ChainHead { client, backend: backend.clone(), executor, - subscriptions: Arc::new(SubscriptionManagement::new( + subscriptions: SubscriptionManagement::new( config.global_max_pinned_blocks, config.subscription_max_pinned_duration, config.subscription_max_ongoing_operations, + config.max_follow_subscriptions_per_connection, backend, - )), + ), operation_max_storage_items: config.operation_max_storage_items, _phantom: PhantomData, } @@ -182,12 +189,23 @@ where let client = self.client.clone(); let fut = async move { + // Ensure the current connection ID has enough space to accept a new subscription. + let connection_id = pending.connection_id(); + // The RAII `reserved_subscription` will clean up resources on drop: + // - free the reserved subscription for the connection ID. + // - remove the subscription ID from the subscription management. + let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id) + else { + pending.reject(ChainHeadRpcError::ReachedLimits).await; + return + }; + let Ok(sink) = pending.accept().await else { return }; let sub_id = read_subscription_id_as_string(&sink); - // Keep track of the subscription. - let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime) + let Some(sub_data) = + reserved_subscription.insert_subscription(sub_id.clone(), with_runtime) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. @@ -201,91 +219,117 @@ where let mut chain_head_follow = ChainHeadFollower::new( client, backend, - subscriptions.clone(), + subscriptions, with_runtime, sub_id.clone(), ); chain_head_follow.generate_events(sink, sub_data).await; - subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); }; self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); } - fn chain_head_unstable_body( + async fn chain_head_unstable_body( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, ) -> ResponsePayload<'static, MethodResponse> { - let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) | - Err(SubscriptionManagementError::ExceededLimits) => - return ResponsePayload::success(MethodResponse::LimitReached), - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - return ResponsePayload::error(ChainHeadRpcError::InvalidBlock); - }, - Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock), - }; + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } - let operation_id = block_guard.operation().operation_id(); + let client = self.client.clone(); + let subscriptions = self.subscriptions.clone(); + let executor = self.executor.clone(); + + let result = spawn_blocking(&self.executor, async move { + let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => + return ResponsePayload::success(MethodResponse::LimitReached), + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + return ResponsePayload::error(ChainHeadRpcError::InvalidBlock); + }, + Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock), + }; - let event = match self.client.block(hash) { - Ok(Some(signed_block)) => { - let extrinsics = signed_block - .block - .extrinsics() - .iter() - .map(|extrinsic| hex_string(&extrinsic.encode())) - .collect(); - FollowEvent::::OperationBodyDone(OperationBodyDone { + let operation_id = block_guard.operation().operation_id(); + + let event = match client.block(hash) { + Ok(Some(signed_block)) => { + let extrinsics = signed_block + .block + .extrinsics() + .iter() + .map(|extrinsic| hex_string(&extrinsic.encode())) + .collect(); + FollowEvent::::OperationBodyDone(OperationBodyDone { + operation_id: operation_id.clone(), + value: extrinsics, + }) + }, + Ok(None) => { + // The block's body was pruned. This subscription ID has become invalid. + debug!( + target: LOG_TARGET, + "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + &follow_subscription, + hash + ); + subscriptions.remove_subscription(&follow_subscription); + return ResponsePayload::error(ChainHeadRpcError::InvalidBlock) + }, + Err(error) => FollowEvent::::OperationError(OperationError { operation_id: operation_id.clone(), - value: extrinsics, - }) - }, - Ok(None) => { - // The block's body was pruned. This subscription ID has become invalid. - debug!( - target: LOG_TARGET, - "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - &follow_subscription, - hash - ); - self.subscriptions.remove_subscription(&follow_subscription); - return ResponsePayload::error(ChainHeadRpcError::InvalidBlock) - }, - Err(error) => FollowEvent::::OperationError(OperationError { - operation_id: operation_id.clone(), - error: error.to_string(), - }), - }; + error: error.to_string(), + }), + }; - let (rp, rp_fut) = method_started_response(operation_id, None); + let (rp, rp_fut) = method_started_response(operation_id, None); + let fut = async move { + // Wait for the server to send out the response and if it produces an error no event + // should be generated. + if rp_fut.await.is_err() { + return; + } - let fut = async move { - // Events should only by generated - // if the response was successfully propagated. - if rp_fut.await.is_err() { - return; - } - let _ = block_guard.response_sender().unbounded_send(event); - }; + let _ = block_guard.response_sender().unbounded_send(event); + }; + executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + rp + }); - rp + result + .await + .unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached)) } - fn chain_head_unstable_header( + async fn chain_head_unstable_header( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, ) -> Result, ChainHeadRpcError> { - let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(None); + } + + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => return Ok(None), @@ -296,19 +340,35 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - self.client - .header(hash) - .map(|opt_header| opt_header.map(|h| hex_string(&h.encode()))) - .map_err(|err| ChainHeadRpcError::InternalError(err.to_string())) + let client = self.client.clone(); + let result = spawn_blocking(&self.executor, async move { + let _block_guard = block_guard; + + client + .header(hash) + .map(|opt_header| opt_header.map(|h| hex_string(&h.encode()))) + .map_err(|err| ChainHeadRpcError::InternalError(err.to_string())) + }); + result.await.unwrap_or_else(|_| Ok(None)) } - fn chain_head_unstable_storage( + async fn chain_head_unstable_storage( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, items: Vec>, child_trie: Option, ) -> ResponsePayload<'static, MethodResponse> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } + // Gain control over parameter parsing and returned error. let items = match items .into_iter() @@ -357,25 +417,25 @@ where let mut items = items; items.truncate(num_operations); - let (rp, rp_is_success) = method_started_response(operation_id, Some(discarded)); - + let (rp, rp_fut) = method_started_response(operation_id, Some(discarded)); let fut = async move { - // Events should only by generated - // if the response was successfully propagated. - if rp_is_success.await.is_err() { + // Wait for the server to send out the response and if it produces an error no event + // should be generated. + if rp_fut.await.is_err() { return; } + storage_client.generate_events(block_guard, hash, items, child_trie).await; }; - self.executor .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); rp } - fn chain_head_unstable_call( + async fn chain_head_unstable_call( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, function: String, @@ -386,6 +446,15 @@ where Err(err) => return ResponsePayload::error(err), }; + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | @@ -408,44 +477,53 @@ where } let operation_id = block_guard.operation().operation_id(); - let event = self - .client - .executor() - .call(hash, &function, &call_parameters, CallContext::Offchain) - .map(|result| { - FollowEvent::::OperationCallDone(OperationCallDone { - operation_id: operation_id.clone(), - output: hex_string(&result), - }) - }) - .unwrap_or_else(|error| { - FollowEvent::::OperationError(OperationError { - operation_id: operation_id.clone(), - error: error.to_string(), - }) - }); - - let (rp, rp_fut) = method_started_response(operation_id, None); + let client = self.client.clone(); + let (rp, rp_fut) = method_started_response(operation_id.clone(), None); let fut = async move { - // Events should only by generated - // if the response was successfully propagated. + // Wait for the server to send out the response and if it produces an error no event + // should be generated. if rp_fut.await.is_err() { - return; + return } + + let event = client + .executor() + .call(hash, &function, &call_parameters, CallContext::Offchain) + .map(|result| { + FollowEvent::::OperationCallDone(OperationCallDone { + operation_id: operation_id.clone(), + output: hex_string(&result), + }) + }) + .unwrap_or_else(|error| { + FollowEvent::::OperationError(OperationError { + operation_id: operation_id.clone(), + error: error.to_string(), + }) + }); + let _ = block_guard.response_sender().unbounded_send(event); }; - - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + self.executor + .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); rp } - fn chain_head_unstable_unpin( + async fn chain_head_unstable_unpin( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash_or_hashes: ListOrValue, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()); + } + let result = match hash_or_hashes { ListOrValue::Value(hash) => self.subscriptions.unpin_blocks(&follow_subscription, [hash]), @@ -469,11 +547,19 @@ where } } - fn chain_head_unstable_continue( + async fn chain_head_unstable_continue( &self, + connection_details: ConnectionDetails, follow_subscription: String, operation_id: String, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()) + } + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { return Ok(()) @@ -487,11 +573,19 @@ where } } - fn chain_head_unstable_stop_operation( + async fn chain_head_unstable_stop_operation( &self, + connection_details: ConnectionDetails, follow_subscription: String, operation_id: String, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()) + } + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { return Ok(()) @@ -510,3 +604,26 @@ fn method_started_response( let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items }); ResponsePayload::success(rp).notify_on_completion() } + +/// Spawn a blocking future on the provided executor and return the result on a oneshot channel. +/// +/// This is a wrapper to extract the result of a `executor.spawn_blocking` future. +fn spawn_blocking( + executor: &SubscriptionTaskExecutor, + fut: impl std::future::Future + Send + 'static, +) -> oneshot::Receiver +where + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + + let blocking_fut = async move { + let result = fut.await; + // Send the result back on the channel. + let _ = tx.send(result); + }; + + executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed()); + + rx +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index afa99f3aa164..90cc62a36fa9 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -60,7 +60,7 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { /// Backend of the chain. backend: Arc, /// Subscriptions handle. - sub_handle: Arc>, + sub_handle: SubscriptionManagement, /// Subscription was started with the runtime updates flag. with_runtime: bool, /// Subscription ID. @@ -74,7 +74,7 @@ impl, Block: BlockT, Client> ChainHeadFollower, backend: Arc, - sub_handle: Arc>, + sub_handle: SubscriptionManagement, with_runtime: bool, sub_id: String, ) -> Self { @@ -546,7 +546,12 @@ where EventStream: Stream> + Unpin, { let mut stream_item = stream.next(); - let mut stop_event = rx_stop; + + // The stop event can be triggered by the chainHead logic when the pinned + // block guarantee cannot be hold. Or when the client is disconnected. + let connection_closed = sink.closed(); + tokio::pin!(connection_closed); + let mut stop_event = futures_util::future::select(rx_stop, connection_closed); while let Either::Left((Some(event), next_stop_event)) = futures_util::future::select(stream_item, stop_event).await @@ -594,8 +599,10 @@ where stop_event = next_stop_event; } - // If we got here either the substrate streams have closed - // or the `Stop` receiver was triggered. + // If we got here either: + // - the substrate streams have closed + // - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee) + // - the client disconnected. let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/error.rs index 8c50e445aa0c..35604db06600 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/error.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/error.rs @@ -23,6 +23,9 @@ use jsonrpsee::types::error::ErrorObject; /// ChainHead RPC errors. #[derive(Debug, thiserror::Error)] pub enum Error { + /// Maximum number of chainHead_follow has been reached. + #[error("Maximum number of chainHead_follow has been reached")] + ReachedLimits, /// The provided block hash is invalid. #[error("Invalid block hash")] InvalidBlock, @@ -46,6 +49,8 @@ pub enum Error { /// Errors for `chainHead` RPC module, as defined in /// . pub mod rpc_spec_v2 { + /// Maximum number of chainHead_follow has been reached. + pub const REACHED_LIMITS: i32 = -32800; /// The provided block hash is invalid. pub const INVALID_BLOCK_ERROR: i32 = -32801; /// The follow subscription was started with `withRuntime` set to `false`. @@ -70,6 +75,8 @@ impl From for ErrorObject<'static> { let msg = e.to_string(); match e { + Error::ReachedLimits => + ErrorObject::owned(rpc_spec_v2::REACHED_LIMITS, msg, None::<()>), Error::InvalidBlock => ErrorObject::owned(rpc_spec_v2::INVALID_BLOCK_ERROR, msg, None::<()>), Error::InvalidRuntimeCall(_) => diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d2879679501f..1ebee3c80fc8 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1455,4 +1455,57 @@ mod tests { let permit_three = ops.reserve_at_most(1).unwrap(); assert_eq!(permit_three.num_ops, 1); } + + #[test] + fn reserved_subscription_cleans_resources() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + backend, + ))); + + // Maximum 2 subscriptions per connection. + let rpc_connections = crate::common::connections::RpcConnections::new(2); + + let subscription_management = + crate::chain_head::subscription::SubscriptionManagement::_from_inner( + subs.clone(), + rpc_connections.clone(), + ); + + let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + let mut reserved_sub_second = subscription_management.reserve_subscription(1).unwrap(); + // Subscriptions reserved but not yet populated. + assert_eq!(subs.read().subs.len(), 0); + + // Cannot reserve anymore. + assert!(subscription_management.reserve_subscription(1).is_none()); + // Drop the first subscription. + drop(reserved_sub_first); + // Space is freed-up for the rpc connections. + let mut reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + + // Insert subscriptions. + let _sub_data_first = + reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap(); + let _sub_data_second = + reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap(); + // Check we have 2 subscriptions under management. + assert_eq!(subs.read().subs.len(), 2); + + // Drop first reserved subscription. + drop(reserved_sub_first); + // Check that the subscription is removed. + assert_eq!(subs.read().subs.len(), 1); + // Space is freed-up for the rpc connections. + let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + + // Drop all subscriptions. + drop(reserved_sub_first); + drop(reserved_sub_second); + assert_eq!(subs.read().subs.len(), 0); + } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index c830e662da2e..5b016af1aa49 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use jsonrpsee::ConnectionId; use parking_lot::RwLock; use sc_client_api::Backend; use sp_runtime::traits::Block as BlockT; @@ -24,6 +25,11 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; +use crate::{ + chain_head::chain_head::LOG_TARGET, + common::connections::{RegisteredConnection, ReservedConnection, RpcConnections}, +}; + use self::inner::SubscriptionsInner; pub use self::inner::OperationState; @@ -34,7 +40,22 @@ pub use inner::{BlockGuard, InsertedSubscriptionData}; pub struct SubscriptionManagement> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. - inner: RwLock>, + inner: Arc>>, + + /// Ensures that chainHead methods can be called from a single connection context. + /// + /// For example, `chainHead_storage` cannot be called with a subscription ID that + /// was obtained from a different connection. + rpc_connections: RpcConnections, +} + +impl> Clone for SubscriptionManagement { + fn clone(&self) -> Self { + SubscriptionManagement { + inner: self.inner.clone(), + rpc_connections: self.rpc_connections.clone(), + } + } } impl> SubscriptionManagement { @@ -43,30 +64,55 @@ impl> SubscriptionManagement { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, + max_follow_subscriptions_per_connection: usize, backend: Arc, ) -> Self { SubscriptionManagement { - inner: RwLock::new(SubscriptionsInner::new( + inner: Arc::new(RwLock::new(SubscriptionsInner::new( global_max_pinned_blocks, local_max_pin_duration, max_ongoing_operations, backend, - )), + ))), + rpc_connections: RpcConnections::new(max_follow_subscriptions_per_connection), } } - /// Insert a new subscription ID. + /// Create a new instance from the inner state. /// - /// If the subscription was not previously inserted, returns the receiver that is - /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already - /// inserted returns none. - pub fn insert_subscription( + /// # Note + /// + /// Used for testing. + #[cfg(test)] + pub(crate) fn _from_inner( + inner: Arc>>, + rpc_connections: RpcConnections, + ) -> Self { + SubscriptionManagement { inner, rpc_connections } + } + + /// Reserve space for a subscriptions. + /// + /// Fails if the connection ID is has reached the maximum number of active subscriptions. + pub fn reserve_subscription( &self, - sub_id: String, - runtime_updates: bool, - ) -> Option> { - let mut inner = self.inner.write(); - inner.insert_subscription(sub_id, runtime_updates) + connection_id: ConnectionId, + ) -> Option> { + let reserved_token = self.rpc_connections.reserve_space(connection_id)?; + + Some(ReservedSubscription { + state: ConnectionState::Reserved(reserved_token), + inner: self.inner.clone(), + }) + } + + /// Check if the given connection contains the given subscription. + pub fn contains_subscription( + &self, + connection_id: ConnectionId, + subscription_id: &str, + ) -> bool { + self.rpc_connections.contains_identifier(connection_id, subscription_id) } /// Remove the subscription ID with associated pinned blocks. @@ -136,3 +182,63 @@ impl> SubscriptionManagement { inner.get_operation(sub_id, operation_id) } } + +/// The state of the connection. +/// +/// The state starts in a [`ConnectionState::Reserved`] state and then transitions to +/// [`ConnectionState::Registered`] when the subscription is inserted. +enum ConnectionState { + Reserved(ReservedConnection), + Registered { _unregister_on_drop: RegisteredConnection, sub_id: String }, + Empty, +} + +/// RAII wrapper that removes the subscription from internal mappings and +/// gives back the reserved space for the connection. +pub struct ReservedSubscription> { + state: ConnectionState, + inner: Arc>>, +} + +impl> ReservedSubscription { + /// Insert a new subscription ID. + /// + /// If the subscription was not previously inserted, returns the receiver that is + /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already + /// inserted returns none. + /// + /// # Note + /// + /// This method should be called only once. + pub fn insert_subscription( + &mut self, + sub_id: String, + runtime_updates: bool, + ) -> Option> { + match std::mem::replace(&mut self.state, ConnectionState::Empty) { + ConnectionState::Reserved(reserved) => { + let registered_token = reserved.register(sub_id.clone())?; + self.state = ConnectionState::Registered { + _unregister_on_drop: registered_token, + sub_id: sub_id.clone(), + }; + + let mut inner = self.inner.write(); + inner.insert_subscription(sub_id, runtime_updates) + }, + // Cannot insert multiple subscriptions into one single reserved space. + ConnectionState::Registered { .. } | ConnectionState::Empty => { + log::error!(target: LOG_TARGET, "Called insert_subscription on a connection that is not reserved"); + None + }, + } + } +} + +impl> Drop for ReservedSubscription { + fn drop(&mut self) { + if let ConnectionState::Registered { sub_id, .. } = &self.state { + self.inner.write().remove_subscription(sub_id); + } + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 30152efb5b62..c3f10a201c58 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use crate::{ - chain_head::{event::MethodResponse, test_utils::ChainHeadMockClient}, + chain_head::{api::ChainHeadApiClient, event::MethodResponse, test_utils::ChainHeadMockClient}, common::events::{StorageQuery, StorageQueryType, StorageResultType}, hex_string, }; @@ -27,8 +27,12 @@ use assert_matches::assert_matches; use codec::{Decode, Encode}; use futures::Future; use jsonrpsee::{ - core::server::Subscription as RpcSubscription, rpc_params, MethodsError as Error, RpcModule, + core::{ + client::Subscription as RpcClientSubscription, server::Subscription as RpcSubscription, + }, + rpc_params, MethodsError as Error, RpcModule, }; + use sc_block_builder::BlockBuilderBuilder; use sc_client_api::ChildInfo; use sc_service::client::new_in_mem; @@ -59,6 +63,8 @@ const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; const MAX_PAGINATION_LIMIT: usize = 5; +const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; + const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; @@ -66,6 +72,35 @@ const CHILD_STORAGE_KEY: &[u8] = b"child"; const CHILD_VALUE: &[u8] = b"child value"; const DOES_NOT_PRODUCE_EVENTS_SECONDS: u64 = 10; +/// Start an RPC server with the chainHead module. +pub async fn run_server() -> std::net::SocketAddr { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + let api = ChainHead::new( + client, + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: 1, + }, + ) + .into_rpc(); + + let server = jsonrpsee::server::ServerBuilder::default().build("127.0.0.1:0").await.unwrap(); + + let addr = server.local_addr().unwrap(); + let handle = server.start(api); + + tokio::spawn(handle.stopped()); + addr +} + async fn get_next_event(sub: &mut RpcSubscription) -> T { let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) .await @@ -113,6 +148,7 @@ async fn setup_api() -> ( subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -163,6 +199,7 @@ async fn follow_subscription_produces_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -231,6 +268,7 @@ async fn follow_with_runtime() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -543,6 +581,7 @@ async fn call_runtime_without_flag() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1201,6 +1240,7 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1289,6 +1329,7 @@ async fn follow_generates_initial_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1444,6 +1485,7 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1520,6 +1562,7 @@ async fn follow_with_unpin() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1631,6 +1674,7 @@ async fn unpin_duplicate_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1733,6 +1777,7 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1886,6 +1931,7 @@ async fn follow_prune_best_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2071,6 +2117,7 @@ async fn follow_forks_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2230,6 +2277,7 @@ async fn follow_report_multiple_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2475,6 +2523,7 @@ async fn pin_block_references() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2612,6 +2661,7 @@ async fn follow_finalized_before_new_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2726,6 +2776,7 @@ async fn ensure_operation_limits_works() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2830,6 +2881,7 @@ async fn check_continue_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -3012,6 +3064,7 @@ async fn stop_storage_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -3297,3 +3350,176 @@ async fn storage_closest_merkle_value() { merkle_values_rhs.get(&hex_string(b":AAAA")).unwrap() ); } + +#[tokio::test] +async fn chain_head_single_connection_context() { + let server_addr = run_server().await; + let server_url = format!("ws://{}", server_addr); + let client = jsonrpsee::ws_client::WsClientBuilder::default() + .build(&server_url) + .await + .unwrap(); + // Calls cannot be made from a different connection context. + let second_client = jsonrpsee::ws_client::WsClientBuilder::default() + .build(&server_url) + .await + .unwrap(); + + let mut sub: RpcClientSubscription> = + ChainHeadApiClient::::chain_head_unstable_follow(&client, true) + .await + .unwrap(); + + let event = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let finalized_hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes.into_iter().last().unwrap(), + _ => panic!("Expected FollowEvent::Initialized"), + }; + + let first_sub_id = match sub.kind() { + jsonrpsee::core::client::SubscriptionKind::Subscription(id) => match id { + jsonrpsee::types::SubscriptionId::Num(num) => num.to_string(), + jsonrpsee::types::SubscriptionId::Str(s) => s.to_string(), + }, + _ => panic!("Unexpected subscription ID"), + }; + + // Trying to unpin from a different connection will have no effect. + let _response = ChainHeadApiClient::::chain_head_unstable_unpin( + &second_client, + first_sub_id.clone(), + crate::chain_head::api::ListOrValue::Value(finalized_hash.clone()), + ) + .await + .unwrap(); + + // Body can still be fetched from the first subscription. + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_body( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_body( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); + + let response: Option = ChainHeadApiClient::::chain_head_unstable_header( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert!(response.is_some()); + // Cannot make a call from a different connection context. + let response: Option = ChainHeadApiClient::::chain_head_unstable_header( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert!(response.is_none()); + + let key = hex_string(&KEY); + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_storage( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], + None, + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_storage( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], + None, + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); + + let alice_id = AccountKeyring::Alice.to_account_id(); + // Hex encoded scale encoded bytes representing the call parameters. + let call_parameters = hex_string(&alice_id.encode()); + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_call( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + "AccountNonceApi_account_nonce".into(), + call_parameters.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::::chain_head_unstable_call( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + "AccountNonceApi_account_nonce".into(), + call_parameters.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); +} + +#[tokio::test] +async fn chain_head_limit_reached() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + // Maximum of 1 chainHead_follow subscription. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Initialized must always be reported first. + let _event: FollowEvent = get_next_event(&mut sub).await; + + let error = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap_err(); + assert!(error + .to_string() + .contains("Maximum number of chainHead_follow has been reached")); + + // After dropping the subscription, other subscriptions are allowed to be created. + drop(sub); + // Ensure the `chainHead_unfollow` is propagated to the server. + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Initialized must always be reported first. + let _event: FollowEvent = get_next_event(&mut sub).await; +} diff --git a/substrate/client/rpc-spec-v2/src/common/connections.rs b/substrate/client/rpc-spec-v2/src/common/connections.rs new file mode 100644 index 000000000000..c16a80bf49db --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/common/connections.rs @@ -0,0 +1,262 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use jsonrpsee::ConnectionId; +use parking_lot::Mutex; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +/// Connection state which keeps track whether a connection exist and +/// the number of concurrent operations. +#[derive(Default, Clone)] +pub struct RpcConnections { + /// The number of identifiers that can be registered for each connection. + /// + /// # Example + /// + /// This is used to limit how many `chainHead_follow` subscriptions are active at one time. + capacity: usize, + /// Map the connecton ID to a set of identifiers. + data: Arc>>, +} + +#[derive(Default)] +struct ConnectionData { + /// The total number of identifiers for the given connection. + /// + /// An identifier for a connection might be: + /// - the subscription ID for chainHead_follow + /// - the operation ID for the transactionBroadcast API + /// - or simply how many times the transaction API has been called. + /// + /// # Note + /// + /// Because a pending subscription sink does not expose the future subscription ID, + /// we cannot register a subscription ID before the pending subscription is accepted. + /// This variable ensures that we have enough capacity to register an identifier, after + /// the subscription is accepted. Otherwise, a jsonrpc error object should be returned. + num_identifiers: usize, + /// Active registered identifiers for the given connection. + /// + /// # Note + /// + /// For chainHead, this represents the subscription ID. + /// For transactionBroadcast, this represents the operation ID. + /// For transaction, this is empty and the number of active calls is tracked by + /// [`Self::num_identifiers`]. + identifiers: HashSet, +} + +impl RpcConnections { + /// Constructs a new instance of [`RpcConnections`]. + pub fn new(capacity: usize) -> Self { + RpcConnections { capacity, data: Default::default() } + } + + /// Reserve space for a new connection identifier. + /// + /// If the number of active identifiers for the given connection exceeds the capacity, + /// returns None. + pub fn reserve_space(&self, connection_id: ConnectionId) -> Option { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + if entry.num_identifiers >= self.capacity { + return None; + } + entry.num_identifiers = entry.num_identifiers.saturating_add(1); + + Some(ReservedConnection { connection_id, rpc_connections: Some(self.clone()) }) + } + + /// Gives back the reserved space before the connection identifier is registered. + /// + /// # Note + /// + /// This may happen if the pending subscription cannot be accepted (unlikely). + fn unreserve_space(&self, connection_id: ConnectionId) { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + entry.num_identifiers = entry.num_identifiers.saturating_sub(1); + + if entry.num_identifiers == 0 { + data.remove(&connection_id); + } + } + + /// Register an identifier for the given connection. + /// + /// Users must call [`Self::reserve_space`] before calling this method to ensure enough + /// space is available. + /// + /// Returns true if the identifier was inserted successfully, false if the identifier was + /// already inserted or reached capacity. + fn register_identifier(&self, connection_id: ConnectionId, identifier: String) -> bool { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + // Should be already checked `Self::reserve_space`. + if entry.identifiers.len() >= self.capacity { + return false; + } + + entry.identifiers.insert(identifier) + } + + /// Unregister an identifier for the given connection. + fn unregister_identifier(&self, connection_id: ConnectionId, identifier: &str) { + let mut data = self.data.lock(); + if let Some(connection_data) = data.get_mut(&connection_id) { + connection_data.identifiers.remove(identifier); + connection_data.num_identifiers = connection_data.num_identifiers.saturating_sub(1); + + if connection_data.num_identifiers == 0 { + data.remove(&connection_id); + } + } + } + + /// Check if the given connection contains the given identifier. + pub fn contains_identifier(&self, connection_id: ConnectionId, identifier: &str) -> bool { + let data = self.data.lock(); + data.get(&connection_id) + .map(|connection_data| connection_data.identifiers.contains(identifier)) + .unwrap_or(false) + } +} + +/// RAII wrapper that ensures the reserved space is given back if the object is +/// dropped before the identifier is registered. +pub struct ReservedConnection { + connection_id: ConnectionId, + rpc_connections: Option, +} + +impl ReservedConnection { + /// Register the identifier for the given connection. + pub fn register(mut self, identifier: String) -> Option { + let rpc_connections = self.rpc_connections.take()?; + + if rpc_connections.register_identifier(self.connection_id, identifier.clone()) { + Some(RegisteredConnection { + connection_id: self.connection_id, + identifier, + rpc_connections, + }) + } else { + None + } + } +} + +impl Drop for ReservedConnection { + fn drop(&mut self) { + if let Some(rpc_connections) = self.rpc_connections.take() { + rpc_connections.unreserve_space(self.connection_id); + } + } +} + +/// RAII wrapper that ensures the identifier is unregistered if the object is dropped. +pub struct RegisteredConnection { + connection_id: ConnectionId, + identifier: String, + rpc_connections: RpcConnections, +} + +impl Drop for RegisteredConnection { + fn drop(&mut self) { + self.rpc_connections.unregister_identifier(self.connection_id, &self.identifier); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn reserve_space() { + let rpc_connections = RpcConnections::new(2); + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + assert_eq!(rpc_connections.data.lock().len(), 1); + + let reserved = reserved.unwrap(); + let registered = reserved.register("identifier1".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier1")); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + drop(registered); + + // Data is dropped. + assert!(rpc_connections.data.lock().get(&1).is_none()); + assert!(rpc_connections.data.lock().is_empty()); + // Checks can still happen. + assert!(!rpc_connections.contains_identifier(1, "identifier1")); + } + + #[test] + fn reserve_space_capacity_reached() { + let rpc_connections = RpcConnections::new(2); + + // Reserve identifier for connection 1. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Add identifier for connection 1. + let reserved = reserved.unwrap(); + let registered = reserved.register("identifier1".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier1")); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Reserve identifier for connection 1 again. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Add identifier for connection 1 again. + let reserved = reserved.unwrap(); + let registered_second = reserved.register("identifier2".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier2")); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Cannot reserve more identifiers. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_none()); + + // Drop the first identifier. + drop(registered); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + assert!(rpc_connections.contains_identifier(1, "identifier2")); + assert!(!rpc_connections.contains_identifier(1, "identifier1")); + + // Can reserve again after clearing the space. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Ensure data is cleared. + drop(reserved); + drop(registered_second); + assert!(rpc_connections.data.lock().get(&1).is_none()); + } +} diff --git a/substrate/client/rpc-spec-v2/src/common/mod.rs b/substrate/client/rpc-spec-v2/src/common/mod.rs index ac1af8fce3c9..3167561d649a 100644 --- a/substrate/client/rpc-spec-v2/src/common/mod.rs +++ b/substrate/client/rpc-spec-v2/src/common/mod.rs @@ -13,5 +13,6 @@ //! Common types and functionality for the RPC-V2 spec. +pub mod connections; pub mod events; pub mod storage;