Skip to content

Commit

Permalink
core/src/transport: Add as_listener to Transport::dial
Browse files Browse the repository at this point in the history
Allows `NetworkBehaviour` implementations to dial a peer, but instruct
the dialed connection to be upgraded as if it were the listening
endpoint.

This is needed when establishing direct connections through NATs and/or
Firewalls (hole punching). When hole punching via TCP (QUIC is different
but similar) both ends dial the other at the same time resulting in a
simultaneously opened TCP connection. To disambiguate who is the dialer
and who the listener there are two options:

1. Use the Simultaneous Open Extension of Multistream Select. See
   [sim-open] specification and [sim-open-rust] Rust implementation.

2. Disambiguate the role (dialer or listener) based on the role within
   the DCUtR [dcutr] protocol. More specifically the node initiating the
   DCUtR process will act as a listener and the other as a dialer.

This commit enables (2), i.e. enables the DCUtR protocol to specify the
role used once the connection is established.

While on the positive side (2) requires one round trip less than (1), on
the negative side (2) only works for coordinated simultaneous dials.
I.e. when a simultaneous dial happens by chance, and not coordinated via
DCUtR, the connection attempt fails when only (2) is in place.

[sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md
[sim-open-rust]: libp2p#2066
[dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
  • Loading branch information
mxinden committed Nov 28, 2021
1 parent 16ce662 commit f18a0bb
Show file tree
Hide file tree
Showing 29 changed files with 266 additions and 82 deletions.
13 changes: 9 additions & 4 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ pub enum PendingPoint {
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer,
Dialer {
// TODO: Document
as_listener: bool,
},
/// The socket comes from a listener.
Listener {
/// Local connection address.
Expand All @@ -109,7 +112,7 @@ pub enum PendingPoint {
impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer,
ConnectedPoint::Dialer { as_listener, .. } => PendingPoint::Dialer { as_listener },
ConnectedPoint::Listener {
local_addr,
send_back_addr,
Expand All @@ -128,6 +131,8 @@ pub enum ConnectedPoint {
Dialer {
/// Multiaddress that was successfully dialed.
address: Multiaddr,
// TODO: Document
as_listener: bool,
},
/// We received the node.
Listener {
Expand Down Expand Up @@ -183,7 +188,7 @@ impl ConnectedPoint {
/// not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr {
match self {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
}
}
Expand All @@ -193,7 +198,7 @@ impl ConnectedPoint {
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address } => *address = new_address,
ConnectedPoint::Dialer { address, .. } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
}
}
Expand Down
37 changes: 25 additions & 12 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ where
local_addr,
send_back_addr,
}),
PendingPoint::Dialer => None,
PendingPoint::Dialer { .. } => None,
})
}

Expand Down Expand Up @@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>,
handler: THandler,
as_listener: bool,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Clone + Send,
Expand All @@ -544,7 +545,13 @@ where
return Err(DialError::ConnectionLimit { limit, handler });
};

let dial = ConcurrentDial::new(transport, peer, addresses, self.dial_concurrency_factor);
let dial = ConcurrentDial::new(
transport,
peer,
addresses,
self.dial_concurrency_factor,
as_listener,
);

let connection_id = self.next_connection_id();

Expand All @@ -560,13 +567,15 @@ where
.boxed(),
);

self.counters.inc_pending(&PendingPoint::Dialer);
let endpoint = PendingPoint::Dialer { as_listener };

self.counters.inc_pending(&endpoint);
self.pending.insert(
connection_id,
PendingConnectionInfo {
peer_id: peer,
handler,
endpoint: PendingPoint::Dialer,
endpoint: endpoint,
_drop_notifier: drop_notifier,
},
);
Expand Down Expand Up @@ -739,9 +748,13 @@ where
self.counters.dec_pending(&endpoint);

let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
(PendingPoint::Dialer, Some((address, errors))) => {
(ConnectedPoint::Dialer { address }, Some(errors))
}
(PendingPoint::Dialer { as_listener }, Some((address, errors))) => (
ConnectedPoint::Dialer {
address,
as_listener,
},
Some(errors),
),
(
PendingPoint::Listener {
local_addr,
Expand All @@ -755,7 +768,7 @@ where
},
None,
),
(PendingPoint::Dialer, None) => unreachable!(
(PendingPoint::Dialer { .. }, None) => unreachable!(
"Established incoming connection via pending outgoing connection."
),
(PendingPoint::Listener { .. }, Some(_)) => unreachable!(
Expand Down Expand Up @@ -904,7 +917,7 @@ where
self.counters.dec_pending(&endpoint);

match (endpoint, error) {
(PendingPoint::Dialer, Either::Left(error)) => {
(PendingPoint::Dialer { .. }, Either::Left(error)) => {
return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id,
error,
Expand All @@ -927,7 +940,7 @@ where
local_addr,
});
}
(PendingPoint::Dialer, Either::Right(_)) => {
(PendingPoint::Dialer { .. }, Either::Right(_)) => {
unreachable!("Inbound error for outbound connection.")
}
(PendingPoint::Listener { .. }, Either::Left(_)) => {
Expand Down Expand Up @@ -1170,7 +1183,7 @@ impl ConnectionCounters {

fn inc_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing += 1;
}
PendingPoint::Listener { .. } => {
Expand All @@ -1185,7 +1198,7 @@ impl ConnectionCounters {

fn dec_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing -= 1;
}
PendingPoint::Listener { .. } => {
Expand Down
3 changes: 2 additions & 1 deletion core/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ where
peer: Option<PeerId>,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
concurrency_factor: NonZeroU8,
as_listener: bool,
) -> Self {
let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) {
Ok(address) => match transport.clone().dial(address.clone()) {
Ok(address) => match transport.clone().dial(address.clone(), as_listener) {
Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Expand Down
10 changes: 7 additions & 3 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,19 @@ where
}
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial(addr) {
EitherTransport::Left(a) => match a.dial(addr, as_listener) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial(addr) {
EitherTransport::Right(b) => match b.dial(addr, as_listener) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
Expand Down
9 changes: 7 additions & 2 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ where
self.pool
.iter_pending_info()
.filter(move |(_, endpoint, peer_id)| {
matches!(endpoint, PendingPoint::Dialer) && peer_id.as_ref() == Some(&peer)
matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer)
})
.map(|(connection_id, _, _)| connection_id)
}
Expand Down Expand Up @@ -195,6 +195,7 @@ where
&mut self,
address: &Multiaddr,
handler: THandler,
as_listener: bool,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport + Send,
Expand All @@ -213,6 +214,7 @@ where
peer,
addresses: std::iter::once(address.clone()),
handler,
as_listener,
});
}
}
Expand All @@ -222,6 +224,7 @@ where
std::iter::once(address.clone()),
None,
handler,
as_listener,
)
}

Expand All @@ -242,6 +245,7 @@ where
opts.addresses,
Some(opts.peer),
opts.handler,
opts.as_listener,
)?;

Ok(id)
Expand Down Expand Up @@ -279,7 +283,7 @@ where
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool
.iter_pending_info()
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer))
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. }))
.filter_map(|(_, _, peer)| peer.as_ref())
}

Expand Down Expand Up @@ -469,6 +473,7 @@ struct DialingOpts<THandler, I> {
peer: PeerId,
handler: THandler,
addresses: I,
as_listener: bool,
}

/// Information about the network obtained by [`Network::info()`].
Expand Down
2 changes: 2 additions & 0 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ where
self,
addresses: I,
handler: THandler,
as_listener: bool,
) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError<THandler>>
where
I: IntoIterator<Item = Multiaddr>,
Expand All @@ -176,6 +177,7 @@ where
peer: peer_id,
handler,
addresses: addresses.into_iter(),
as_listener,
})?;

Ok((id, DialingPeer { network, peer_id }))
Expand Down
7 changes: 6 additions & 1 deletion core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ pub trait Transport {
///
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
/// try an alternative [`Transport`], if available.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
fn dial(
self,
addr: Multiaddr,
// TODO: In case the transport doesn't support as_listener, should we return an error at runtime?
as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;

Expand Down
16 changes: 13 additions & 3 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,24 @@ where
Ok(stream)
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self
.transport
.dial(addr.clone())
.dial(addr.clone(), as_listener)
.map_err(|err| err.map(EitherError::A))?;
let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr })),
args: Some((
self.fun,
ConnectedPoint::Dialer {
address: addr,
as_listener,
},
)),
marker: PhantomPinned,
};
Ok(future)
Expand Down
22 changes: 17 additions & 5 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;

trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial(
&self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
}

Expand All @@ -78,8 +82,12 @@ where
Ok(Box::pin(fut))
}

fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial(self.clone(), addr)
fn dial(
&self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial(self.clone(), addr, as_listener)
.map(|r| r.map_err(box_err))
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
Expand Down Expand Up @@ -115,8 +123,12 @@ impl<O> Transport for Boxed<O> {
self.inner.listen_on(addr)
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
fn dial(
self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr, as_listener)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Expand Down
10 changes: 7 additions & 3 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,20 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial(addr) {
fn dial(
self,
addr: Multiaddr,
as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial(addr, as_listener) {
Ok(connec) => return Ok(EitherFuture::First(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
return Err(TransportError::Other(EitherError::A(err)))
}
};

let addr = match self.1.dial(addr) {
let addr = match self.1.dial(addr, as_listener) {
Ok(connec) => return Ok(EitherFuture::Second(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
Expand Down
6 changes: 5 additions & 1 deletion core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl<TOut> Transport for DummyTransport<TOut> {
Err(TransportError::MultiaddrNotSupported(addr))
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
self,
addr: Multiaddr,
_as_listener: bool,
) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

Expand Down
Loading

0 comments on commit f18a0bb

Please sign in to comment.