Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync tx submission with chainHead_follow #1305

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,26 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// First, subscribe to all new block hashes
let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| {
// We care about new and finalized block hashes.
enum SeenBlock<Ref> {
New(Ref),
Finalized(Vec<Ref>),
}
enum SeenBlockMarker {
New,
Finalized,
}

// First, subscribe to all new and finalized block refs.
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
// can try to return a block ref for the best block.
// - we subscribe to finalized refs so that when we see `Finalized`, we can
// guarantee that when we return here, the finalized block we report has been
// reported from chainHead_follow already.
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
std::future::ready(match ev {
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
_ => None,
})
});
Expand All @@ -453,8 +469,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let mut seen_blocks = HashMap::new();
let mut done = false;

// If we see the finalized event, we start waiting until we find a block that
// matches, so we can guarantee to return a pinned block hash.
// If we see the finalized event, we start waiting until we find a finalized block that
// matches, so we can guarantee to return a pinned block hash and be properly in sync
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;

// Now we can attempt to associate tx events with pinned blocks.
Expand All @@ -465,25 +482,42 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
return Poll::Ready(None);
}

// Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate
// pinned blocks when we are looking at our tx events.
if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) {
seen_blocks.insert(block_ref.hash(), block_ref);
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
SeenBlock::New(block_ref) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
}
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
}
}
}
continue;
}

// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
if let Some(block_ref) = seen_blocks.remove(hash) {
if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done.
done = true;
let ev = TransactionStatus::InFinalizedBlock {
hash: block_ref.into(),
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more new blocks until we find it (get rid of any other block refs
// Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
seen_blocks.clear();
continue;
Expand Down Expand Up @@ -517,8 +551,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash).cloned() {
Some(block_ref) => block_ref.into(),
let block_ref = match seen_blocks.get(&block.hash) {
Some((_, block_ref)) => block_ref.clone().into(),
None => BlockRef::from_hash(block.hash),
};
TransactionStatus::InBestBlock { hash: block_ref }
Expand Down
Loading