Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fuse the import queue receiver (#6876)
Browse files Browse the repository at this point in the history
* Fix the import queue receiver

* Add logging
  • Loading branch information
tomaka authored Aug 12, 2020
1 parent a20fbd5 commit d4efdf0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
28 changes: 24 additions & 4 deletions primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}

trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let _ = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
if res.is_err() {
log::error!(
target: "sync",
"import_blocks: Background import task is no longer alive"
);
}
}

fn import_justification(
Expand All @@ -106,10 +112,16 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
number: NumberFor<B>,
justification: Justification
) {
let _ = self.sender
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportJustification(who, hash, number, justification)
);
if res.is_err() {
log::error!(
target: "sync",
"import_justification: Background import task is no longer alive"
);
}
}

fn import_finality_proof(
Expand All @@ -120,14 +132,22 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
finality_proof: Vec<u8>,
) {
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let _ = self.sender
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)
);
if res.is_err() {
log::error!(
target: "sync",
"import_finality_proof: Background import task is no longer alive"
);
}
}

fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
self.result_port.poll_actions(cx, link);
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
}
}
}

Expand Down
18 changes: 10 additions & 8 deletions primitives/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx };
let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx)
}

Expand Down Expand Up @@ -127,7 +127,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {

/// See [`buffered_link`].
pub struct BufferedLinkReceiver<B: BlockT> {
rx: TracingUnboundedReceiver<BlockImportWorkerMsg<B>>,
rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
}

impl<B: BlockT> BufferedLinkReceiver<B> {
Expand All @@ -137,12 +137,14 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
///
/// Returns an error if the corresponding [`BufferedLinkSender`] has been closed.
pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) -> Result<(), ()> {
loop {
let msg = if let Poll::Ready(Some(msg)) = Stream::poll_next(Pin::new(&mut self.rx), cx) {
msg
} else {
break
let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => break Err(()),
Poll::Pending => break Ok(()),
};

match msg {
Expand All @@ -162,7 +164,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {

/// Close the channel.
pub fn close(&mut self) {
self.rx.close()
self.rx.get_mut().close()
}
}

Expand Down

0 comments on commit d4efdf0

Please sign in to comment.