From 78ebde3befcb408c18a42aba9009b2b21a09c02a Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 1 Jul 2021 21:24:35 +0200 Subject: [PATCH 1/3] metered-channel: remove dead code --- node/metered-channel/src/bounded.rs | 31 ----------------------------- 1 file changed, 31 deletions(-) diff --git a/node/metered-channel/src/bounded.rs b/node/metered-channel/src/bounded.rs index 38aa6f15c65f..43a77f707fc0 100644 --- a/node/metered-channel/src/bounded.rs +++ b/node/metered-channel/src/bounded.rs @@ -153,34 +153,3 @@ impl MeteredSender { }) } } - -impl futures::sink::Sink for MeteredSender { - type Error = mpsc::SendError; - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut self.inner).start_send(item) - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_ready(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_close(cx) { - val @ Poll::Ready(_)=> { - val - } - other => other, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_flush(cx) { - val @ Poll::Ready(_)=> { - self.meter.note_sent(); - val - } - other => other, - } - } -} From 1cb0238a44970689fcf10337d93565932a2d5efd Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 1 Jul 2021 21:31:23 +0200 Subject: [PATCH 2/3] we don't need no fuse --- node/overseer/src/lib.rs | 39 +++++++++++---------------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 7e6f831dd70a..06a720e85fdc 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -909,8 +909,8 @@ impl SubsystemContext for OverseerSubsystemContext { } } - let mut await_message = self.messages.next().fuse(); - let mut await_signal = self.signals.next().fuse(); + let mut await_message = self.messages.next(); + let mut await_signal = self.signals.next(); let signals_received = self.signals_received.load(); let pending_incoming = &mut self.pending_incoming; @@ -1901,13 +1901,7 @@ where loop { select! { - msg = self.events_rx.next().fuse() => { - let msg = if let Some(msg) = msg { - msg - } else { - continue - }; - + msg = self.events_rx.select_next_some() => { match msg { Event::MsgToSubsystem(msg) => { self.route_message(msg.into()).await?; @@ -1927,16 +1921,7 @@ where } } }, - msg = self.to_overseer_rx.next() => { - let msg = match msg { - Some(m) => m, - None => { - // This is a fused stream so we will shut down after receiving all - // shutdown notifications. - continue - } - }; - + msg = self.to_overseer_rx.select_next_some() => { match msg { ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); @@ -1946,16 +1931,14 @@ where } } }, - res = self.running_subsystems.next().fuse() => { - let finished = if let Some(finished) = res { - finished - } else { - continue - }; - - tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); + res = self.running_subsystems.select_next_some() => { + tracing::error!( + target: LOG_TARGET, + subsystem = ?res, + "subsystem finished unexpectedly", + ); self.stop().await; - return finished; + return res; }, } } From e61b9bdfeac4c1854b0b16f12c5d86073cd37b4a Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 1 Jul 2021 21:35:12 +0200 Subject: [PATCH 3/3] even more --- node/core/provisioner/src/lib.rs | 2 +- node/network/bridge/src/lib.rs | 2 +- node/network/statement-distribution/src/lib.rs | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index 7a730aa8cfdf..85f10c68a74c 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -191,7 +191,7 @@ impl ProvisioningJob { }; loop { futures::select! { - msg = self.receiver.next().fuse() => match msg { + msg = self.receiver.next() => match msg { Some(RequestInherentData(_, return_sender)) => { let _span = span.child("req-inherent-data"); let _timer = self.metrics.time_request_inherent_data(); diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 399b6e6c26d0..87c6fd9494b0 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -830,7 +830,7 @@ async fn handle_network_messages( } } }, - req_res_event = request_multiplexer.next().fuse() => match req_res_event { + req_res_event = request_multiplexer.next() => match req_res_event { None => return Err(UnexpectedAbort::RequestStreamConcluded), Some(Err(err)) => { network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST); diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index b0b2d8d8f859..59c9cf249ba0 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -595,14 +595,14 @@ impl Message { // We are only fusing here to make `select` happy, in reality we will quit if one of those // streams end: let from_overseer = ctx.recv().fuse(); - let from_requester = from_requester.next().fuse(); - let from_responder = from_responder.next().fuse(); + let from_requester = from_requester.next(); + let from_responder = from_responder.next(); futures::pin_mut!(from_overseer, from_requester, from_responder); - futures::select!( + futures::select! { msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)), msg = from_requester => Message::Requester(msg), msg = from_responder => Message::Responder(msg), - ) + } } }