Skip to content

Commit

Permalink
Gossipsub: remove ConnectionHandlerEvent::Close
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Ermolaev committed Mar 17, 2023
1 parent eb5e269 commit 1264345
Showing 1 changed file with 24 additions and 31 deletions.
55 changes: 24 additions & 31 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use log::{error, trace, warn};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
Expand Down Expand Up @@ -291,13 +290,17 @@ impl ConnectionHandler for Handler {
> {
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
Some(HandlerError::NegotiationTimeout)
self.keep_alive = KeepAlive::No;
log::info!("Gossipsub error: {}", HandlerError::NegotiationTimeout);
}
// There was an error post negotiation, close the connection.
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
self.keep_alive = KeepAlive::No;
log::info!("Gossipsub error: {e}");
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
match negotiation_error {
NegotiationError::Failed => {
Expand All @@ -312,20 +315,17 @@ impl ConnectionHandler for Handler {
return Poll::Ready(ConnectionHandlerEvent::Custom(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
} else {
None
}
}
NegotiationError::ProtocolError(e) => {
Some(HandlerError::NegotiationProtocolError(e))
self.keep_alive = KeepAlive::No;
log::info!(
"Gossipsub error: {}",
HandlerError::NegotiationProtocolError(e)
);
}
}
}
};

// If there was a fatal error, close the connection.
if let Some(error) = reported_error {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}
}

Expand All @@ -340,9 +340,8 @@ impl ConnectionHandler for Handler {

if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxInboundSubstreams,
));
self.keep_alive = KeepAlive::No;
log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams);
}

// determine if we need to create the stream
Expand All @@ -351,9 +350,8 @@ impl ConnectionHandler for Handler {
&& !self.outbound_substream_establishing
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxOutboundSubstreams,
));
self.keep_alive = KeepAlive::No;
log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams);
}
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
Expand Down Expand Up @@ -475,14 +473,14 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::WaitingOutput(substream));
}
Err(e) => {
error!("Error sending message: {}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
error!("Error sending message: {e}");
break;
}
}
}
Poll::Ready(Err(e)) => {
error!("Outbound substream error while sending output: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
error!("Outbound substream error while sending output: {e:?}");
break;
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -504,7 +502,8 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(e))
log::info!("NegotaitedSubstream error: {e}");
break;
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -525,14 +524,8 @@ impl ConnectionHandler for Handler {
break;
}
Poll::Ready(Err(e)) => {
warn!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)
.into(),
));
warn!("Outbound substream error while closing: {e:?}");
break;
}
Poll::Pending => {
self.keep_alive = KeepAlive::No;
Expand Down

0 comments on commit 1264345

Please sign in to comment.