From 32a0f387d7fa8d037b5b6498f9290cda58fe50cf Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 27 Feb 2023 17:02:54 +0800 Subject: [PATCH] Support the subscription of every imported block (#13372) * Support the subscription of every import block Close #13315 * Clean up any closed block import notification sinks * Apply review suggestions * Nit * `every_block_import_notification_sinks` -> `every_import_notification_sinks` * Apply review suggestions --- client/api/src/backend.rs | 15 +++ client/api/src/client.rs | 10 +- .../merkle-mountain-range/src/test_utils.rs | 4 + client/service/src/client/client.rs | 122 +++++++++++++----- 4 files changed, 120 insertions(+), 31 deletions(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 486ac50611e35..f991bea8821e0 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -48,6 +48,19 @@ pub type TransactionForSB = >>::Trans /// Extracts the transaction for the given backend. pub type TransactionFor = TransactionForSB, Block>; +/// Describes which block import notification stream should be notified. +#[derive(Debug, Clone, Copy)] +pub enum ImportNotificationAction { + /// Notify only when the node has synced to the tip or there is a re-org. + RecentBlock, + /// Notify for every single block no matter what the sync state is. + EveryBlock, + /// Both block import notifications above should be fired. + Both, + /// No block import notification should be fired. + None, +} + /// Import operation summary. /// /// Contains information about the block that just got imported, @@ -67,6 +80,8 @@ pub struct ImportSummary { /// /// If `None`, there was no re-org while importing. pub tree_route: Option>, + /// What notify action to take for this import. + pub import_notification_action: ImportNotificationAction, } /// Finalization operation summary. diff --git a/client/api/src/client.rs b/client/api/src/client.rs index aeb119e0ef77a..e334f2f9fb4f6 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -59,10 +59,16 @@ pub trait BlockOf { /// A source of blockchain events. pub trait BlockchainEvents { - /// Get block import event stream. Not guaranteed to be fired for every - /// imported block. + /// Get block import event stream. + /// + /// Not guaranteed to be fired for every imported block, only fired when the node + /// has synced to the tip or there is a re-org. Use `every_import_notification_stream()` + /// if you want a notification of every imported block regardless. fn import_notification_stream(&self) -> ImportNotifications; + /// Get a stream of every imported block. + fn every_import_notification_stream(&self) -> ImportNotifications; + /// Get a stream of finality notifications. Not guaranteed to be fired for every /// finalized block. fn finality_notification_stream(&self) -> FinalityNotifications; diff --git a/client/merkle-mountain-range/src/test_utils.rs b/client/merkle-mountain-range/src/test_utils.rs index 9e00d620a115c..010b48bb3d7da 100644 --- a/client/merkle-mountain-range/src/test_utils.rs +++ b/client/merkle-mountain-range/src/test_utils.rs @@ -265,6 +265,10 @@ impl BlockchainEvents for MockClient { unimplemented!() } + fn every_import_notification_stream(&self) -> ImportNotifications { + unimplemented!() + } + fn finality_notification_stream(&self) -> FinalityNotifications { self.client.lock().finality_notification_stream() } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index ddeec5c73e63e..4b5822ae0e017 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof}; use sc_client_api::{ backend::{ self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer, - ImportSummary, LockImportRun, NewBlockState, StorageProvider, + ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider, }, client::{ BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo, @@ -106,6 +106,7 @@ where executor: E, storage_notifications: StorageNotifications, import_notification_sinks: NotificationSinks>, + every_import_notification_sinks: NotificationSinks>, finality_notification_sinks: NotificationSinks>, // Collects auxiliary operations to be performed atomically together with // block import operations. @@ -304,19 +305,22 @@ where FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone()) }); - let (import_notification, storage_changes) = match notify_imported { - Some(mut summary) => { - let storage_changes = summary.storage_changes.take(); - ( - Some(BlockImportNotification::from_summary( - summary, - self.unpin_worker_sender.clone(), - )), - storage_changes, - ) - }, - None => (None, None), - }; + let (import_notification, storage_changes, import_notification_action) = + match notify_imported { + Some(mut summary) => { + let import_notification_action = summary.import_notification_action; + let storage_changes = summary.storage_changes.take(); + ( + Some(BlockImportNotification::from_summary( + summary, + self.unpin_worker_sender.clone(), + )), + storage_changes, + import_notification_action, + ) + }, + None => (None, None, ImportNotificationAction::None), + }; if let Some(ref notification) = finality_notification { for action in self.finality_actions.lock().iter_mut() { @@ -353,7 +357,7 @@ where } self.notify_finalized(finality_notification)?; - self.notify_imported(import_notification, storage_changes)?; + self.notify_imported(import_notification, import_notification_action, storage_changes)?; Ok(r) }; @@ -451,6 +455,7 @@ where executor, storage_notifications: StorageNotifications::new(prometheus_registry), import_notification_sinks: Default::default(), + every_import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), import_actions: Default::default(), finality_actions: Default::default(), @@ -769,11 +774,15 @@ where operation.op.insert_aux(aux)?; - // We only notify when we are already synced to the tip of the chain + let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty(); + + // Notify when we are already synced to the tip of the chain // or if this import triggers a re-org - if make_notifications || tree_route.is_some() { + let should_notify_recent_block = make_notifications || tree_route.is_some(); + + if should_notify_every_block || should_notify_recent_block { let header = import_headers.into_post(); - if finalized { + if finalized && should_notify_recent_block { let mut summary = match operation.notify_finalized.take() { Some(mut summary) => { summary.header = header.clone(); @@ -810,6 +819,16 @@ where operation.notify_finalized = Some(summary); } + let import_notification_action = if should_notify_every_block { + if should_notify_recent_block { + ImportNotificationAction::Both + } else { + ImportNotificationAction::EveryBlock + } + } else { + ImportNotificationAction::RecentBlock + }; + operation.notify_imported = Some(ImportSummary { hash, origin, @@ -817,6 +836,7 @@ where is_new_best, storage_changes, tree_route, + import_notification_action, }) } @@ -1012,6 +1032,7 @@ where fn notify_imported( &self, notification: Option>, + import_notification_action: ImportNotificationAction, storage_changes: Option<(StorageCollection, ChildStorageCollection)>, ) -> sp_blockchain::Result<()> { let notification = match notification { @@ -1024,22 +1045,59 @@ where // temporary leak of closed/discarded notification sinks (e.g. // from consensus code). self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + return Ok(()) }, }; - if let Some(storage_changes) = storage_changes { - // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.trigger( - ¬ification.hash, - storage_changes.0.into_iter(), - storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), - ); - } + let trigger_storage_changes_notification = || { + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.trigger( + ¬ification.hash, + storage_changes.0.into_iter(), + storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), + ); + } + }; + + match import_notification_action { + ImportNotificationAction::Both => { + trigger_storage_changes_notification(); + self.import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + self.every_import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + }, + ImportNotificationAction::RecentBlock => { + trigger_storage_changes_notification(); + self.import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + }, + ImportNotificationAction::EveryBlock => { + self.every_import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + }, + ImportNotificationAction::None => { + // This branch is unreachable in fact because the block import notification must be + // Some(_) instead of None (it's already handled at the beginning of this function) + // at this point. + self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); - self.import_notification_sinks - .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + }, + } Ok(()) } @@ -1944,6 +2002,12 @@ where stream } + fn every_import_notification_stream(&self) -> ImportNotifications { + let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000); + self.every_import_notification_sinks.lock().push(sink); + stream + } + fn finality_notification_stream(&self) -> FinalityNotifications { let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000); self.finality_notification_sinks.lock().push(sink);