Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fuse the import queue receiver #6876

Merged
2 commits merged into from
Aug 12, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions primitives/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::import_queue::{Origin, Link, BlockImportResult, BlockImportError};
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx };
let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx)
}

Expand Down Expand Up @@ -127,7 +127,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {

/// See [`buffered_link`].
pub struct BufferedLinkReceiver<B: BlockT> {
rx: TracingUnboundedReceiver<BlockImportWorkerMsg<B>>,
rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
}

impl<B: BlockT> BufferedLinkReceiver<B> {
Expand Down Expand Up @@ -162,7 +162,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {

/// Close the channel.
pub fn close(&mut self) {
self.rx.close()
self.rx.get_mut().close()
}
}

Expand Down