diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 349defd9c20b..5d38d4c75bff 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,9 +1,13 @@ # 0.44.0 [unreleased] +- Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` introduction and consequential changes. See [PR XXXX]. + - Update to `libp2p-core` `v0.39.0`. - Update to `libp2p-swarm` `v0.42.0`. +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX + # 0.43.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index dc695bf7df87..07a1bdaeb1a5 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,9 +27,8 @@ use futures::StreamExt; use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; use log::{error, trace, warn}; @@ -123,8 +122,15 @@ pub struct GossipsubHandler { /// The amount of time we allow idle connections before disconnecting. idle_timeout: Duration, - /// Collection of errors from attempting an upgrade. - upgrade_errors: VecDeque>, + /// Collection of pendings events to be returned. + pending_events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, + >, + >, /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, @@ -176,7 +182,7 @@ impl GossipsubHandler { peer_kind_sent: false, protocol_unsupported: false, idle_timeout, - upgrade_errors: VecDeque::new(), + pending_events: VecDeque::new(), keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), in_mesh: false, } @@ -243,6 +249,45 @@ impl GossipsubHandler { self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); } } + + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { error, .. }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { + self.outbound_substream_establishing = false; + warn!("Dial upgrade error {:?}", error); + let reported_error = match error { + // There was an error post negotiation, close the connection. + UpgradeError::Apply(e) => ConnectionHandlerEvent::Close(e), + UpgradeError::Select(negotiation_error) => { + match negotiation_error { + NegotiationError::Failed => { + // The protocol is not supported + self.protocol_unsupported = true; + if !self.peer_kind_sent { + self.peer_kind_sent = true; + // clear all substreams so the keep alive returns false + self.inbound_substream = None; + self.outbound_substream = None; + self.keep_alive = KeepAlive::No; + ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( + PeerKind::NotSupported, + )) + } else { + return; + } + } + NegotiationError::ProtocolError(e) => ConnectionHandlerEvent::Close( + GossipsubHandlerError::NegotiationProtocolError(e), + ), + } + } + }; + self.pending_events.push_back(reported_error); + } } impl ConnectionHandler for GossipsubHandler { @@ -291,44 +336,9 @@ impl ConnectionHandler for GossipsubHandler { Self::Error, >, > { - // Handle any upgrade errors - if let Some(error) = self.upgrade_errors.pop_front() { - let reported_error = match error { - // Timeout errors get mapped to NegotiationTimeout and we close the connection. - ConnectionHandlerUpgrErr::Timeout => { - Some(GossipsubHandlerError::NegotiationTimeout) - } - // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - if !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom( - HandlerEvent::PeerKind(PeerKind::NotSupported), - )); - } else { - None - } - } - NegotiationError::ProtocolError(e) => { - Some(GossipsubHandlerError::NegotiationProtocolError(e)) - } - } - } - }; - - // If there was a fatal error, close the connection. - if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); - } + // Handle pending events. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); } if !self.peer_kind_sent { @@ -573,10 +583,14 @@ impl ConnectionHandler for GossipsubHandler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { - self.outbound_substream_establishing = false; - warn!("Dial upgrade error {:?}", e); - self.upgrade_errors.push_back(e); + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::DialTimeout(_) => { + warn!("Dial upgrade timeout expired"); + self.pending_events.push_back(ConnectionHandlerEvent::Close( + GossipsubHandlerError::NegotiationTimeout, + )); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} }