diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index b38f456972b..36b930fd0b4 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -216,7 +216,8 @@ mod network { use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel}; - use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; + use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; + use libp2p_core::UpgradeError; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -403,10 +404,7 @@ mod network { async fn handle_event( &mut self, - event: SwarmEvent< - ComposedEvent, - EitherError, io::Error>, - >, + event: SwarmEvent, io::Error>>, ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 66fe489060b..e82b3d5ea47 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -6,13 +6,13 @@ - Bump MSRV to 1.65.0. -- Update to `libp2p-dcutr` `v0.9.0`. +- Update to `libp2p-dcutr` `v0.9.0` and introduce `dcutr::EventType::DirectConnectionUpgradeTimedOut`. - Update to `libp2p-ping` `v0.42.0`. - Update to `libp2p-kad` `v0.43.0`. -- Update to `libp2p-relay` `v0.15.0`. +- Update to `libp2p-relay` `v0.15.0` and introduce `relay::EventType::CircuitReqOutboundConectTimedOut`. - Update to `libp2p-identify` `v0.42.0`. diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 7b73ac85eba..cb174278cb9 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -2,7 +2,8 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` introduction and consequential changes. + With that introduce `Event::DirectConnectionUpgradeTimedout`. See [PR 3307]. - Declare `InboundUpgradeError` and `OutboundUpgradeError` as type aliases instead of renames. This is a workaround for a missing feature in `cargo semver-checks`. See [PR 3213]. @@ -15,6 +16,7 @@ [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 [issue 2217]: https://github.com/libp2p/rust-libp2p/issues/2217 [PR 3214]: https://github.com/libp2p/rust-libp2p/pull/3214 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.8.0 diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index 2d65197c5f8..14c219da30a 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -25,12 +25,13 @@ use crate::protocol; use either::Either; use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::multiaddr::Protocol; +use libp2p_core::UpgradeError; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerUpgrErr, ExternalAddresses, IntoConnectionHandler, - NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ConnectionHandler, ExternalAddresses, IntoConnectionHandler, NetworkBehaviour, + NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; @@ -63,7 +64,9 @@ pub enum Error { #[error("Failed to dial peer.")] Dial, #[error("Failed to establish substream: {0}.")] - Handler(ConnectionHandlerUpgrErr), + Handler(UpgradeError), + #[error("Timeout expired upgrading the direct connection.")] + Timeout, } pub struct Behaviour { @@ -241,6 +244,15 @@ impl NetworkBehaviour for Behaviour { .into(), ); } + Either::Left(handler::relayed::Event::InboundNegotiationTimedout) => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: Error::Timeout, + }) + .into(), + ); + } Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { self.queued_actions.push_back( NetworkBehaviourAction::Dial { diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index 9e6759977ad..4f042d48b63 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -22,11 +22,9 @@ use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_core::UpgradeError; use libp2p_swarm::handler::ConnectionEvent; -use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, -}; +use libp2p_swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; use std::task::{Context, Poll}; use void::Void; @@ -52,7 +50,7 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = void::Void; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; + type Error = UpgradeError; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; @@ -103,6 +101,7 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::FullyNegotiatedOutbound(_) | ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::DialTimeout(_) | ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::AddressChange(_) => {} } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 301f2ee3d82..b80dca468e7 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -31,10 +31,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; -use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - SubstreamProtocol, -}; +use libp2p_swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; use std::collections::VecDeque; use std::fmt; use std::task::{Context, Poll}; @@ -83,11 +80,12 @@ pub enum Event { remote_addr: Multiaddr, }, InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, + InboundNegotiationTimedout, InboundConnectNegotiated(Vec), OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, OutboundConnectNegotiated { remote_addrs: Vec, @@ -109,6 +107,9 @@ impl fmt::Debug for Event { .debug_struct("Event::InboundNegotiationFailed") .field("error", error) .finish(), + Event::InboundNegotiationTimedout => { + f.debug_struct("Event::InboundNegotiationTimedout").finish() + } Event::InboundConnectNegotiated(addrs) => f .debug_tuple("Event::InboundConnectNegotiated") .field(addrs) @@ -133,7 +134,7 @@ pub struct Handler { endpoint: ConnectedPoint, /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< + UpgradeError< EitherError, >, >, @@ -220,41 +221,23 @@ impl Handler { >, ) { match error { - ConnectionHandlerUpgrErr::Timeout => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Timeout, - }, - )); - } - ConnectionHandlerUpgrErr::Timer => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Timer, - }, - )); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. self.keep_alive = KeepAlive::No; self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: UpgradeError::Select(NegotiationError::Failed), }, )); } _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| { - e.map_err(|e| match e { - EitherError::A(e) => EitherError::A(e), - EitherError::B(v) => void::unreachable(v), - }) + self.pending_error = Some(error.map_err(|e| match e { + EitherError::A(e) => EitherError::A(e), + EitherError::B(v) => void::unreachable(v), })); } } @@ -270,29 +253,20 @@ impl Handler { self.keep_alive = KeepAlive::No; match error { - ConnectionHandlerUpgrErr::Timeout => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Timeout, - }, - )); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: UpgradeError::Select(NegotiationError::Failed), }, )); } _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B))); + self.pending_error = Some(error.map_err(EitherError::B)); } } } @@ -301,7 +275,7 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Command; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< + type Error = UpgradeError< EitherError, >; type InboundProtocol = upgrade::EitherUpgrade; @@ -392,9 +366,9 @@ impl ConnectionHandler for Handler { )); } Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))), - )) + return Poll::Ready(ConnectionHandlerEvent::Close(UpgradeError::Apply( + EitherError::A(e), + ))) } } } @@ -424,6 +398,11 @@ impl ConnectionHandler for Handler { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } + ConnectionEvent::DialTimeout(_) => { + self.queued_events.push_back(ConnectionHandlerEvent::Custom( + Event::InboundNegotiationTimedout, + )); + } ConnectionEvent::AddressChange(_) => {} } } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 349defd9c20..8972639d76f 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,7 +2,10 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. See [PR 3307]. + +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.43.0 diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 8fd563c37a6..07a1bdaeb1a 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,9 +27,8 @@ use futures::StreamExt; use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; use log::{error, trace, warn}; @@ -123,8 +122,15 @@ pub struct GossipsubHandler { /// The amount of time we allow idle connections before disconnecting. idle_timeout: Duration, - /// Collection of errors from attempting an upgrade. - upgrade_errors: VecDeque>, + /// Collection of pendings events to be returned. + pending_events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, + >, + >, /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, @@ -176,7 +182,7 @@ impl GossipsubHandler { peer_kind_sent: false, protocol_unsupported: false, idle_timeout, - upgrade_errors: VecDeque::new(), + pending_events: VecDeque::new(), keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), in_mesh: false, } @@ -243,6 +249,45 @@ impl GossipsubHandler { self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); } } + + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { error, .. }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { + self.outbound_substream_establishing = false; + warn!("Dial upgrade error {:?}", error); + let reported_error = match error { + // There was an error post negotiation, close the connection. + UpgradeError::Apply(e) => ConnectionHandlerEvent::Close(e), + UpgradeError::Select(negotiation_error) => { + match negotiation_error { + NegotiationError::Failed => { + // The protocol is not supported + self.protocol_unsupported = true; + if !self.peer_kind_sent { + self.peer_kind_sent = true; + // clear all substreams so the keep alive returns false + self.inbound_substream = None; + self.outbound_substream = None; + self.keep_alive = KeepAlive::No; + ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( + PeerKind::NotSupported, + )) + } else { + return; + } + } + NegotiationError::ProtocolError(e) => ConnectionHandlerEvent::Close( + GossipsubHandlerError::NegotiationProtocolError(e), + ), + } + } + }; + self.pending_events.push_back(reported_error); + } } impl ConnectionHandler for GossipsubHandler { @@ -291,44 +336,9 @@ impl ConnectionHandler for GossipsubHandler { Self::Error, >, > { - // Handle any upgrade errors - if let Some(error) = self.upgrade_errors.pop_front() { - let reported_error = match error { - // Timeout errors get mapped to NegotiationTimeout and we close the connection. - ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - Some(GossipsubHandlerError::NegotiationTimeout) - } - // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - if !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom( - HandlerEvent::PeerKind(PeerKind::NotSupported), - )); - } else { - None - } - } - NegotiationError::ProtocolError(e) => { - Some(GossipsubHandlerError::NegotiationProtocolError(e)) - } - } - } - }; - - // If there was a fatal error, close the connection. - if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); - } + // Handle pending events. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); } if !self.peer_kind_sent { @@ -573,10 +583,14 @@ impl ConnectionHandler for GossipsubHandler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { - self.outbound_substream_establishing = false; - warn!("Dial upgrade error {:?}", e); - self.upgrade_errors.push_back(e); + ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + ConnectionEvent::DialTimeout(_) => { + warn!("Dial upgrade timeout expired"); + self.pending_events.push_back(ConnectionHandlerEvent::Close( + GossipsubHandlerError::NegotiationTimeout, + )); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index e4ce7498f32..74bc8097545 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -5,9 +5,12 @@ - Move I/O from `Behaviour` to `Handler`. Handle `Behaviour`'s Identify and Push requests independently by incoming order, previously Push requests were prioritized. see [PR 3208]. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. With that introduce `Error`. See [PR 3307]. + [PR 3208]: https://github.com/libp2p/rust-libp2p/pull/3208 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.41.1 diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index beb264da789..77cf787656f 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -25,9 +25,9 @@ use libp2p_core::{ }; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ - dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, + dial_opts::DialOpts, AddressScore, ConnectionHandler, DialError, ExternalAddresses, + IntoConnectionHandler, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -38,6 +38,7 @@ use std::{ task::Poll, time::Duration, }; +use thiserror::Error; /// Network behaviour that automatically identifies nodes periodically, returns information /// about them, and answers identify queries from other nodes. @@ -300,9 +301,16 @@ impl NetworkBehaviour for Behaviour { self.events .push_back(NetworkBehaviourAction::GenerateEvent(Event::Error { peer_id, - error, + error: Error::Upgrade(error), })); } + handler::Event::IdentificationTimedout => { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Error { + peer_id, + error: Error::Timeout, + })) + } } } @@ -459,10 +467,20 @@ pub enum Event { /// The peer with whom the error originated. peer_id: PeerId, /// The error that occurred. - error: ConnectionHandlerUpgrErr, + error: Error, }, } +/// Errors emitted by the `Event::Error`. +#[derive(Debug, Error)] +pub enum Error { + /// Error while attempting to identify the remote. + #[error("Failed to identify the remote: {0}.")] + Upgrade(libp2p_core::UpgradeError), + #[error("Timeout expired while attempting to identify the remote.")] + Timeout, +} + fn supported_protocols(params: &impl PollParameters) -> Vec { // The protocol names can be bytes, but the identify protocol except UTF-8 strings. // There's not much we can do to solve this conflict except strip non-UTF-8 characters. diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 21063acc661..16898d092e8 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -32,8 +32,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::warn; use smallvec::SmallVec; @@ -161,7 +161,9 @@ pub enum Event { /// We received a request for identification. Identify, /// Failed to identify the remote, or to reply to an identification request. - IdentificationError(ConnectionHandlerUpgrErr), + IdentificationError(libp2p_core::UpgradeError), + /// Identification Dial timedout. + IdentificationTimedout, } impl Handler { @@ -257,11 +259,11 @@ impl Handler { ) { use libp2p_core::upgrade::UpgradeError; - let err = err.map_upgrade_err(|e| match e { + let err = match err { UpgradeError::Select(e) => UpgradeError::Select(e), UpgradeError::Apply(EitherError::A(ioe)) => UpgradeError::Apply(ioe), UpgradeError::Apply(EitherError::B(ioe)) => UpgradeError::Apply(ioe), - }); + }; self.events .push(ConnectionHandlerEvent::Custom(Event::IdentificationError( err, @@ -370,9 +372,7 @@ impl ConnectionHandler for Handler { Event::Identification(peer_id), )), Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom( - Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - )), + Event::IdentificationError(libp2p_core::upgrade::UpgradeError::Apply(err)), )), Poll::Ready(None) | Poll::Pending => Poll::Pending, } @@ -397,6 +397,13 @@ impl ConnectionHandler for Handler { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } + ConnectionEvent::DialTimeout(_) => { + self.events.push(ConnectionHandlerEvent::Custom( + Event::IdentificationTimedout, + )); + self.keep_alive = KeepAlive::No; + self.trigger_next_identify.reset(self.interval); + } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index ce49b612c9b..0e98a18a9ad 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -2,13 +2,15 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. With that introduce `KademliaHandlerQueryErr::Timeout`. See [PR 3307]. - Remove lifetime from `RecordStore` and use GATs instead. See [PR 3239]. - Bump MSRV to 1.65.0. [PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.42.1 diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 37f41a8cd8a..79f58bdeaec 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -26,13 +26,14 @@ use crate::record::{self, Record}; use futures::prelude::*; use futures::stream::SelectAll; use instant::Instant; +use libp2p_core::UpgradeError; use libp2p_core::{either::EitherOutput, upgrade, ConnectedPoint, PeerId}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ConnectionEvent, DialTimeout, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::trace; use std::task::Waker; @@ -351,7 +352,9 @@ pub enum KademliaHandlerEvent { #[derive(Debug)] pub enum KademliaHandlerQueryErr { /// Error while trying to perform the query. - Upgrade(ConnectionHandlerUpgrErr), + Upgrade(UpgradeError), + /// Timeout while trying to perform the query. + Timeout, /// Received an answer that doesn't correspond to the request. UnexpectedMessage, /// I/O error in the substream. @@ -373,6 +376,9 @@ impl fmt::Display for KademliaHandlerQueryErr { KademliaHandlerQueryErr::Io(err) => { write!(f, "I/O error during a Kademlia RPC query: {err}") } + KademliaHandlerQueryErr::Timeout => { + write!(f, "Timeout expired during a Kademlia RPC query") + } } } } @@ -381,18 +387,13 @@ impl error::Error for KademliaHandlerQueryErr { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { KademliaHandlerQueryErr::Upgrade(err) => Some(err), + KademliaHandlerQueryErr::Timeout => None, KademliaHandlerQueryErr::UnexpectedMessage => None, KademliaHandlerQueryErr::Io(err) => Some(err), } } } -impl From> for KademliaHandlerQueryErr { - fn from(err: ConnectionHandlerUpgrErr) -> Self { - KademliaHandlerQueryErr::Upgrade(err) - } -} - /// Event to send to the handler. #[derive(Debug)] pub enum KademliaHandlerIn { @@ -605,25 +606,6 @@ where substream: protocol, }); } - - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { - info: (_, user_data), - error, - .. - }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't - // continue trying - if let Some(user_data) = user_data { - self.outbound_substreams - .push(OutboundSubstreamState::ReportError(error.into(), user_data)); - } - } } impl ConnectionHandler for KademliaHandler @@ -800,8 +782,30 @@ where ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { self.on_fully_negotiated_inbound(fully_negotiated_inbound) } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) + // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't + // continue trying + ConnectionEvent::DialUpgradeError(DialUpgradeError { + info: (_, user_data), + error, + }) => { + if let Some(user_data) = user_data { + self.outbound_substreams + .push(OutboundSubstreamState::ReportError( + KademliaHandlerQueryErr::Upgrade(error), + user_data, + )); + } + } + ConnectionEvent::DialTimeout(DialTimeout { + info: (_, user_data), + }) => { + if let Some(user_data) = user_data { + self.outbound_substreams + .push(OutboundSubstreamState::ReportError( + KademliaHandlerQueryErr::Timeout, + user_data, + )); + } } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 6b2ab7b43f6..3ea1cfbf97e 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -2,7 +2,10 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. See [PR 3307]. + +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.41.0 diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 2703b274c77..214c464cfd6 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -28,8 +28,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; use std::collections::VecDeque; use std::{ @@ -238,14 +237,12 @@ impl Handler { self.outbound = None; // Request a new substream on the next `poll`. let error = match error { - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { debug_assert_eq!(self.state, State::Active); self.state = State::Inactive { reported: false }; return; } - // Note: This timeout only covers protocol negotiation. - ConnectionHandlerUpgrErr::Timeout => Failure::Timeout, e => Failure::Other { error: Box::new(e) }, }; @@ -411,6 +408,10 @@ impl ConnectionHandler for Handler { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } + ConnectionEvent::DialTimeout(_) => { + self.outbound = None; // Request a new substream on the next `poll`. + self.pending_errors.push_front(Failure::Timeout); + } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 082f829657e..5d911ebd6bd 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -9,9 +9,13 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` introduction and consequential changes. + With that introduce `behaviour::OutboundError` and `client::InboundError`. + Remove unneeded `inbound_hop::UpgradeError` as all `inbound_hop` upgrade errors are Fatal. See [PR 3307]. + [PR 3238]: https://github.com/libp2p/rust-libp2p/pull/3238 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 [discussion 2174]: https://github.com/libp2p/rust-libp2p/issues/2174 # 0.14.0 diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 32150f884a0..c2f5e39be68 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -29,17 +29,17 @@ use either::Either; use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::multiaddr::Protocol; -use libp2p_core::PeerId; +use libp2p_core::{PeerId, UpgradeError}; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; use std::ops::Add; use std::task::{Context, Poll}; use std::time::Duration; +use thiserror::Error; /// Configuration for the relay [`Behaviour`]. /// @@ -136,20 +136,20 @@ pub enum Event { /// Accepting an inbound reservation request failed. ReservationReqAcceptFailed { src_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An inbound reservation request has been denied. ReservationReqDenied { src_peer_id: PeerId }, /// Denying an inbound reservation request has failed. ReservationReqDenyFailed { src_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An inbound reservation has timed out. ReservationTimedOut { src_peer_id: PeerId }, CircuitReqReceiveFailed { src_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, /// An inbound circuit request has been denied. CircuitReqDenied { @@ -160,7 +160,7 @@ pub enum Event { CircuitReqDenyFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -171,13 +171,13 @@ pub enum Event { CircuitReqOutboundConnectFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: OutboundError, }, /// Accepting an inbound circuit request failed. CircuitReqAcceptFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An inbound circuit has closed. CircuitClosed { @@ -187,6 +187,14 @@ pub enum Event { }, } +#[derive(Debug, Error)] +pub enum OutboundError { + #[error("An outbound request upgrade failed: {0}.")] + Upgrade(UpgradeError), + #[error("An outbound request timeout expired.")] + Timeout, +} + /// [`NetworkBehaviour`] implementation of the relay server /// functionality of the circuit relay v2 protocol. pub struct Behaviour { @@ -587,7 +595,35 @@ impl NetworkBehaviour for Behaviour { NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { src_peer_id, dst_peer_id: event_source, - error, + error: OutboundError::Upgrade(error), + }) + .into(), + ); + } + handler::Event::OutboundConnectTimedOut { + circuit_id, + src_peer_id, + src_connection_id, + inbound_circuit_req, + status, + } => { + self.queued_actions.push_back( + NetworkBehaviourAction::NotifyHandler { + handler: NotifyHandler::One(src_connection_id), + peer_id: src_peer_id, + event: Either::Left(handler::In::DenyCircuitReq { + circuit_id: Some(circuit_id), + inbound_circuit_req, + status, + }), + } + .into(), + ); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { + src_peer_id, + dst_peer_id: event_source, + error: OutboundError::Timeout, }) .into(), ); diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 120e980c001..9f0f8373fc2 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -32,14 +32,14 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::either::EitherError; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ConnectionEvent, DialTimeout, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, + dummy, ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -155,11 +155,15 @@ pub enum Event { renewed: bool, }, /// Accepting an inbound reservation request failed. - ReservationReqAcceptFailed { error: inbound_hop::UpgradeError }, + ReservationReqAcceptFailed { + error: inbound_hop::FatalUpgradeError, + }, /// An inbound reservation request has been denied. ReservationReqDenied {}, /// Denying an inbound reservation request has failed. - ReservationReqDenyFailed { error: inbound_hop::UpgradeError }, + ReservationReqDenyFailed { + error: inbound_hop::FatalUpgradeError, + }, /// An inbound reservation has timed out. ReservationTimedOut {}, /// An inbound circuit request has been received. @@ -168,9 +172,7 @@ pub enum Event { endpoint: ConnectedPoint, }, /// Receiving an inbound circuit request failed. - CircuitReqReceiveFailed { - error: ConnectionHandlerUpgrErr, - }, + CircuitReqReceiveFailed { error: UpgradeError }, /// An inbound circuit request has been denied. CircuitReqDenied { circuit_id: Option, @@ -180,7 +182,7 @@ pub enum Event { CircuitReqDenyFailed { circuit_id: Option, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -191,7 +193,7 @@ pub enum Event { CircuitReqAcceptFailed { circuit_id: CircuitId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::FatalUpgradeError, }, /// An outbound substream for an inbound circuit request has been /// negotiated. @@ -211,7 +213,15 @@ pub enum Event { src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, status: Status, - error: ConnectionHandlerUpgrErr, + error: UpgradeError, + }, + /// Negotiating an outbound substream for an inbound circuit timeout expired. + OutboundConnectTimedOut { + circuit_id: CircuitId, + src_peer_id: PeerId, + src_connection_id: ConnectionId, + inbound_circuit_req: inbound_hop::CircuitReq, + status: Status, }, /// An inbound circuit has closed. CircuitClosed { @@ -325,6 +335,19 @@ impl fmt::Debug for Event { .field("status", status) .field("error", error) .finish(), + Event::OutboundConnectTimedOut { + circuit_id, + src_peer_id, + src_connection_id, + status, + .. + } => f + .debug_struct("Event::OutboundConnectNegotiationTimedOut") + .field("circuit_id", circuit_id) + .field("src_peer_id", src_peer_id) + .field("src_connection_id", src_connection_id) + .field("status", status) + .finish(), Event::CircuitClosed { circuit_id, dst_peer_id, @@ -396,9 +419,7 @@ pub struct Handler { /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< - EitherError, - >, + UpgradeError>, >, /// Until when to keep the connection alive. @@ -411,12 +432,12 @@ pub struct Handler { /// Futures accepting an inbound circuit request. circuit_accept_futures: - Futures>, + Futures>, /// Futures deying an inbound circuit request. circuit_deny_futures: Futures<( Option, PeerId, - Result<(), inbound_hop::UpgradeError>, + Result<(), inbound_hop::FatalUpgradeError>, )>, /// Tracks substreams lend out to other [`Handler`]s. /// @@ -502,27 +523,17 @@ impl Handler { >, ) { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), + upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed) => { + upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed) + } + upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)) => { + self.pending_error = Some(upgrade::UpgradeError::Select( + upgrade::NegotiationError::ProtocolError(e), )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - inbound_hop::UpgradeError::Fatal(error), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::A(error)), - )); + upgrade::UpgradeError::Apply(error) => { + self.pending_error = Some(upgrade::UpgradeError::Apply(EitherError::A(error))); return; } }; @@ -545,35 +556,23 @@ impl Handler { >, ) { let (non_fatal_error, status) = match error { - ConnectionHandlerUpgrErr::Timeout => { - (ConnectionHandlerUpgrErr::Timeout, Status::ConnectionFailed) - } - ConnectionHandlerUpgrErr::Timer => { - (ConnectionHandlerUpgrErr::Timer, Status::ConnectionFailed) - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => { + upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed) => { // The remote has previously done a reservation. Doing a reservation but not // supporting the stop protocol is pointless, thus disconnecting. - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed), + self.pending_error = Some(upgrade::UpgradeError::Select( + upgrade::NegotiationError::Failed, )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), + upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)) => { + self.pending_error = Some(upgrade::UpgradeError::Select( + upgrade::NegotiationError::ProtocolError(e), )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error { + upgrade::UpgradeError::Apply(error) => match error { outbound_stop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); + self.pending_error = Some(upgrade::UpgradeError::Apply(EitherError::B(error))); return; } outbound_stop::UpgradeError::CircuitFailed(error) => { @@ -585,10 +584,7 @@ impl Handler { Status::PermissionDenied } }; - ( - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)), - status, - ) + (upgrade::UpgradeError::Apply(error), status) } }, }; @@ -614,8 +610,8 @@ impl Handler { } enum ReservationRequestFuture { - Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), - Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), + Accepting(BoxFuture<'static, Result<(), inbound_hop::FatalUpgradeError>>), + Denying(BoxFuture<'static, Result<(), inbound_hop::FatalUpgradeError>>), } type Futures = FuturesUnordered>; @@ -623,9 +619,8 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; + type Error = + UpgradeError>; type InboundProtocol = inbound_hop::Upgrade; type OutboundProtocol = outbound_stop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -974,6 +969,24 @@ impl ConnectionHandler for Handler { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } + ConnectionEvent::DialTimeout(DialTimeout { info }) => { + let OutboundOpenInfo { + circuit_id, + inbound_circuit_req, + src_peer_id, + src_connection_id, + } = info; + + self.queued_events.push_back(ConnectionHandlerEvent::Custom( + Event::OutboundConnectTimedOut { + circuit_id, + src_peer_id, + src_connection_id, + inbound_circuit_req, + status: Status::ConnectionFailed, + }, + )); + } ConnectionEvent::AddressChange(_) => {} } } diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index aa5a82043f5..b212e65542b 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -59,7 +59,9 @@ pub mod outbound { /// Everything related to the relay protocol from a client's perspective. pub mod client { - pub use crate::priv_client::{new, transport::Transport, Behaviour, Connection, Event}; + pub use crate::priv_client::{ + new, transport::Transport, Behaviour, Connection, Event, InboundError, + }; pub mod transport { pub use crate::priv_client::transport::Error; diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 7bb79f5d246..2cd05a8dbf7 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -33,18 +33,18 @@ use futures::io::{AsyncRead, AsyncWrite}; use futures::ready; use futures::stream::StreamExt; use libp2p_core::connection::ConnectionId; -use libp2p_core::PeerId; +use libp2p_core::{PeerId, UpgradeError}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; +use thiserror::Error; use transport::Transport; /// The events produced by the client `Behaviour`. @@ -61,7 +61,7 @@ pub enum Event { relay_peer_id: PeerId, /// Indicates whether the request replaces an existing reservation. renewal: bool, - error: ConnectionHandlerUpgrErr, + error: InboundError, }, OutboundCircuitEstablished { relay_peer_id: PeerId, @@ -69,7 +69,7 @@ pub enum Event { }, OutboundCircuitReqFailed { relay_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: InboundError, }, /// An inbound circuit has been established. InboundCircuitEstablished { @@ -78,7 +78,7 @@ pub enum Event { }, InboundCircuitReqFailed { relay_peer_id: PeerId, - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, /// An inbound circuit request has been denied. InboundCircuitReqDenied { src_peer_id: PeerId }, @@ -89,6 +89,14 @@ pub enum Event { }, } +#[derive(Debug, Error)] +pub enum InboundError { + #[error("An outbound request upgrade failed: {0}.")] + Upgrade(UpgradeError), + #[error("An inbound request timeout expired.")] + Timeout, +} + /// [`NetworkBehaviour`] implementation of the relay client /// functionality of the circuit relay v2 protocol. pub struct Behaviour { @@ -214,7 +222,14 @@ impl NetworkBehaviour for Behaviour { self.queued_actions.push_back(Event::ReservationReqFailed { relay_peer_id: event_source, renewal, - error, + error: InboundError::Upgrade(error), + }) + } + handler::Event::ReservationReqTimeout { renewal } => { + self.queued_actions.push_back(Event::ReservationReqFailed { + relay_peer_id: event_source, + renewal, + error: InboundError::Timeout, }) } handler::Event::OutboundCircuitEstablished { limit } => { @@ -228,7 +243,14 @@ impl NetworkBehaviour for Behaviour { self.queued_actions .push_back(Event::OutboundCircuitReqFailed { relay_peer_id: event_source, - error, + error: InboundError::Upgrade(error), + }) + } + handler::Event::OutboundCircuitReqTimedOut => { + self.queued_actions + .push_back(Event::OutboundCircuitReqFailed { + relay_peer_id: event_source, + error: InboundError::Timeout, }) } handler::Event::InboundCircuitEstablished { src_peer_id, limit } => self diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index c9f7c18adbb..47c96f6007b 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -30,14 +30,14 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::either::EitherError; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ConnectionEvent, DialTimeout, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, SubstreamProtocol, + dummy, ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, KeepAlive, + SubstreamProtocol, }; use log::debug; use std::collections::{HashMap, VecDeque}; @@ -85,12 +85,16 @@ pub enum Event { ReservationReqFailed { /// Indicates whether the request replaces an existing reservation. renewal: bool, - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, + /// A reservation request timeout expired. + ReservationReqTimeout { renewal: bool }, /// An outbound circuit has been established. OutboundCircuitEstablished { limit: Option }, + /// An outbound circuit request timeout expired. + OutboundCircuitReqTimedOut, OutboundCircuitReqFailed { - error: ConnectionHandlerUpgrErr, + error: UpgradeError, }, /// An inbound circuit has been established. InboundCircuitEstablished { @@ -98,9 +102,7 @@ pub enum Event { limit: Option, }, /// An inbound circuit request has failed. - InboundCircuitReqFailed { - error: ConnectionHandlerUpgrErr, - }, + InboundCircuitReqFailed { error: UpgradeError }, /// An inbound circuit request has been denied. InboundCircuitReqDenied { src_peer_id: PeerId }, /// Denying an inbound circuit request failed. @@ -173,9 +175,7 @@ pub struct Handler { remote_addr: Multiaddr, /// A pending fatal error that results in the connection being closed. pending_error: Option< - ConnectionHandlerUpgrErr< - EitherError, - >, + UpgradeError>, >, /// Until when to keep the connection alive. keep_alive: KeepAlive, @@ -347,27 +347,17 @@ impl Handler { >, ) { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), + UpgradeError::Select(upgrade::NegotiationError::Failed) => { + UpgradeError::Select(upgrade::NegotiationError::Failed) + } + upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)) => { + self.pending_error = Some(upgrade::UpgradeError::Select( + upgrade::NegotiationError::ProtocolError(e), )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - inbound_stop::UpgradeError::Fatal(error), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::A(error)), - )); + upgrade::UpgradeError::Apply(inbound_stop::UpgradeError::Fatal(error)) => { + self.pending_error = Some(upgrade::UpgradeError::Apply(EitherError::A(error))); return; } }; @@ -392,43 +382,27 @@ impl Handler { match open_info { OutboundOpenInfo::Reserve { mut to_listener } => { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), + upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed) => { + UpgradeError::Select(upgrade::NegotiationError::Failed) + } + upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)) => { + self.pending_error = Some(UpgradeError::Select( + upgrade::NegotiationError::ProtocolError(e), )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - return; - } - outbound_hop::UpgradeError::ReservationFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::CircuitFailed(_) => { - unreachable!( - "Do not emitt `CircuitFailed` for outgoing reservation." - ) - } + upgrade::UpgradeError::Apply(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = Some(UpgradeError::Apply(EitherError::B(error))); + return; } - } + outbound_hop::UpgradeError::ReservationFailed(error) => { + UpgradeError::Apply(error) + } + outbound_hop::UpgradeError::CircuitFailed(_) => { + unreachable!("Do not emitt `CircuitFailed` for outgoing reservation.") + } + }, }; if self.pending_error.is_none() { @@ -455,43 +429,27 @@ impl Handler { } OutboundOpenInfo::Connect { send_back } => { let non_fatal_error = match error { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), + upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed) => { + UpgradeError::Select(upgrade::NegotiationError::Failed) + } + upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)) => { + self.pending_error = Some(UpgradeError::Select( + upgrade::NegotiationError::ProtocolError(e), )); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - return; - } - outbound_hop::UpgradeError::CircuitFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::ReservationFailed(_) => { - unreachable!( - "Do not emitt `ReservationFailed` for outgoing circuit." - ) - } + upgrade::UpgradeError::Apply(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = Some(UpgradeError::Apply(EitherError::B(error))); + return; } - } + outbound_hop::UpgradeError::CircuitFailed(error) => { + UpgradeError::Apply(error) + } + outbound_hop::UpgradeError::ReservationFailed(_) => { + unreachable!("Do not emitt `ReservationFailed` for outgoing circuit.") + } + }, }; let _ = send_back.send(Err(())); @@ -504,14 +462,47 @@ impl Handler { } } } + + fn on_dial_timeout( + &mut self, + DialTimeout { info }: DialTimeout<::OutboundOpenInfo>, + ) { + match info { + OutboundOpenInfo::Reserve { mut to_listener } => { + if self.pending_error.is_none() { + self.send_error_futs.push( + async move { + let _ = to_listener + .send(transport::ToListenerMsg::Reservation(Err(()))) + .await; + } + .boxed(), + ); + } else { + // Fatal error occured, thus handler is closing as quickly as possible. + // Transport is notified through dropping `to_listener`. + } + let renewal = self.reservation.failed(); + self.queued_events.push_back(ConnectionHandlerEvent::Custom( + Event::ReservationReqTimeout { renewal }, + )); + } + OutboundOpenInfo::Connect { send_back } => { + let _ = send_back.send(Err(())); + + self.queued_events.push_back(ConnectionHandlerEvent::Custom( + Event::OutboundCircuitReqTimedOut, + )); + } + } + } } impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; + type Error = + UpgradeError>; type InboundProtocol = inbound_stop::Upgrade; type OutboundProtocol = outbound_hop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -655,6 +646,9 @@ impl ConnectionHandler for Handler { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } + ConnectionEvent::DialTimeout(dial_timeout) => { + self.on_dial_timeout(dial_timeout); + } ConnectionEvent::AddressChange(_) => {} } } diff --git a/protocols/relay/src/protocol/inbound_hop.rs b/protocols/relay/src/protocol/inbound_hop.rs index dec290e0e6c..11882b752c4 100644 --- a/protocols/relay/src/protocol/inbound_hop.rs +++ b/protocols/relay/src/protocol/inbound_hop.rs @@ -47,7 +47,7 @@ impl upgrade::UpgradeInfo for Upgrade { impl upgrade::InboundUpgrade for Upgrade { type Output = Req; - type Error = UpgradeError; + type Error = FatalUpgradeError; type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { @@ -79,9 +79,7 @@ impl upgrade::InboundUpgrade for Upgrade { .map_err(|_| FatalUpgradeError::ParsePeerId)?; Req::Connect(CircuitReq { dst, substream }) } - hop_message::Type::Status => { - return Err(FatalUpgradeError::UnexpectedTypeStatus.into()) - } + hop_message::Type::Status => return Err(FatalUpgradeError::UnexpectedTypeStatus), }; Ok(req) @@ -90,18 +88,6 @@ impl upgrade::InboundUpgrade for Upgrade { } } -#[derive(Debug, Error)] -pub enum UpgradeError { - #[error("Fatal")] - Fatal(#[from] FatalUpgradeError), -} - -impl From for UpgradeError { - fn from(error: prost_codec::Error) -> Self { - Self::Fatal(error.into()) - } -} - #[derive(Debug, Error)] pub enum FatalUpgradeError { #[error(transparent)] @@ -131,7 +117,7 @@ pub struct ReservationReq { } impl ReservationReq { - pub async fn accept(self, addrs: Vec) -> Result<(), UpgradeError> { + pub async fn accept(self, addrs: Vec) -> Result<(), FatalUpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -158,7 +144,7 @@ impl ReservationReq { self.send(msg).await } - pub async fn deny(self, status: Status) -> Result<(), UpgradeError> { + pub async fn deny(self, status: Status) -> Result<(), FatalUpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -170,7 +156,7 @@ impl ReservationReq { self.send(msg).await } - async fn send(mut self, msg: HopMessage) -> Result<(), UpgradeError> { + async fn send(mut self, msg: HopMessage) -> Result<(), FatalUpgradeError> { self.substream.send(msg).await?; self.substream.flush().await?; self.substream.close().await?; @@ -189,7 +175,7 @@ impl CircuitReq { self.dst } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { + pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), FatalUpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -214,7 +200,7 @@ impl CircuitReq { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { + pub async fn deny(mut self, status: Status) -> Result<(), FatalUpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index d16191b9b06..70ad2755024 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -2,7 +2,10 @@ - Update to `libp2p-core` `v0.39.0`. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. See [PR 3307]. + +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.11.0 diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index 16a493ccc3a..533a1527e97 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -396,6 +396,7 @@ where // TODO: Handle upgrade errors properly ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::DialTimeout(_) | ConnectionEvent::DialUpgradeError(_) => {} } } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 613e7f6eab8..aa05b3f86aa 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -9,10 +9,12 @@ and refer to its types via `request_response::`. For example: `request_response::Behaviour` or `request_response::Event`. See [PR 3159]. -- Update to `libp2p-swarm` `v0.42.0`. +- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` + introduction and consequential changes. See [PR 3307]. [discussion 2174]: https://github.com/libp2p/rust-libp2p/discussions/2174 [PR 3159]: https://github.com/libp2p/rust-libp2p/pull/3159 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.23.0 diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 50cd6adb055..da87424a7d8 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,8 +24,8 @@ use crate::codec::Codec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, + ConnectionEvent, DialTimeout, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, }; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; @@ -33,7 +33,7 @@ use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUn use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::{ - handler::{ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive}, + handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive}, SubstreamProtocol, }; use smallvec::SmallVec; @@ -72,7 +72,7 @@ where /// The current connection keep-alive. keep_alive: KeepAlive, /// A pending fatal error that results in the connection being closed. - pending_error: Option>, + pending_error: Option>, /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. @@ -145,10 +145,7 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events.push_back(Event::OutboundTimeout(info)); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { // The remote merely doesn't support the protocol(s) we requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. @@ -172,10 +169,7 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events.push_back(Event::InboundTimeout(info)) - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { // The local peer merely doesn't support the protocol(s) requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. @@ -284,7 +278,7 @@ where { type InEvent = RequestProtocol; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; + type Error = UpgradeError; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; @@ -419,6 +413,9 @@ where response, }); } + ConnectionEvent::DialTimeout(DialTimeout { info }) => { + self.pending_events.push_back(Event::OutboundTimeout(info)) + } ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 68e38fe4a00..d912aab4b2a 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,12 @@ # 0.42.0 [unreleased] +- Remove uncontructed variant `Timer` from `ConnectionHandlerUpgrErr`. + Make `ListenUpgradeError::error` an `UpgradeError` instead of `ConnectionHandlerUpgrErr`, in case of timeout expiration we + just log the error as we don't know which ConnectionHandler should handle the error. + Make `DialUpgradeError::error` an `UpgradeError` instead of `ConnectionHandlerUpgrErr`, and introduce `ConnectionEvent::DialTimeout` + for the situations where that upgrading an outbound substream to the given protocol has expired its timeout. + See [PR 3307] + - Update to `libp2p-core` `v0.39.0`. - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. @@ -20,6 +27,7 @@ [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 [PR 3260]: https://github.com/libp2p/rust-libp2p/pull/3260 [PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 +[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307 # 0.41.1 diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index ae198dc2bd3..38e288ce5e2 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -20,9 +20,9 @@ use crate::behaviour::FromSwarm; use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, - ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialTimeout, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; @@ -210,20 +210,12 @@ where ), }; - let err = match err { - ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(err) => { - ConnectionHandlerUpgrErr::Upgrade(err.map_err(|err| match err { - EitherError::A(e) => e, - EitherError::B(v) => void::unreachable(v), - })) - } - }; - inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, - error: err, + error: err.map_err(|err| match err { + EitherError::A(e) => e, + EitherError::B(v) => void::unreachable(v), + }), })); } } @@ -328,6 +320,11 @@ where info, error: err, })), + ConnectionEvent::DialTimeout(DialTimeout { info }) => self + .inner + .as_mut() + .expect("Can't receive an outbound substream if disabled; QED") + .on_connection_event(ConnectionEvent::DialTimeout(DialTimeout { info })), ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { self.on_listen_upgrade_error(listen_upgrade_error) } @@ -339,6 +336,7 @@ where mod tests { use super::*; use crate::dummy; + use libp2p_core::{upgrade::NegotiationError, UpgradeError}; /// A disabled [`ToggleConnectionHandler`] can receive listen upgrade errors in /// the following two cases: @@ -360,7 +358,7 @@ mod tests { handler.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: Either::Right(()), - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Select(NegotiationError::Failed), })); } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 15a414c01e4..41d581fd534 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -28,11 +28,11 @@ pub use error::{ }; use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound, - FullyNegotiatedOutbound, ListenUpgradeError, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerUpgrErr, DialTimeout, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; -use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; +use crate::{ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; @@ -183,12 +183,7 @@ where match requested_substreams.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => continue, Poll::Ready(Some(Err(info))) => { - handler.on_connection_event(ConnectionEvent::DialUpgradeError( - DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Timeout, - }, - )); + handler.on_connection_event(ConnectionEvent::DialTimeout(DialTimeout { info })); continue; } Poll::Ready(None) | Poll::Pending => {} @@ -221,12 +216,20 @@ where )); continue; } - Poll::Ready(Some((info, Err(error)))) => { - handler.on_connection_event(ConnectionEvent::DialUpgradeError( - DialUpgradeError { info, error }, - )); - continue; - } + Poll::Ready(Some((info, Err(error)))) => match error { + ConnectionHandlerUpgrErr::Upgrade(error) => { + handler.on_connection_event(ConnectionEvent::DialUpgradeError( + DialUpgradeError { info, error }, + )); + continue; + } + ConnectionHandlerUpgrErr::Timeout => { + handler.on_connection_event(ConnectionEvent::DialTimeout(DialTimeout { + info, + })); + continue; + } + }, } // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not @@ -239,12 +242,17 @@ where )); continue; } - Poll::Ready(Some((info, Err(error)))) => { - handler.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { info, error }, - )); - continue; - } + Poll::Ready(Some((info, Err(error)))) => match error { + ConnectionHandlerUpgrErr::Upgrade(error) => { + handler.on_connection_event(ConnectionEvent::ListenUpgradeError( + ListenUpgradeError { info, error }, + )); + continue; + } + ConnectionHandlerUpgrErr::Timeout => { + log::debug!("Timeout expired during an inbound substream negotiation") + } + }, } // Ask the handler whether it wants the connection (and the handler itself) @@ -787,7 +795,10 @@ mod tests { .. }) => void::unreachable(protocol), ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - self.error = Some(error) + self.error = Some(ConnectionHandlerUpgrErr::Upgrade(error)) + } + ConnectionEvent::DialTimeout(_) => { + self.error = Some(ConnectionHandlerUpgrErr::Timeout) } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 4ec58581c2e..2e6699b012e 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -2,7 +2,7 @@ use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, Poll use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; -use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; +use crate::{ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::PeerId; @@ -106,11 +106,10 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, .. }) => void::unreachable(protocol), + ConnectionEvent::DialTimeout(_) => unreachable!(), ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error { - ConnectionHandlerUpgrErr::Timeout => unreachable!(), - ConnectionHandlerUpgrErr::Timer => unreachable!(), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => { + UpgradeError::Apply(e) => void::unreachable(e), + UpgradeError::Select(_) => { unreachable!("Denied upgrade does not support any protocols") } }, diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index f60e94e2465..91fc27d33ea 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -207,6 +207,8 @@ pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IO AddressChange(AddressChange<'a>), /// Informs the handler that upgrading an outbound substream to the given protocol has failed. DialUpgradeError(DialUpgradeError), + /// Informs the handler that upgrading an outbound substream to the given protocol has expired its timeout. + DialTimeout(DialTimeout), /// Informs the handler that upgrading an inbound substream to the given protocol has failed. ListenUpgradeError(ListenUpgradeError), } @@ -242,14 +244,20 @@ pub struct AddressChange<'a> { /// that upgrading an outbound substream to the given protocol has failed. pub struct DialUpgradeError { pub info: OOI, - pub error: ConnectionHandlerUpgrErr, + pub error: UpgradeError, +} + +/// [`ConnectionEvent`] variant that informs the handler +/// that upgrading an outbound substream has expired its timeout. +pub struct DialTimeout { + pub info: OOI, } /// [`ConnectionEvent`] variant that informs the handler /// that upgrading an inbound substream to the given protocol has failed. pub struct ListenUpgradeError { pub info: IOI, - pub error: ConnectionHandlerUpgrErr, + pub error: UpgradeError, } /// Configuration of inbound or outbound substream protocol(s) @@ -435,8 +443,6 @@ impl pub enum ConnectionHandlerUpgrErr { /// The opening attempt timed out before the negotiation was fully completed. Timeout, - /// There was an error in the timer used. - Timer, /// Error while upgrading the substream to the protocol we want. Upgrade(UpgradeError), } @@ -449,7 +455,6 @@ impl ConnectionHandlerUpgrErr { { match self { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, - ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, ConnectionHandlerUpgrErr::Upgrade(e) => ConnectionHandlerUpgrErr::Upgrade(f(e)), } } @@ -464,9 +469,6 @@ where ConnectionHandlerUpgrErr::Timeout => { write!(f, "Timeout error while opening a substream") } - ConnectionHandlerUpgrErr::Timer => { - write!(f, "Timer error while opening a substream") - } ConnectionHandlerUpgrErr::Upgrade(err) => write!(f, "{err}"), } } @@ -479,7 +481,6 @@ where fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { ConnectionHandlerUpgrErr::Timeout => None, - ConnectionHandlerUpgrErr::Timer => None, ConnectionHandlerUpgrErr::Upgrade(err) => Some(err), } } diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 078fc1ed8aa..d5f4bfca6dc 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, + ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialTimeout, DialUpgradeError, + FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, IntoConnectionHandler, + KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; @@ -143,146 +143,84 @@ where FullyNegotiatedOutbound { protocol, info }, ))), ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(error))), + error: UpgradeError::Apply(EitherError::A(error)), info: Either::Left(info), }) => Ok(Either::Left(ConnectionEvent::DialUpgradeError( DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), + error: UpgradeError::Apply(error), info, }, ))), ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(error))), + error: UpgradeError::Apply(EitherError::B(error)), info: Either::Right(info), }) => Ok(Either::Right(ConnectionEvent::DialUpgradeError( DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), + error: UpgradeError::Apply(error), info, }, ))), ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + error: UpgradeError::Select(error), info: Either::Left(info), }) => Ok(Either::Left(ConnectionEvent::DialUpgradeError( DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + error: UpgradeError::Select(error), info, }, ))), ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + error: UpgradeError::Select(error), info: Either::Right(info), }) => Ok(Either::Right(ConnectionEvent::DialUpgradeError( DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + error: UpgradeError::Select(error), info, }, ))), - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info: Either::Left(info), - }) => Ok(Either::Left(ConnectionEvent::DialUpgradeError( - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info, - }, - ))), - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info: Either::Right(info), - }) => Ok(Either::Right(ConnectionEvent::DialUpgradeError( - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, - info, - }, - ))), - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info: Either::Left(info), - }) => Ok(Either::Left(ConnectionEvent::DialUpgradeError( - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info, - }, - ))), - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info: Either::Right(info), - }) => Ok(Either::Right(ConnectionEvent::DialUpgradeError( - DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, - info, - }, - ))), - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(error))), + ConnectionEvent::DialTimeout(DialTimeout { info: Either::Left(info), - }) => Ok(Either::Left(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), - info, - }, - ))), - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(error))), + }) => Ok(Either::Left(ConnectionEvent::DialTimeout(DialTimeout { + info, + }))), + ConnectionEvent::DialTimeout(DialTimeout { info: Either::Right(info), - }) => Ok(Either::Right(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), - info, - }, - ))), - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Left(info), - }) => Ok(Either::Left(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info, - }, - ))), - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Right(info), - }) => Ok(Either::Right(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info, - }, - ))), + }) => Ok(Either::Right(ConnectionEvent::DialTimeout(DialTimeout { + info, + }))), ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Apply(EitherError::A(error)), info: Either::Left(info), }) => Ok(Either::Left(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Apply(error), info, }, ))), ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Apply(EitherError::B(error)), info: Either::Right(info), }) => Ok(Either::Right(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Apply(error), info, }, ))), ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Select(error), info: Either::Left(info), }) => Ok(Either::Left(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Select(error), info, }, ))), ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Select(error), info: Either::Right(info), }) => Ok(Either::Right(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Select(error), info, }, ))), diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 35f0ebd7995..4e1caac3d6a 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -22,9 +22,9 @@ //! indexed by some key. use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, - ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialTimeout, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::NegotiatedSubstream; @@ -92,47 +92,19 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timer => { + UpgradeError::Select(NegotiationError::Failed) => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Select(NegotiationError::Failed), }, )); } } } - ConnectionHandlerUpgrErr::Timeout => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Timeout, - }, - )); - } - } - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), - }, - )); - } - } - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )) => match e { + UpgradeError::Select(NegotiationError::ProtocolError(e)) => match e { ProtocolError::IoError(e) => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { @@ -142,9 +114,7 @@ where h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), + error: UpgradeError::Select(e), }, )); } @@ -157,9 +127,7 @@ where h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), + error: UpgradeError::Select(e), }, )); } @@ -172,9 +140,7 @@ where h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), + error: UpgradeError::Select(e), }, )); } @@ -188,22 +154,20 @@ where h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), + error: UpgradeError::Select(e), }, )); } } } }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => { + UpgradeError::Apply((k, e)) => { if let Some(h) = self.handlers.get_mut(&k) { if let Some(i) = info.take(&k) { h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: UpgradeError::Apply(e), }, )); } @@ -312,6 +276,13 @@ where log::error!("DialUpgradeError: no handler for protocol") } } + ConnectionEvent::DialTimeout(DialTimeout { info: (key, arg) }) => { + if let Some(h) = self.handlers.get_mut(&key) { + h.on_connection_event(ConnectionEvent::DialTimeout(DialTimeout { info: arg })); + } else { + log::error!("DialTimeout: no handler for protocol") + } + } ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { self.on_listen_upgrade_error(listen_upgrade_error) } diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index e8cd03ebed8..24e439d2698 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -213,7 +213,12 @@ where } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { if self.pending_error.is_none() { - self.pending_error = Some(error); + self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(error)); + } + } + ConnectionEvent::DialTimeout(_) => { + if self.pending_error.is_none() { + self.pending_error = Some(ConnectionHandlerUpgrErr::Timeout) } } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 2efa949dd71..e849e5b02ed 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -99,6 +99,7 @@ impl ConnectionHandler for PendingConnectionHandler { } ConnectionEvent::AddressChange(_) | ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::DialTimeout(_) | ConnectionEvent::ListenUpgradeError(_) => {} } } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index c979d91c71e..844dbb71ef9 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -19,14 +19,13 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, - ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - InboundUpgradeSend, IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, - SubstreamProtocol, + AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialTimeout, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend, + IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol, }; use crate::upgrade::SendWrapper; -use either::Either; +use either::Either::{self, Right}; use libp2p_core::{ either::{EitherError, EitherOutput}, upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, @@ -165,65 +164,54 @@ where match self { DialUpgradeError { info: EitherOutput::First(info), - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Select(err), } => Either::Left(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Timer, + error: UpgradeError::Select(err), }), DialUpgradeError { info: EitherOutput::First(info), - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Apply(EitherError::A(err)), } => Either::Left(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Timeout, - }), - DialUpgradeError { - info: EitherOutput::First(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - } => Either::Left(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - }), - DialUpgradeError { - info: EitherOutput::First(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), - } => Either::Left(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), - }), - DialUpgradeError { - info: EitherOutput::Second(info), - error: ConnectionHandlerUpgrErr::Timer, - } => Either::Right(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Timer, - }), - DialUpgradeError { - info: EitherOutput::Second(info), - error: ConnectionHandlerUpgrErr::Timeout, - } => Either::Right(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Timeout, + error: UpgradeError::Apply(err), }), DialUpgradeError { info: EitherOutput::Second(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + error: UpgradeError::Select(err), } => Either::Right(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), + error: UpgradeError::Select(err), }), DialUpgradeError { info: EitherOutput::Second(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))), + error: UpgradeError::Apply(EitherError::B(err)), } => Either::Right(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + error: UpgradeError::Apply(err), }), _ => panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"), } } } +impl DialTimeout> +where + S1OOI: Send + 'static, + S2OOI: Send + 'static, +{ + fn transpose(self) -> Either, DialTimeout> { + match self { + DialTimeout { + info: EitherOutput::First(info), + } => Either::Left(DialTimeout { info }), + DialTimeout { + info: EitherOutput::Second(info), + } => Either::Right(DialTimeout { info }), + } + } +} + impl ConnectionHandlerSelect where TProto1: ConnectionHandler, @@ -240,52 +228,20 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timer => { - self.proto1 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i1, - error: ConnectionHandlerUpgrErr::Timer, - })); - - self.proto2 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i2, - error: ConnectionHandlerUpgrErr::Timer, - })); - } - ConnectionHandlerUpgrErr::Timeout => { - self.proto1 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i1, - error: ConnectionHandlerUpgrErr::Timeout, - })); - - self.proto2 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i2, - error: ConnectionHandlerUpgrErr::Timeout, - })); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + UpgradeError::Select(NegotiationError::Failed) => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: UpgradeError::Select(NegotiationError::Failed), })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: UpgradeError::Select(NegotiationError::Failed), })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )) => { + UpgradeError::Select(NegotiationError::ProtocolError(e)) => { let (e1, e2); match e { ProtocolError::IoError(e) => { @@ -310,26 +266,26 @@ where self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)), + error: UpgradeError::Select(e1), })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)), + error: UpgradeError::Select(e2), })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + UpgradeError::Apply(EitherError::A(e)) => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: UpgradeError::Apply(e), })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { + UpgradeError::Apply(EitherError::B(e)) => { self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: UpgradeError::Apply(e), })); } } @@ -478,6 +434,14 @@ where .on_connection_event(ConnectionEvent::DialUpgradeError(err)), } } + ConnectionEvent::DialTimeout(dial_timeout) => match dial_timeout.transpose() { + Either::Left(dial_timeout) => self + .proto1 + .on_connection_event(ConnectionEvent::DialTimeout(dial_timeout)), + Right(dial_timeout) => self + .proto2 + .on_connection_event(ConnectionEvent::DialTimeout(dial_timeout)), + }, ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { self.on_listen_upgrade_error(listen_upgrade_error) } diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index bd1ed812b8b..6e19c1bac22 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -112,6 +112,7 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { protocol, .. }) => void::unreachable(protocol), ConnectionEvent::DialUpgradeError(_) + | ConnectionEvent::DialTimeout(_) | ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::AddressChange(_) => {} } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cf0b4c3b08a..bcff73d1a07 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -113,10 +113,13 @@ pub use connection::{ }; pub use executor::Executor; pub use handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, - IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, - OneShotHandlerConfig, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, IntoConnectionHandler, + IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, OneShotHandlerConfig, + SubstreamProtocol, }; + +#[deprecated(since = "0.42.0", note = "Shouldn't be required by end users")] +pub type ConnectionHandlerUpgrErr = handler::ConnectionHandlerUpgrErr; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; pub use registry::{AddAddressResult, AddressRecord, AddressScore};