Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
cleanup stream polls (#3397)
Browse files Browse the repository at this point in the history
* metered-channel: remove dead code

* we don't need no fuse

* even more
  • Loading branch information
ordian authored Jul 2, 2021
1 parent 677d4b1 commit 91912fd
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 65 deletions.
2 changes: 1 addition & 1 deletion node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl ProvisioningJob {
};
loop {
futures::select! {
msg = self.receiver.next().fuse() => match msg {
msg = self.receiver.next() => match msg {
Some(RequestInherentData(_, return_sender)) => {
let _span = span.child("req-inherent-data");
let _timer = self.metrics.time_request_inherent_data();
Expand Down
31 changes: 0 additions & 31 deletions node/metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,34 +153,3 @@ impl<T> MeteredSender<T> {
})
}
}

impl<T> futures::sink::Sink<T> for MeteredSender<T> {
type Error = mpsc::SendError;

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_close(cx) {
val @ Poll::Ready(_)=> {
val
}
other => other,
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_flush(cx) {
val @ Poll::Ready(_)=> {
self.meter.note_sent();
val
}
other => other,
}
}
}
2 changes: 1 addition & 1 deletion node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
}
}
},
req_res_event = request_multiplexer.next().fuse() => match req_res_event {
req_res_event = request_multiplexer.next() => match req_res_event {
None => return Err(UnexpectedAbort::RequestStreamConcluded),
Some(Err(err)) => {
network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST);
Expand Down
8 changes: 4 additions & 4 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,14 @@ impl Message {
// We are only fusing here to make `select` happy, in reality we will quit if one of those
// streams end:
let from_overseer = ctx.recv().fuse();
let from_requester = from_requester.next().fuse();
let from_responder = from_responder.next().fuse();
let from_requester = from_requester.next();
let from_responder = from_responder.next();
futures::pin_mut!(from_overseer, from_requester, from_responder);
futures::select!(
futures::select! {
msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
msg = from_requester => Message::Requester(msg),
msg = from_responder => Message::Responder(msg),
)
}
}
}

Expand Down
39 changes: 11 additions & 28 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,8 +909,8 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}
}

let mut await_message = self.messages.next().fuse();
let mut await_signal = self.signals.next().fuse();
let mut await_message = self.messages.next();
let mut await_signal = self.signals.next();
let signals_received = self.signals_received.load();
let pending_incoming = &mut self.pending_incoming;

Expand Down Expand Up @@ -1901,13 +1901,7 @@ where

loop {
select! {
msg = self.events_rx.next().fuse() => {
let msg = if let Some(msg) = msg {
msg
} else {
continue
};

msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg.into()).await?;
Expand All @@ -1927,16 +1921,7 @@ where
}
}
},
msg = self.to_overseer_rx.next() => {
let msg = match msg {
Some(m) => m,
None => {
// This is a fused stream so we will shut down after receiving all
// shutdown notifications.
continue
}
};

msg = self.to_overseer_rx.select_next_some() => {
match msg {
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
Expand All @@ -1946,16 +1931,14 @@ where
}
}
},
res = self.running_subsystems.next().fuse() => {
let finished = if let Some(finished) = res {
finished
} else {
continue
};

tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
res = self.running_subsystems.select_next_some() => {
tracing::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return finished;
return res;
},
}
}
Expand Down

0 comments on commit 91912fd

Please sign in to comment.