From f94b60544450181c5d342110bb8d80c1f0f2a76d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 4 Dec 2023 11:40:37 +0000 Subject: [PATCH 1/3] sync tx submission with chainHead_follow --- subxt/src/backend/unstable/mod.rs | 56 ++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 6610e5f903..7e08577285 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -436,10 +436,26 @@ impl Backend for UnstableBackend { &self, extrinsic: &[u8], ) -> Result>, Error> { - // First, subscribe to all new block hashes - let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| { + // We care about new and finalized block hashes. + enum SeenBlock { + New(BlockRef), + Finalized(Vec>), + } + enum SeenBlockMarker { + New, + Finalized, + } + + // First, subscribe to all new and finalized block refs. + // - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we + // can try to return a block ref for the best block. + // - we subscribe to finalized refs so that when we see `Finalized`, we can + // guarantee that when we return here, the finalized block we report has been + // reported from chainHead_follow already. + let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| { std::future::ready(match ev { - FollowEvent::NewBlock(ev) => Some(ev.block_hash), + FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)), + FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_blocks)), _ => None, }) }); @@ -453,8 +469,9 @@ impl Backend for UnstableBackend { let mut seen_blocks = HashMap::new(); let mut done = false; - // If we see the finalized event, we start waiting until we find a block that - // matches, so we can guarantee to return a pinned block hash. + // If we see the finalized event, we start waiting until we find a finalized block that + // matches, so we can guarantee to return a pinned block hash and be properly in sync + // with chainHead_follow. let mut finalized_hash: Option = None; // Now we can attempt to associate tx events with pinned blocks. @@ -465,17 +482,32 @@ impl Backend for UnstableBackend { return Poll::Ready(None); } - // Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate - // pinned blocks when we are looking at our tx events. - if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) { - seen_blocks.insert(block_ref.hash(), block_ref); + // Make a note of new or finalized blocks that have come in since we started the TX. + if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { + match seen_block { + SeenBlock::New(block_ref) => { + if finalized_hash.is_none() { + seen_blocks + .insert(block_ref.hash(), (SeenBlockMarker::New, block_ref)); + } + } + SeenBlock::Finalized(block_refs) => { + for block_ref in block_refs { + seen_blocks.insert( + block_ref.hash(), + (SeenBlockMarker::Finalized, block_ref), + ); + } + } + } continue; } // If we have a finalized hash, we are done looking for tx events and we are just waiting // for a pinned block with a matching hash (which must appear eventually given it's finalized). if let Some(hash) = &finalized_hash { - if let Some(block_ref) = seen_blocks.remove(hash) { + if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash) + { // Found it! Hand back the event with a pinned block. We're done. done = true; let ev = TransactionStatus::InFinalizedBlock { @@ -483,7 +515,7 @@ impl Backend for UnstableBackend { }; return Poll::Ready(Some(Ok(ev))); } else { - // Keep waiting for more new blocks until we find it (get rid of any other block refs + // Keep waiting for more finalized blocks until we find it (get rid of any other block refs // now, since none of them were what we were looking for anyway). seen_blocks.clear(); continue; @@ -518,7 +550,7 @@ impl Backend for UnstableBackend { // block on the node a tx was sent to will ever be known about on the // chainHead_follow subscription. let block_ref = match seen_blocks.get(&block.hash).cloned() { - Some(block_ref) => block_ref.into(), + Some((_, block_ref)) => block_ref.into(), None => BlockRef::from_hash(block.hash), }; TransactionStatus::InBestBlock { hash: block_ref } From 76c4c180649a0bd21f0cbcc2a889eeac875ff0ba Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 4 Dec 2023 11:54:24 +0000 Subject: [PATCH 2/3] make it compile --- subxt/src/backend/unstable/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 7e08577285..05b78cbd89 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -437,9 +437,9 @@ impl Backend for UnstableBackend { extrinsic: &[u8], ) -> Result>, Error> { // We care about new and finalized block hashes. - enum SeenBlock { - New(BlockRef), - Finalized(Vec>), + enum SeenBlock { + New(Ref), + Finalized(Vec), } enum SeenBlockMarker { New, @@ -455,7 +455,7 @@ impl Backend for UnstableBackend { let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| { std::future::ready(match ev { FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)), - FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_blocks)), + FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)), _ => None, }) }); @@ -549,8 +549,8 @@ impl Backend for UnstableBackend { // block that likely isn't accessible. We have no guarantee that a best // block on the node a tx was sent to will ever be known about on the // chainHead_follow subscription. - let block_ref = match seen_blocks.get(&block.hash).cloned() { - Some((_, block_ref)) => block_ref.into(), + let block_ref = match seen_blocks.get(&block.hash) { + Some((_, block_ref)) => block_ref.clone().into(), None => BlockRef::from_hash(block.hash), }; TransactionStatus::InBestBlock { hash: block_ref } From 8697d1932918b290f2ea20a2f550414e57501323 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 4 Dec 2023 11:57:40 +0000 Subject: [PATCH 3/3] add small comment --- subxt/src/backend/unstable/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 05b78cbd89..fb1437352d 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -486,6 +486,8 @@ impl Backend for UnstableBackend { if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) { match seen_block { SeenBlock::New(block_ref) => { + // Optimization: once we have a `finalized_hash`, we only care about finalized + // block refs now and can avoid bothering to save new blocks. if finalized_hash.is_none() { seen_blocks .insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));