From a8b990c5c11dee82477e243f77f0d9211b444228 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 17 Aug 2020 16:48:12 +0200 Subject: [PATCH 1/2] Implement ProtocolsHandler methods in wrappers. This PR forwards calls to some ProtocolsHandler methods that were previously not implemented in wrappers such as `MapInEvent`. It is unclear though how this can be implemented in some handlers such as `MultiHandler` as the information at hand does not enable it to decide which handler to forward the call to. --- swarm/src/protocols_handler.rs | 26 ++++++----- swarm/src/protocols_handler/dummy.rs | 6 ++- swarm/src/protocols_handler/map_in.rs | 21 +++++---- swarm/src/protocols_handler/map_out.rs | 21 +++++---- swarm/src/protocols_handler/multi.rs | 15 ++++++- swarm/src/protocols_handler/select.rs | 60 +++++++++++++++++++++++--- swarm/src/toggle.rs | 19 +++++++- 7 files changed, 128 insertions(+), 40 deletions(-) diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index a994cadfd9e..6c638636fe5 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -156,9 +156,7 @@ pub trait ProtocolsHandler: Send + 'static { /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed. fn inject_listen_upgrade_error( &mut self, - _: ProtocolsHandlerUpgrErr< - ::Error - > + _: ProtocolsHandlerUpgrErr<::Error> ) {} /// Returns until when the connection should be kept alive. @@ -189,7 +187,6 @@ pub trait ProtocolsHandler: Send + 'static { >; /// Adds a closure that turns the input event into something else. - #[inline] fn map_in_event(self, map: TMap) -> MapInEvent where Self: Sized, @@ -199,7 +196,6 @@ pub trait ProtocolsHandler: Send + 'static { } /// Adds a closure that turns the output event into something else. - #[inline] fn map_out_event(self, map: TMap) -> MapOutEvent where Self: Sized, @@ -214,7 +210,6 @@ pub trait ProtocolsHandler: Send + 'static { /// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence, /// > i.e. is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned /// > handler. - #[inline] fn select(self, other: TProto2) -> ProtocolsHandlerSelect where Self: Sized, @@ -226,7 +221,6 @@ pub trait ProtocolsHandler: Send + 'static { /// exclusively. /// /// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`. - #[inline] fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder where Self: Sized, @@ -331,7 +325,6 @@ impl { /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a /// `TOutboundOpenInfo` to something else. - #[inline] pub fn map_outbound_open_info( self, map: F, @@ -353,7 +346,6 @@ impl /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) /// to something else. - #[inline] pub fn map_protocol( self, map: F, @@ -374,7 +366,6 @@ impl } /// If this is a `Custom` event, maps the content to something else. - #[inline] pub fn map_custom( self, map: F, @@ -392,7 +383,6 @@ impl } /// If this is a `Close` event, maps the content to something else. - #[inline] pub fn map_close( self, map: F, @@ -421,6 +411,20 @@ pub enum ProtocolsHandlerUpgrErr { Upgrade(UpgradeError), } +impl ProtocolsHandlerUpgrErr { + /// Map the inner [`UpgradeError`] type. + pub fn map_upgrade_err(self, f: F) -> ProtocolsHandlerUpgrErr + where + F: FnOnce(UpgradeError) -> UpgradeError + { + match self { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, + ProtocolsHandlerUpgrErr::Upgrade(e) => ProtocolsHandlerUpgrErr::Upgrade(f(e)) + } + } +} + impl fmt::Display for ProtocolsHandlerUpgrErr where TUpgrErr: fmt::Display, diff --git a/swarm/src/protocols_handler/dummy.rs b/swarm/src/protocols_handler/dummy.rs index 3cd9f7d39e6..70820c69870 100644 --- a/swarm/src/protocols_handler/dummy.rs +++ b/swarm/src/protocols_handler/dummy.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}; +use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}}; use std::task::{Context, Poll}; use void::Void; @@ -71,8 +71,12 @@ impl ProtocolsHandler for DummyProtocolsHandler { fn inject_event(&mut self, _: Self::InEvent) {} + fn inject_address_change(&mut self, _: &Multiaddr) {} + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } diff --git a/swarm/src/protocols_handler/map_in.rs b/swarm/src/protocols_handler/map_in.rs index 7a031022cee..c66187cd4a1 100644 --- a/swarm/src/protocols_handler/map_in.rs +++ b/swarm/src/protocols_handler/map_in.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; - +use libp2p_core::Multiaddr; use std::{marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. @@ -38,7 +38,6 @@ pub struct MapInEvent { impl MapInEvent { /// Creates a `MapInEvent`. - #[inline] pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self { MapInEvent { inner, @@ -62,12 +61,10 @@ where type OutboundProtocol = TProtoHandler::OutboundProtocol; type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } - #[inline] fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output @@ -75,7 +72,6 @@ where self.inner.inject_fully_negotiated_inbound(protocol) } - #[inline] fn inject_fully_negotiated_outbound( &mut self, protocol: ::Output, @@ -84,24 +80,31 @@ where self.inner.inject_fully_negotiated_outbound(protocol, info) } - #[inline] fn inject_event(&mut self, event: TNewIn) { if let Some(event) = (self.map)(event) { self.inner.inject_event(event); } } - #[inline] + fn inject_address_change(&mut self, addr: &Multiaddr) { + self.inner.inject_address_change(addr) + } + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { self.inner.inject_dial_upgrade_error(info, error) } - #[inline] + fn inject_listen_upgrade_error( + &mut self, + error: ProtocolsHandlerUpgrErr<::Error> + ) { + self.inner.inject_listen_upgrade_error(error) + } + fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } - #[inline] fn poll( &mut self, cx: &mut Context<'_>, diff --git a/swarm/src/protocols_handler/map_out.rs b/swarm/src/protocols_handler/map_out.rs index 7b59fe735a3..557625669e5 100644 --- a/swarm/src/protocols_handler/map_out.rs +++ b/swarm/src/protocols_handler/map_out.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; - +use libp2p_core::Multiaddr; use std::task::{Context, Poll}; /// Wrapper around a protocol handler that turns the output event into something else. @@ -37,7 +37,6 @@ pub struct MapOutEvent { impl MapOutEvent { /// Creates a `MapOutEvent`. - #[inline] pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self { MapOutEvent { inner, @@ -60,12 +59,10 @@ where type OutboundProtocol = TProtoHandler::OutboundProtocol; type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } - #[inline] fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output @@ -73,7 +70,6 @@ where self.inner.inject_fully_negotiated_inbound(protocol) } - #[inline] fn inject_fully_negotiated_outbound( &mut self, protocol: ::Output, @@ -82,22 +78,29 @@ where self.inner.inject_fully_negotiated_outbound(protocol, info) } - #[inline] fn inject_event(&mut self, event: Self::InEvent) { self.inner.inject_event(event) } - #[inline] + fn inject_address_change(&mut self, addr: &Multiaddr) { + self.inner.inject_address_change(addr) + } + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { self.inner.inject_dial_upgrade_error(info, error) } - #[inline] + fn inject_listen_upgrade_error( + &mut self, + error: ProtocolsHandlerUpgrErr<::Error> + ) { + self.inner.inject_listen_upgrade_error(error) + } + fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } - #[inline] fn poll( &mut self, cx: &mut Context<'_>, diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index a18e50261e2..ce054945096 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -36,7 +36,7 @@ use crate::upgrade::{ UpgradeInfoSend }; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, upgrade::ProtocolName}; use rand::Rng; use std::{ collections::{HashMap, HashSet}, @@ -135,6 +135,12 @@ where } } + fn inject_address_change(&mut self, addr: &Multiaddr) { + for h in self.handlers.values_mut() { + h.inject_address_change(addr) + } + } + fn inject_dial_upgrade_error ( &mut self, (key, arg): Self::OutboundOpenInfo, @@ -147,6 +153,13 @@ where } } + fn inject_listen_upgrade_error( + &mut self, + _: ProtocolsHandlerUpgrErr<::Error> + ) { + // TODO: ??? + } + fn connection_keep_alive(&self) -> KeepAlive { self.handlers.values() .map(|h| h.connection_keep_alive()) diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index 32cd9ae3c73..7561e0349d4 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -30,9 +30,10 @@ use crate::protocols_handler::{ use libp2p_core::{ ConnectedPoint, + Multiaddr, PeerId, either::{EitherError, EitherOutput}, - upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError} + upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError, NegotiationError, ProtocolError} }; use std::{cmp, task::Context, task::Poll}; @@ -47,7 +48,6 @@ pub struct IntoProtocolsHandlerSelect { impl IntoProtocolsHandlerSelect { /// Builds a `IntoProtocolsHandlerSelect`. - #[inline] pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { IntoProtocolsHandlerSelect { proto1, @@ -86,7 +86,6 @@ pub struct ProtocolsHandlerSelect { impl ProtocolsHandlerSelect { /// Builds a `ProtocolsHandlerSelect`. - #[inline] pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { ProtocolsHandlerSelect { proto1, @@ -107,7 +106,6 @@ where type OutboundProtocol = EitherUpgrade, SendWrapper>; type OutboundOpenInfo = EitherOutput; - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { let proto1 = self.proto1.listen_protocol(); let proto2 = self.proto2.listen_protocol(); @@ -138,7 +136,6 @@ where } } - #[inline] fn inject_event(&mut self, event: Self::InEvent) { match event { EitherOutput::First(event) => self.proto1.inject_event(event), @@ -146,7 +143,11 @@ where } } - #[inline] + fn inject_address_change(&mut self, addr: &Multiaddr) { + self.proto1.inject_address_change(addr); + self.proto2.inject_address_change(addr) + } + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { match (info, error) { (EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => { @@ -182,7 +183,52 @@ where } } - #[inline] + fn inject_listen_upgrade_error(&mut self, error: ProtocolsHandlerUpgrErr<::Error>) { + match error { + ProtocolsHandlerUpgrErr::Timer => { + self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer); + self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer); + } + ProtocolsHandlerUpgrErr::Timeout => { + self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout); + self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); + self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => { + let (e1, e2); + match e { + ProtocolError::IoError(e) => { + e1 = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into())); + e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e)) + } + ProtocolError::InvalidMessage => { + e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); + e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage) + } + ProtocolError::InvalidProtocol => { + e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); + e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol) + } + ProtocolError::TooManyProtocols => { + e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); + e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) + } + } + self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1))); + self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2))) + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { + self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + } + } + } + fn connection_keep_alive(&self) -> KeepAlive { cmp::max(self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive()) } diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index 2dbd5268dd5..c534cab532b 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -28,13 +28,12 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr, IntoProtocolsHandler }; - use libp2p_core::{ ConnectedPoint, PeerId, Multiaddr, connection::ConnectionId, - either::EitherOutput, + either::{EitherError, EitherOutput}, upgrade::{DeniedUpgrade, EitherUpgrade} }; use std::{error, task::Context, task::Poll}; @@ -250,11 +249,27 @@ where .inject_event(event) } + fn inject_address_change(&mut self, addr: &Multiaddr) { + if let Some(inner) = self.inner.as_mut() { + inner.inject_address_change(addr) + } + } + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<::Error>) { self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED") .inject_dial_upgrade_error(info, err) } + fn inject_listen_upgrade_error(&mut self, err: ProtocolsHandlerUpgrErr<::Error>) { + if let Some(inner) = self.inner.as_mut() { + let err = err.map_upgrade_err(|e| e.map_err(|e| match e { + EitherError::A(e) => e, + EitherError::B(v) => void::unreachable(v) + })); + inner.inject_listen_upgrade_error(err) + } + } + fn connection_keep_alive(&self) -> KeepAlive { self.inner.as_ref().map(|h| h.connection_keep_alive()) .unwrap_or(KeepAlive::No) From b132ab5e678012c8d79ae12ba47c8ec100361748 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 18 Aug 2020 15:50:00 +0200 Subject: [PATCH 2/2] Add `MultiHandler::inject_listen_ugrade_error`. --- swarm/src/protocols_handler/multi.rs | 47 ++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index ce054945096..6177ed2fb96 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -36,7 +36,8 @@ use crate::upgrade::{ UpgradeInfoSend }; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, upgrade::ProtocolName}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError}; use rand::Rng; use std::{ collections::{HashMap, HashSet}, @@ -155,9 +156,49 @@ where fn inject_listen_upgrade_error( &mut self, - _: ProtocolsHandlerUpgrErr<::Error> + error: ProtocolsHandlerUpgrErr<::Error> ) { - // TODO: ??? + match error { + ProtocolsHandlerUpgrErr::Timer => + for h in self.handlers.values_mut() { + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer) + } + ProtocolsHandlerUpgrErr::Timeout => + for h in self.handlers.values_mut() { + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout) + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => + for h in self.handlers.values_mut() { + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))) + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => + match e { + ProtocolError::IoError(e) => + for h in self.handlers.values_mut() { + let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into())); + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } + ProtocolError::InvalidMessage => + for h in self.handlers.values_mut() { + let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } + ProtocolError::InvalidProtocol => + for h in self.handlers.values_mut() { + let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } + ProtocolError::TooManyProtocols => + for h in self.handlers.values_mut() { + let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => + if let Some(h) = self.handlers.get_mut(&k) { + h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + } + } } fn connection_keep_alive(&self) -> KeepAlive {