Skip to content

Commit

Permalink
ping: update to libp2p-swarm ListenUpgradeError
Browse files Browse the repository at this point in the history
and DialUpgradeError updates.

ping/handler: Move dial upgrade error logic from poll to
on_dial_upgrade_error and rename pending_errors to pending_events, so
that handler flow becomes more similar to other protocols.
  • Loading branch information
jxs committed Jan 6, 2023
1 parent 506e87e commit 4d0b1f5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 48 deletions.
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# 0.44.0 [unreleased]

- Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` introduction and consequential changes. See [PR XXXX].

- Update to `libp2p-core` `v0.39.0`.

- Update to `libp2p-swarm` `v0.42.0`.

[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX

# 0.43.0

- Update to `libp2p-core` `v0.38.0`.
Expand Down
110 changes: 62 additions & 48 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ use futures::StreamExt;
use instant::Instant;
use libp2p_core::upgrade::{NegotiationError, UpgradeError};
use libp2p_swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
SubstreamProtocol,
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use log::{error, trace, warn};
Expand Down Expand Up @@ -123,8 +122,15 @@ pub struct GossipsubHandler {
/// The amount of time we allow idle connections before disconnecting.
idle_timeout: Duration,

/// Collection of errors from attempting an upgrade.
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<GossipsubHandlerError>>,
/// Collection of pendings events to be returned.
pending_events: VecDeque<
ConnectionHandlerEvent<
<GossipsubHandler as ConnectionHandler>::OutboundProtocol,
<GossipsubHandler as ConnectionHandler>::OutboundOpenInfo,
<GossipsubHandler as ConnectionHandler>::OutEvent,
<GossipsubHandler as ConnectionHandler>::Error,
>,
>,

/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,
Expand Down Expand Up @@ -176,7 +182,7 @@ impl GossipsubHandler {
peer_kind_sent: false,
protocol_unsupported: false,
idle_timeout,
upgrade_errors: VecDeque::new(),
pending_events: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
in_mesh: false,
}
Expand Down Expand Up @@ -243,6 +249,45 @@ impl GossipsubHandler {
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message));
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { error, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
self.outbound_substream_establishing = false;
warn!("Dial upgrade error {:?}", error);
let reported_error = match error {
// There was an error post negotiation, close the connection.
UpgradeError::Apply(e) => ConnectionHandlerEvent::Close(e),
UpgradeError::Select(negotiation_error) => {
match negotiation_error {
NegotiationError::Failed => {
// The protocol is not supported
self.protocol_unsupported = true;
if !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
PeerKind::NotSupported,
))
} else {
return;
}
}
NegotiationError::ProtocolError(e) => ConnectionHandlerEvent::Close(
GossipsubHandlerError::NegotiationProtocolError(e),
),
}
}
};
self.pending_events.push_back(reported_error);
}
}

impl ConnectionHandler for GossipsubHandler {
Expand Down Expand Up @@ -291,44 +336,9 @@ impl ConnectionHandler for GossipsubHandler {
Self::Error,
>,
> {
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
ConnectionHandlerUpgrErr::Timeout => {
Some(GossipsubHandlerError::NegotiationTimeout)
}
// There was an error post negotiation, close the connection.
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
match negotiation_error {
NegotiationError::Failed => {
// The protocol is not supported
self.protocol_unsupported = true;
if !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ConnectionHandlerEvent::Custom(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
} else {
None
}
}
NegotiationError::ProtocolError(e) => {
Some(GossipsubHandlerError::NegotiationProtocolError(e))
}
}
}
};

// If there was a fatal error, close the connection.
if let Some(error) = reported_error {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}
// Handle pending events.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}

if !self.peer_kind_sent {
Expand Down Expand Up @@ -573,10 +583,14 @@ impl ConnectionHandler for GossipsubHandler {
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => {
self.outbound_substream_establishing = false;
warn!("Dial upgrade error {:?}", e);
self.upgrade_errors.push_back(e);
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::DialTimeout(_) => {
warn!("Dial upgrade timeout expired");
self.pending_events.push_back(ConnectionHandlerEvent::Close(
GossipsubHandlerError::NegotiationTimeout,
));
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
}
Expand Down

0 comments on commit 4d0b1f5

Please sign in to comment.