Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

cleanup stream polls #3397

Merged
merged 3 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl ProvisioningJob {
};
loop {
futures::select! {
msg = self.receiver.next().fuse() => match msg {
msg = self.receiver.next() => match msg {
Some(RequestInherentData(_, return_sender)) => {
let _span = span.child("req-inherent-data");
let _timer = self.metrics.time_request_inherent_data();
Expand Down
31 changes: 0 additions & 31 deletions node/metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,34 +153,3 @@ impl<T> MeteredSender<T> {
})
}
}

impl<T> futures::sink::Sink<T> for MeteredSender<T> {
type Error = mpsc::SendError;

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_close(cx) {
val @ Poll::Ready(_)=> {
val
}
other => other,
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_flush(cx) {
val @ Poll::Ready(_)=> {
self.meter.note_sent();
val
}
other => other,
}
}
}
2 changes: 1 addition & 1 deletion node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
}
}
},
req_res_event = request_multiplexer.next().fuse() => match req_res_event {
req_res_event = request_multiplexer.next() => match req_res_event {
None => return Err(UnexpectedAbort::RequestStreamConcluded),
Some(Err(err)) => {
network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST);
Expand Down
8 changes: 4 additions & 4 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,14 @@ impl Message {
// We are only fusing here to make `select` happy, in reality we will quit if one of those
// streams end:
let from_overseer = ctx.recv().fuse();
let from_requester = from_requester.next().fuse();
let from_responder = from_responder.next().fuse();
let from_requester = from_requester.next();
let from_responder = from_responder.next();
futures::pin_mut!(from_overseer, from_requester, from_responder);
futures::select!(
futures::select! {
msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
msg = from_requester => Message::Requester(msg),
msg = from_responder => Message::Responder(msg),
)
}
}
}

Expand Down
39 changes: 11 additions & 28 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,8 +909,8 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}
}

let mut await_message = self.messages.next().fuse();
let mut await_signal = self.signals.next().fuse();
let mut await_message = self.messages.next();
let mut await_signal = self.signals.next();
let signals_received = self.signals_received.load();
let pending_incoming = &mut self.pending_incoming;

Expand Down Expand Up @@ -1901,13 +1901,7 @@ where

loop {
select! {
msg = self.events_rx.next().fuse() => {
let msg = if let Some(msg) = msg {
msg
} else {
continue
};

msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg.into()).await?;
Expand All @@ -1927,16 +1921,7 @@ where
}
}
},
msg = self.to_overseer_rx.next() => {
let msg = match msg {
Some(m) => m,
None => {
// This is a fused stream so we will shut down after receiving all
// shutdown notifications.
continue
}
};

msg = self.to_overseer_rx.select_next_some() => {
match msg {
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
Expand All @@ -1946,16 +1931,14 @@ where
}
}
},
res = self.running_subsystems.next().fuse() => {
let finished = if let Some(finished) = res {
finished
} else {
continue
};

tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
res = self.running_subsystems.select_next_some() => {
tracing::error!(
target: LOG_TARGET,
subsystem = ?res,
"subsystem finished unexpectedly",
);
self.stop().await;
return finished;
return res;
},
}
}
Expand Down