diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 5fb84593488..c2710631bdb 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -4,8 +4,11 @@ See [PR 3715]. - Remove deprecated items. See [PR 3700]. +- Keep connection alive while we are using it. See [PR 3960]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3700]: https://github.com/libp2p/rust-libp2p/pull/3700 +[PR 3960]: https://github.com/libp2p/rust-libp2p/pull/3960 ## 0.9.1 diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index c6ce82b927c..6e3406c6f13 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -26,9 +26,11 @@ use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; +use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; -use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; +use libp2p_swarm::{ + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent, +}; use libp2p_swarm::{ ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, ToSwarm, @@ -38,7 +40,7 @@ use std::task::{Context, Poll}; use thiserror::Error; use void::Void; -const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; +pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; /// The events produced by the [`Behaviour`]. #[derive(Debug)] @@ -107,51 +109,6 @@ impl Behaviour { .collect() } - fn on_connection_established( - &mut self, - ConnectionEstablished { - peer_id, - connection_id, - endpoint: connected_point, - .. - }: ConnectionEstablished, - ) { - if connected_point.is_relayed() { - if connected_point.is_listener() && !self.direct_connections.contains_key(&peer_id) { - // TODO: Try dialing the remote peer directly. Specification: - // - // > The protocol starts with the completion of a relay connection from A to B. Upon - // observing the new connection, the inbound peer (here B) checks the addresses - // advertised by A via identify. If that set includes public addresses, then A may - // be reachable by a direct connection, in which case B attempts a unilateral - // connection upgrade by initiating a direct connection to A. - // - // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol - self.queued_events.extend([ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection_id), - event: Either::Left(handler::relayed::Command::Connect { - obs_addrs: self.observed_addresses(), - }), - }, - ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: peer_id, - local_relayed_addr: match connected_point { - ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), - ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), - }, - }), - ]); - } - } else { - self.direct_connections - .entry(peer_id) - .or_default() - .insert(connection_id); - } - } - fn on_dial_failure( &mut self, DialFailure { @@ -188,22 +145,15 @@ impl Behaviour { self.queued_events.push_back(ToSwarm::NotifyHandler { handler: NotifyHandler::One(relayed_connection_id), peer_id, - event: Either::Left(handler::relayed::Command::Connect { - obs_addrs: self.observed_addresses(), - }), + event: Either::Left(handler::relayed::Command::Connect), }) } else { - self.queued_events.extend([ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), - }, - ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed { + self.queued_events.extend([ToSwarm::GenerateEvent( + Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, error: Error::Dial, - }), - ]); + }, + )]); } } @@ -239,18 +189,32 @@ impl NetworkBehaviour for Behaviour { fn handle_established_inbound_connection( &mut self, connection_id: ConnectionId, - _peer: PeerId, + peer: PeerId, local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { if is_relayed(local_addr) { - return Ok(Either::Left(handler::relayed::Handler::new( - ConnectedPoint::Listener { - local_addr: local_addr.clone(), - send_back_addr: remote_addr.clone(), + let connected_point = ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }; + let mut handler = + handler::relayed::Handler::new(connected_point, self.observed_addresses()); + handler.on_behaviour_event(handler::relayed::Command::Connect); + + self.queued_events.extend([ToSwarm::GenerateEvent( + Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: peer, + local_relayed_addr: local_addr.clone(), }, - ))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. + )]); + + return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. } + self.direct_connections + .entry(peer) + .or_default() + .insert(connection_id); assert!( self.direct_to_relayed_connections @@ -275,9 +239,15 @@ impl NetworkBehaviour for Behaviour { address: addr.clone(), role_override, }, + self.observed_addresses(), ))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. } + self.direct_connections + .entry(peer) + .or_default() + .insert(connection_id); + // Whether this is a connection requested by this behaviour. if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id) { @@ -290,16 +260,11 @@ impl NetworkBehaviour for Behaviour { ); } - self.queued_events.extend([ - ToSwarm::NotifyHandler { - peer_id: peer, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), - }, - ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded { + self.queued_events.extend([ToSwarm::GenerateEvent( + Event::DirectConnectionUpgradeSucceeded { remote_peer_id: peer, - }), - ]); + }, + )]); } Ok(Either::Right(dummy::ConnectionHandler)) @@ -323,24 +288,13 @@ impl NetworkBehaviour for Behaviour { }; match handler_event { - Either::Left(handler::relayed::Event::InboundConnectRequest { - inbound_connect, - remote_addr, - }) => { - self.queued_events.extend([ - ToSwarm::NotifyHandler { - handler: NotifyHandler::One(relayed_connection_id), - peer_id: event_source, - event: Either::Left(handler::relayed::Command::AcceptInboundConnect { - inbound_connect, - obs_addrs: self.observed_addresses(), - }), - }, - ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade { + Either::Left(handler::relayed::Event::InboundConnectRequest { remote_addr }) => { + self.queued_events.extend([ToSwarm::GenerateEvent( + Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: event_source, remote_relayed_addr: remote_addr, - }), - ]); + }, + )]); } Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { self.queued_events.push_back(ToSwarm::GenerateEvent( @@ -407,14 +361,12 @@ impl NetworkBehaviour for Behaviour { self.external_addresses.on_swarm_event(&event); match event { - FromSwarm::ConnectionEstablished(connection_established) => { - self.on_connection_established(connection_established) - } FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), FromSwarm::AddressChange(_) + | FromSwarm::ConnectionEstablished(_) | FromSwarm::ListenFailure(_) | FromSwarm::NewListener(_) | FromSwarm::NewListenAddr(_) diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index c71b27b831d..ff22f2b18e1 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -20,11 +20,11 @@ //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. +use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; use crate::protocol; use either::Either; use futures::future; use futures::future::{BoxFuture, FutureExt}; -use instant::Instant; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::ConnectedPoint; @@ -36,48 +36,16 @@ use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; -use std::fmt; use std::task::{Context, Poll}; -use std::time::Duration; +#[derive(Debug)] pub enum Command { - Connect { - obs_addrs: Vec, - }, - AcceptInboundConnect { - obs_addrs: Vec, - inbound_connect: Box, - }, - /// Upgrading the relayed connection to a direct connection either failed for good or succeeded. - /// There is no need to keep the relayed connection alive for the sake of upgrading to a direct - /// connection. - UpgradeFinishedDontKeepAlive, -} - -impl fmt::Debug for Command { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Command::Connect { obs_addrs } => f - .debug_struct("Command::Connect") - .field("obs_addrs", obs_addrs) - .finish(), - Command::AcceptInboundConnect { - obs_addrs, - inbound_connect: _, - } => f - .debug_struct("Command::AcceptInboundConnect") - .field("obs_addrs", obs_addrs) - .finish(), - Command::UpgradeFinishedDontKeepAlive => f - .debug_struct("Command::UpgradeFinishedDontKeepAlive") - .finish(), - } - } + Connect, } +#[derive(Debug)] pub enum Event { InboundConnectRequest { - inbound_connect: Box, remote_addr: Multiaddr, }, InboundNegotiationFailed { @@ -92,36 +60,6 @@ pub enum Event { }, } -impl fmt::Debug for Event { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Event::InboundConnectRequest { - inbound_connect: _, - remote_addr, - } => f - .debug_struct("Event::InboundConnectRequest") - .field("remote_addrs", remote_addr) - .finish(), - Event::InboundNegotiationFailed { error } => f - .debug_struct("Event::InboundNegotiationFailed") - .field("error", error) - .finish(), - Event::InboundConnectNegotiated(addrs) => f - .debug_tuple("Event::InboundConnectNegotiated") - .field(addrs) - .finish(), - Event::OutboundNegotiationFailed { error } => f - .debug_struct("Event::OutboundNegotiationFailed") - .field("error", error) - .finish(), - Event::OutboundConnectNegotiated { remote_addrs } => f - .debug_struct("Event::OutboundConnectNegotiated") - .field("remote_addrs", remote_addrs) - .finish(), - } - } -} - pub struct Handler { endpoint: ConnectedPoint, /// A pending fatal error that results in the connection being closed. @@ -142,17 +80,22 @@ pub struct Handler { /// Inbound connect, accepted by the behaviour, pending completion. inbound_connect: Option, protocol::inbound::UpgradeError>>>, - keep_alive: KeepAlive, + + /// The addresses we will send to the other party for hole-punching attempts. + holepunch_candidates: Vec, + + attempts: u8, } impl Handler { - pub fn new(endpoint: ConnectedPoint) -> Self { + pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec) -> Self { Self { endpoint, pending_error: Default::default(), queued_events: Default::default(), inbound_connect: Default::default(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), + holepunch_candidates, + attempts: 0, } } @@ -167,17 +110,29 @@ impl Handler { ) { match output { future::Either::Left(inbound_connect) => { + if self + .inbound_connect + .replace( + inbound_connect + .accept(self.holepunch_candidates.clone()) + .boxed(), + ) + .is_some() + { + log::warn!( + "New inbound connect stream while still upgrading previous one. \ + Replacing previous with new.", + ); + } let remote_addr = match &self.endpoint { ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), }; self.queued_events .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectRequest { - inbound_connect: Box::new(inbound_connect), - remote_addr, - }, + Event::InboundConnectRequest { remote_addr }, )); + self.attempts += 1; } // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. future::Either::Right(output) => void::unreachable(output), @@ -226,8 +181,6 @@ impl Handler { ::OutboundProtocol, >, ) { - self.keep_alive = KeepAlive::No; - match error { StreamUpgradeError::Timeout => { self.queued_events @@ -286,38 +239,33 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { match event { - Command::Connect { obs_addrs } => { + Command::Connect => { self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( - protocol::outbound::Upgrade::new(obs_addrs), + protocol::outbound::Upgrade::new(self.holepunch_candidates.clone()), (), ), }); - } - Command::AcceptInboundConnect { - inbound_connect, - obs_addrs, - } => { - if self - .inbound_connect - .replace(inbound_connect.accept(obs_addrs).boxed()) - .is_some() - { - log::warn!( - "New inbound connect stream while still upgrading previous one. \ - Replacing previous with new.", - ); - } - } - Command::UpgradeFinishedDontKeepAlive => { - self.keep_alive = KeepAlive::No; + self.attempts += 1; } } } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if !self.queued_events.is_empty() { + return KeepAlive::Yes; + } + + if self.inbound_connect.is_some() { + return KeepAlive::Yes; + } + + if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + return KeepAlive::Yes; + } + + KeepAlive::No } fn poll(