Skip to content

Commit

Permalink
Add stale branches heads to finality notifications (paritytech#10639)
Browse files Browse the repository at this point in the history
* Add stale branches heads to finality notifications

Warning. Previous implementation was sending a notification for
each block between the previous (explicitly) finalized block and
the new finalized one (with an hardcoded limit of 256).

Now finality notification is sent only for the new finalized head and it
contains the hash of the new finalized head, new finalized head header,
a list of all the implicitly finalized blocks and a list of stale
branches heads (i.e. the branches heads that are not part of the
canonical chain anymore).

* Add implicitly finalized blocks list to `ChainEvent::Finalized` message

The list contains all the blocks between the previously finalized block
up to the parent of the currently finalized one, sorted by block number.

`Finalized` messages handler, part of the `MaintainedTransactionPool`
implementation for `BasicPool`, still propagate full set of finalized
blocks to the txpool by iterating over implicitly finalized blocks list.

* Rust fmt

* Greedy evaluation of `stale_heads` during finalization

* Fix outdated assumption in a comment

* Removed a test optimization that is no more relevant

The loop was there to prevent sending to
`peer.network.on_block_finalized` the full list of finalized blocks.

Now only the finalized heads are received.

* Last finalized block lookup not required anymore

* Tests for block finality notifications payloads

* Document a bit tricky condition to avoid duplicate finalization notifications

* More idiomatic way to skip an iterator entry

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Cargo fmt iteration

* Typo fix

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix potential failure when a finalized orphan block is imported

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
2 people authored and Wizdave97 committed Feb 4, 2022
1 parent b38ec2e commit dbb8f25
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 111 deletions.
18 changes: 15 additions & 3 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,26 @@ pub struct ImportSummary<Block: BlockT> {
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
}

/// Import operation wrapper
/// Finalization operation summary.
///
/// Contains information about the block that just got finalized,
/// including tree heads that became stale at the moment of finalization.
pub struct FinalizeSummary<Block: BlockT> {
/// Blocks that were finalized.
/// The last entry is the one that has been explicitly finalized.
pub finalized: Vec<Block::Hash>,
/// Heads that became stale during this finalization operation.
pub stale_heads: Vec<Block::Hash>,
}

/// Import operation wrapper.
pub struct ClientImportOperation<Block: BlockT, B: Backend<Block>> {
/// DB Operation.
pub op: B::BlockImportOperation,
/// Summary of imported block.
pub notify_imported: Option<ImportSummary<Block>>,
/// A list of hashes of blocks that got finalized.
pub notify_finalized: Vec<Block::Hash>,
/// Summary of finalized block.
pub notify_finalized: Option<FinalizeSummary<Block>>,
}

/// Helper function to apply auxiliary data insertion into an operation.
Expand Down
10 changes: 7 additions & 3 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,14 @@ pub struct BlockImportNotification<Block: BlockT> {
/// Summary of a finalized block.
#[derive(Clone, Debug)]
pub struct FinalityNotification<Block: BlockT> {
/// Imported block header hash.
/// Finalized block header hash.
pub hash: Block::Hash,
/// Imported block header.
/// Finalized block header.
pub header: Block::Header,
/// Path from the old finalized to new finalized parent (implicitly finalized blocks).
pub tree_route: Arc<Vec<Block::Hash>>,
/// Stale branches heads.
pub stale_heads: Arc<Vec<Block::Hash>>,
}

impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {
Expand All @@ -293,6 +297,6 @@ impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {

impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
fn from(n: FinalityNotification<B>) -> Self {
Self::Finalized { hash: n.hash }
Self::Finalized { hash: n.hash, tree_route: n.tree_route }
}
}
4 changes: 1 addition & 3 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ impl<B: BlockT> ExtraRequests<B> {
}

if best_finalized_number > self.best_seen_finalized_number {
// normally we'll receive finality notifications for every block => finalize would be
// enough but if many blocks are finalized at once, some notifications may be omitted
// => let's use finalize_with_ancestors here
// we receive finality notification only for the finalized branch head.
match self.tree.finalize_with_ancestors(
best_finalized_hash,
best_finalized_number,
Expand Down
8 changes: 2 additions & 6 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,14 +1000,10 @@ where
peer.network.service().announce_block(notification.hash, None);
}

// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Poll::Ready(Some(item)) =
// We poll `finality_notification_stream`.
while let Poll::Ready(Some(notification)) =
peer.finality_notification_stream.as_mut().poll_next(cx)
{
last = Some(item);
}
if let Some(notification) = last {
peer.network.on_block_finalized(notification.hash, notification.header);
}
}
Expand Down
162 changes: 105 additions & 57 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use rand::Rng;
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, Finalizer, ImportSummary,
LockImportRun, NewBlockState, StorageProvider,
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
Expand Down Expand Up @@ -274,7 +274,7 @@ where
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
notify_finalized: None,
};

let r = f(&mut op)?;
Expand Down Expand Up @@ -622,25 +622,25 @@ where
None
},
};
// Ensure parent chain is finalized to maintain invariant that
// finality is called sequentially. This will also send finality
// notifications for top 250 newly finalized blocks.
if finalized && parent_exists {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}

operation.op.update_cache(new_cache);
storage_changes
},
None => None,
};

// Ensure parent chain is finalized to maintain invariant that finality is called
// sequentially.
if finalized && parent_exists {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}

let is_new_best = !gap_block &&
(finalized ||
match fork_choice {
Expand Down Expand Up @@ -683,11 +683,36 @@ where

operation.op.insert_aux(aux)?;

// we only notify when we are already synced to the tip of the chain
// We only notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
if finalized {
operation.notify_finalized.push(hash);
let mut summary = match operation.notify_finalized.take() {
Some(summary) => summary,
None => FinalizeSummary { finalized: Vec::new(), stale_heads: Vec::new() },
};
summary.finalized.push(hash);
if parent_exists {
// Add to the stale list all heads that are branching from parent besides our
// current `head`.
for head in self
.backend
.blockchain()
.leaves()?
.into_iter()
.filter(|h| *h != parent_hash)
{
let route_from_parent = sp_blockchain::tree_route(
self.backend.blockchain(),
parent_hash,
head,
)?;
if route_from_parent.retracted().is_empty() {
summary.stale_heads.push(head);
}
}
}
operation.notify_finalized = Some(summary);
}

operation.notify_imported = Some(ImportSummary {
Expand Down Expand Up @@ -831,58 +856,82 @@ where
operation.op.mark_finalized(BlockId::Hash(block), justification)?;

if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
// we'll send notifications spuriously in that case.
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
for finalized in &enacted[start..] {
operation.notify_finalized.push(finalized.hash);
let finalized =
route_from_finalized.enacted().iter().map(|elem| elem.hash).collect::<Vec<_>>();

let last_finalized_number = self
.backend
.blockchain()
.number(last_finalized)?
.expect("Finalized block expected to be onchain; qed");
let mut stale_heads = Vec::new();
for head in self.backend.blockchain().leaves()? {
let route_from_finalized =
sp_blockchain::tree_route(self.backend.blockchain(), block, head)?;
let retracted = route_from_finalized.retracted();
let pivot = route_from_finalized.common_block();
// It is not guaranteed that `backend.blockchain().leaves()` doesn't return
// heads that were in a stale state before this finalization and thus already
// included in previous notifications. We want to skip such heads.
// Given the "route" from the currently finalized block to the head under
// analysis, the condition for it to be added to the new stale heads list is:
// `!retracted.is_empty() && last_finalized_number <= pivot.number`
// 1. "route" has some "retractions".
// 2. previously finalized block number is not greater than the "route" pivot:
// - if `last_finalized_number <= pivot.number` then this is a new stale head;
// - else the stale head was already included by some previous finalization.
if !retracted.is_empty() && last_finalized_number <= pivot.number {
stale_heads.push(head);
}
}
operation.notify_finalized = Some(FinalizeSummary { finalized, stale_heads });
}

Ok(())
}

fn notify_finalized(&self, notify_finalized: Vec<Block::Hash>) -> sp_blockchain::Result<()> {
fn notify_finalized(
&self,
notify_finalized: Option<FinalizeSummary<Block>>,
) -> sp_blockchain::Result<()> {
let mut sinks = self.finality_notification_sinks.lock();

if notify_finalized.is_empty() {
// cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());

return Ok(())
}
let mut notify_finalized = match notify_finalized {
Some(notify_finalized) => notify_finalized,
None => {
// Cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());
return Ok(())
},
};

// We assume the list is sorted and only want to inform the
// telemetry once about the finalized block.
if let Some(last) = notify_finalized.last() {
let header = self.header(&BlockId::Hash(*last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
let last = notify_finalized.finalized.pop().expect(
"At least one finalized block shall exist within a valid finalization summary; qed",
);

telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);
}
let header = self.header(&BlockId::Hash(last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);

for finalized_hash in notify_finalized {
let header = self.header(&BlockId::Hash(finalized_hash))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);

let notification = FinalityNotification { header, hash: finalized_hash };
let notification = FinalityNotification {
hash: last,
header,
tree_route: Arc::new(notify_finalized.finalized),
stale_heads: Arc::new(notify_finalized.stale_heads),
};

sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

Ok(())
}
Expand All @@ -901,7 +950,6 @@ where
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());

return Ok(())
},
};
Expand Down
25 changes: 3 additions & 22 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ mod client;
mod metrics;
mod task_manager;

use std::{collections::HashMap, io, net::SocketAddr, pin::Pin, task::Poll};
use std::{collections::HashMap, io, net::SocketAddr, pin::Pin};

use codec::{Decode, Encode};
use futures::{stream, Future, FutureExt, Stream, StreamExt};
use futures::{Future, FutureExt, StreamExt};
use log::{debug, error, warn};
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
Expand Down Expand Up @@ -152,26 +152,7 @@ async fn build_network_future<
let starting_block = client.info().best_number;

// Stream of finalized blocks reported by the client.
let mut finality_notification_stream = {
let mut finality_notification_stream = client.finality_notification_stream().fuse();

// We tweak the `Stream` in order to merge together multiple items if they happen to be
// ready. This way, we only get the latest finalized block.
stream::poll_fn(move |cx| {
let mut last = None;
while let Poll::Ready(Some(item)) =
Pin::new(&mut finality_notification_stream).poll_next(cx)
{
last = Some(item);
}
if let Some(last) = last {
Poll::Ready(Some(last))
} else {
Poll::Pending
}
})
.fuse()
};
let mut finality_notification_stream = client.finality_notification_stream().fuse();

loop {
futures::select! {
Expand Down
Loading

0 comments on commit dbb8f25

Please sign in to comment.