diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index dddc332f43e97..e59f7ab5b601c 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -96,7 +96,13 @@ impl ImportQueue for BasicQueue } trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + if res.is_err() { + log::error!( + target: "sync", + "import_blocks: Background import task is no longer alive" + ); + } } fn import_justification( @@ -106,10 +112,16 @@ impl ImportQueue for BasicQueue number: NumberFor, justification: Justification ) { - let _ = self.sender + let res = self.sender .unbounded_send( ToWorkerMsg::ImportJustification(who, hash, number, justification) ); + if res.is_err() { + log::error!( + target: "sync", + "import_justification: Background import task is no longer alive" + ); + } } fn import_finality_proof( @@ -120,14 +132,22 @@ impl ImportQueue for BasicQueue finality_proof: Vec, ) { trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let _ = self.sender + let res = self.sender .unbounded_send( ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof) ); + if res.is_err() { + log::error!( + target: "sync", + "import_finality_proof: Background import task is no longer alive" + ); + } } fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { - self.result_port.poll_actions(cx, link); + if self.result_port.poll_actions(cx, link).is_err() { + log::error!(target: "sync", "poll_actions: Background import task is no longer alive"); + } } } diff --git a/primitives/consensus/common/src/import_queue/buffered_link.rs b/primitives/consensus/common/src/import_queue/buffered_link.rs index d85121a710ecb..a37d4c53c2603 100644 --- a/primitives/consensus/common/src/import_queue/buffered_link.rs +++ b/primitives/consensus/common/src/import_queue/buffered_link.rs @@ -50,7 +50,7 @@ use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError}; pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { let (tx, rx) = tracing_unbounded("mpsc_buffered_link"); let tx = BufferedLinkSender { tx }; - let rx = BufferedLinkReceiver { rx }; + let rx = BufferedLinkReceiver { rx: rx.fuse() }; (tx, rx) } @@ -127,7 +127,7 @@ impl Link for BufferedLinkSender { /// See [`buffered_link`]. pub struct BufferedLinkReceiver { - rx: TracingUnboundedReceiver>, + rx: stream::Fuse>>, } impl BufferedLinkReceiver { @@ -137,12 +137,14 @@ impl BufferedLinkReceiver { /// This method should behave in a way similar to `Future::poll`. It can register the current /// task and notify later when more actions are ready to be polled. To continue the comparison, /// it is as if this method always returned `Poll::Pending`. - pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { + /// + /// Returns an error if the corresponding [`BufferedLinkSender`] has been closed. + pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) -> Result<(), ()> { loop { - let msg = if let Poll::Ready(Some(msg)) = Stream::poll_next(Pin::new(&mut self.rx), cx) { - msg - } else { - break + let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => break Err(()), + Poll::Pending => break Ok(()), }; match msg { @@ -162,7 +164,7 @@ impl BufferedLinkReceiver { /// Close the channel. pub fn close(&mut self) { - self.rx.close() + self.rx.get_mut().close() } }