Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Support the subscription of every imported block (#13372)
Browse files Browse the repository at this point in the history
* Support the subscription of every import block

Close #13315

* Clean up any closed block import notification sinks

* Apply review suggestions

* Nit

* `every_block_import_notification_sinks` -> `every_import_notification_sinks`

* Apply review suggestions
  • Loading branch information
liuchengxu authored Feb 27, 2023
1 parent 9526f04 commit 3d8a025
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 31 deletions.
15 changes: 15 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ pub type TransactionForSB<B, Block> = <B as StateBackend<HashFor<Block>>>::Trans
/// Extracts the transaction for the given backend.
pub type TransactionFor<B, Block> = TransactionForSB<StateBackendFor<B, Block>, Block>;

/// Describes which block import notification stream should be notified.
#[derive(Debug, Clone, Copy)]
pub enum ImportNotificationAction {
/// Notify only when the node has synced to the tip or there is a re-org.
RecentBlock,
/// Notify for every single block no matter what the sync state is.
EveryBlock,
/// Both block import notifications above should be fired.
Both,
/// No block import notification should be fired.
None,
}

/// Import operation summary.
///
/// Contains information about the block that just got imported,
Expand All @@ -67,6 +80,8 @@ pub struct ImportSummary<Block: BlockT> {
///
/// If `None`, there was no re-org while importing.
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
/// What notify action to take for this import.
pub import_notification_action: ImportNotificationAction,
}

/// Finalization operation summary.
Expand Down
10 changes: 8 additions & 2 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ pub trait BlockOf {

/// A source of blockchain events.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream. Not guaranteed to be fired for every
/// imported block.
/// Get block import event stream.
///
/// Not guaranteed to be fired for every imported block, only fired when the node
/// has synced to the tip or there is a re-org. Use `every_import_notification_stream()`
/// if you want a notification of every imported block regardless.
fn import_notification_stream(&self) -> ImportNotifications<Block>;

/// Get a stream of every imported block.
fn every_import_notification_stream(&self) -> ImportNotifications<Block>;

/// Get a stream of finality notifications. Not guaranteed to be fired for every
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;
Expand Down
4 changes: 4 additions & 0 deletions client/merkle-mountain-range/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ impl BlockchainEvents<Block> for MockClient {
unimplemented!()
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
unimplemented!()
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
self.client.lock().finality_notification_stream()
}
Expand Down
122 changes: 93 additions & 29 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
Expand Down Expand Up @@ -106,6 +106,7 @@ where
executor: E,
storage_notifications: StorageNotifications<Block>,
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
every_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
// Collects auxiliary operations to be performed atomically together with
// block import operations.
Expand Down Expand Up @@ -304,19 +305,22 @@ where
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
});

let (import_notification, storage_changes) = match notify_imported {
Some(mut summary) => {
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
)
},
None => (None, None),
};
let (import_notification, storage_changes, import_notification_action) =
match notify_imported {
Some(mut summary) => {
let import_notification_action = summary.import_notification_action;
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
import_notification_action,
)
},
None => (None, None, ImportNotificationAction::None),
};

if let Some(ref notification) = finality_notification {
for action in self.finality_actions.lock().iter_mut() {
Expand Down Expand Up @@ -353,7 +357,7 @@ where
}

self.notify_finalized(finality_notification)?;
self.notify_imported(import_notification, storage_changes)?;
self.notify_imported(import_notification, import_notification_action, storage_changes)?;

Ok(r)
};
Expand Down Expand Up @@ -451,6 +455,7 @@ where
executor,
storage_notifications: StorageNotifications::new(prometheus_registry),
import_notification_sinks: Default::default(),
every_import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
import_actions: Default::default(),
finality_actions: Default::default(),
Expand Down Expand Up @@ -769,11 +774,15 @@ where

operation.op.insert_aux(aux)?;

// We only notify when we are already synced to the tip of the chain
let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty();

// Notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
let should_notify_recent_block = make_notifications || tree_route.is_some();

if should_notify_every_block || should_notify_recent_block {
let header = import_headers.into_post();
if finalized {
if finalized && should_notify_recent_block {
let mut summary = match operation.notify_finalized.take() {
Some(mut summary) => {
summary.header = header.clone();
Expand Down Expand Up @@ -810,13 +819,24 @@ where
operation.notify_finalized = Some(summary);
}

let import_notification_action = if should_notify_every_block {
if should_notify_recent_block {
ImportNotificationAction::Both
} else {
ImportNotificationAction::EveryBlock
}
} else {
ImportNotificationAction::RecentBlock
};

operation.notify_imported = Some(ImportSummary {
hash,
origin,
header,
is_new_best,
storage_changes,
tree_route,
import_notification_action,
})
}

Expand Down Expand Up @@ -1012,6 +1032,7 @@ where
fn notify_imported(
&self,
notification: Option<BlockImportNotification<Block>>,
import_notification_action: ImportNotificationAction,
storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
) -> sp_blockchain::Result<()> {
let notification = match notification {
Expand All @@ -1024,22 +1045,59 @@ where
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());

self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());

return Ok(())
},
};

if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.trigger(
&notification.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}
let trigger_storage_changes_notification = || {
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.trigger(
&notification.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}
};

match import_notification_action {
ImportNotificationAction::Both => {
trigger_storage_changes_notification();
self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

self.every_import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
},
ImportNotificationAction::RecentBlock => {
trigger_storage_changes_notification();
self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
},
ImportNotificationAction::EveryBlock => {
self.every_import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
},
ImportNotificationAction::None => {
// This branch is unreachable in fact because the block import notification must be
// Some(_) instead of None (it's already handled at the beginning of this function)
// at this point.
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());

self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
},
}

Ok(())
}
Expand Down Expand Up @@ -1944,6 +2002,12 @@ where
stream
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
self.every_import_notification_sinks.lock().push(sink);
stream
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
self.finality_notification_sinks.lock().push(sink);
Expand Down

0 comments on commit 3d8a025

Please sign in to comment.