diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 2ea16ef180c..ad5418193a4 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -1,5 +1,3 @@ -use async_std::io; -use either::Either; use futures::channel::{mpsc, oneshot}; use futures::prelude::*; @@ -208,10 +206,7 @@ impl EventLoop { } } - async fn handle_event( - &mut self, - event: SwarmEvent>, - ) { + async fn handle_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(BehaviourEvent::Kademlia( kad::Event::OutboundQueryProgressed { diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 4dac6ea6774..b1d4e9f0c89 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -123,8 +123,8 @@ impl super::Recorder for Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { if let libp2p_swarm::SwarmEvent::ConnectionClosed { peer_id, num_established, diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 2132dd5d7fb..97968253faa 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -138,8 +138,8 @@ impl Recorder for Metrics { } } -impl Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { self.swarm.record(event); #[cfg(feature = "identify")] diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index fff28e5f639..ad83401f316 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -185,8 +185,8 @@ impl Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &SwarmEvent) { match event { SwarmEvent::Behaviour(_) => {} SwarmEvent::ConnectionEstablished { @@ -359,15 +359,13 @@ struct ConnectionClosedLabels { enum ConnectionError { Io, KeepAliveTimeout, - Handler, } -impl From<&libp2p_swarm::ConnectionError> for ConnectionError { - fn from(value: &libp2p_swarm::ConnectionError) -> Self { +impl From<&libp2p_swarm::ConnectionError> for ConnectionError { + fn from(value: &libp2p_swarm::ConnectionError) -> Self { match value { libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io, libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout, - libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler, } } } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 4d9bf3c910d..eba58f89313 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -40,7 +40,6 @@ use std::collections::VecDeque; use std::io; use std::task::{Context, Poll}; use std::time::Duration; -use void::Void; #[derive(Debug)] pub enum Command { @@ -63,7 +62,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::ToBehaviour, - ::Error, >, >, @@ -182,7 +180,6 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type Error = Void; type InboundProtocol = Either, DeniedUpgrade>; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); @@ -229,12 +226,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { // Return queued events. if let Some(event) = self.queued_events.pop_front() { diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 55480384ffa..63ef96781d9 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -38,7 +38,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use void::Void; /// The event emitted by the Handler. This informs the behaviour of various events created /// by the handler. @@ -220,7 +219,6 @@ impl EnabledHandler { ::OutboundProtocol, ::OutboundOpenInfo, ::ToBehaviour, - ::Error, >, > { if !self.peer_kind_sent { @@ -391,7 +389,6 @@ impl EnabledHandler { impl ConnectionHandler for Handler { type FromBehaviour = HandlerIn; type ToBehaviour = HandlerEvent; - type Error = Void; type InboundOpenInfo = (); type InboundProtocol = either::Either; type OutboundOpenInfo = (); @@ -434,12 +431,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { match self { Handler::Enabled(handler) => handler.poll(cx), diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 5012868c17a..f9b77e0b63a 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -38,7 +38,7 @@ use libp2p_swarm::{ }; use smallvec::SmallVec; use std::collections::HashSet; -use std::{io, task::Context, task::Poll, time::Duration}; +use std::{task::Context, task::Poll, time::Duration}; use tracing::Level; const STREAM_TIMEOUT: Duration = Duration::from_secs(60); @@ -57,7 +57,6 @@ pub struct Handler { Either, ReadyUpgrade>, (), Event, - io::Error, >; 4], >, @@ -282,7 +281,6 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = InEvent; type ToBehaviour = Event; - type Error = io::Error; type InboundProtocol = SelectUpgrade, ReadyUpgrade>; type OutboundProtocol = Either, ReadyUpgrade>; @@ -320,9 +318,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent, - > { + ) -> Poll> { if let Some(event) = self.events.pop() { return Poll::Ready(event); } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 21dad8a82b9..adfb076541c 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -597,7 +597,6 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = HandlerIn; type ToBehaviour = HandlerEvent; - type Error = io::Error; // TODO: better error type? type InboundProtocol = Either; type OutboundProtocol = ProtocolConfig; type OutboundOpenInfo = (); @@ -711,12 +710,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { match &mut self.protocol_status { Some(status) if !status.reported => { @@ -846,7 +840,7 @@ impl Handler { } impl futures::Stream for OutboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -978,7 +972,7 @@ impl futures::Stream for OutboundSubstreamState { } impl futures::Stream for InboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 0cc2dd23abe..2a2c5499fc2 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -35,7 +35,6 @@ use libp2p_swarm::{ }, ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, }; -use void::Void; use crate::client::{RunError, RunId}; use crate::{RunParams, RunUpdate}; @@ -59,7 +58,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::ToBehaviour, - ::Error, >, >, @@ -87,7 +85,6 @@ impl Default for Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); @@ -159,12 +156,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index ed42162cb7e..ddfe8f881e5 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -63,7 +63,6 @@ impl Default for Handler { impl ConnectionHandler for Handler { type FromBehaviour = Void; type ToBehaviour = Event; - type Error = Void; type InboundProtocol = ReadyUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; @@ -121,12 +120,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { loop { match self.inbound.poll_unpin(cx) { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 3ee6bfdf5d6..5e6fc2cd2cf 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -209,7 +209,6 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = Void; type ToBehaviour = Result; - type Error = Void; type InboundProtocol = ReadyUpgrade; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); @@ -225,14 +224,8 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - ReadyUpgrade, - (), - Result, - Self::Error, - >, - > { + ) -> Poll, (), Result>> + { match self.state { State::Inactive { reported: true } => { return Poll::Pending; // nothing to do on this connection diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 4e729b1993e..958c6a9b906 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -339,7 +339,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::ToBehaviour, - ::Error, >, >, @@ -482,7 +481,6 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type Error = void::Void; type InboundProtocol = ReadyUpgrade; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; @@ -593,12 +591,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { // Return queued events. if let Some(event) = self.queued_events.pop_front() { diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index f30f24a949b..1d24493be77 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -101,7 +101,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::ToBehaviour, - ::Error, >, >, @@ -230,7 +229,6 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type Error = void::Void; type InboundProtocol = ReadyUpgrade; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; @@ -275,12 +273,7 @@ impl ConnectionHandler for Handler { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { loop { debug_assert_eq!( diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 9ccbc49fc4b..2d45e0d7dc3 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -367,7 +367,6 @@ where { type FromBehaviour = OutboundMessage; type ToBehaviour = Event; - type Error = void::Void; type InboundProtocol = Protocol; type OutboundProtocol = Protocol; type OutboundOpenInfo = (); @@ -390,8 +389,7 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), Self::ToBehaviour, Self::Error>> - { + ) -> Poll, (), Self::ToBehaviour>> { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index ee4058d530d..48f5bcbf4ef 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -27,9 +27,7 @@ use libp2p_core::{ use libp2p_identity::{Keypair, PeerId}; use libp2p_plaintext as plaintext; use libp2p_swarm::dial_opts::PeerCondition; -use libp2p_swarm::{ - self as swarm, dial_opts::DialOpts, NetworkBehaviour, Swarm, SwarmEvent, THandlerErr, -}; +use libp2p_swarm::{self as swarm, dial_opts::DialOpts, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p_yamux as yamux; use std::fmt::Debug; use std::future::IntoFuture; @@ -70,9 +68,7 @@ pub trait SwarmExt { /// Wait for specified condition to return `Some`. async fn wait(&mut self, predicate: P) -> E where - P: Fn( - SwarmEvent<::ToSwarm, THandlerErr>, - ) -> Option, + P: Fn(SwarmEvent<::ToSwarm>) -> Option, P: Send; /// Listens for incoming connections, polling the [`Swarm`] until the transport is ready to accept connections. @@ -83,9 +79,7 @@ pub trait SwarmExt { /// Returns the next [`SwarmEvent`] or times out after 10 seconds. /// /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`. - async fn next_swarm_event( - &mut self, - ) -> SwarmEvent<::ToSwarm, THandlerErr>; + async fn next_swarm_event(&mut self) -> SwarmEvent<::ToSwarm>; /// Returns the next behaviour event or times out after 10 seconds. /// @@ -142,8 +136,8 @@ where TBehaviour2::ToSwarm: Debug, TBehaviour1: NetworkBehaviour + Send, TBehaviour1::ToSwarm: Debug, - SwarmEvent>: TryIntoOutput, - SwarmEvent>: TryIntoOutput, + SwarmEvent: TryIntoOutput, + SwarmEvent: TryIntoOutput, Out1: Debug, Out2: Debug, { @@ -185,15 +179,15 @@ pub trait TryIntoOutput: Sized { fn try_into_output(self) -> Result; } -impl TryIntoOutput for SwarmEvent { +impl TryIntoOutput for SwarmEvent { fn try_into_output(self) -> Result { self.try_into_behaviour_event() } } -impl TryIntoOutput> - for SwarmEvent +impl TryIntoOutput> + for SwarmEvent { - fn try_into_output(self) -> Result, Self> { + fn try_into_output(self) -> Result, Self> { Ok(self) } } @@ -295,7 +289,7 @@ where async fn wait(&mut self, predicate: P) -> E where - P: Fn(SwarmEvent<::ToSwarm, THandlerErr>) -> Option, + P: Fn(SwarmEvent<::ToSwarm>) -> Option, P: Send, { loop { @@ -314,9 +308,7 @@ where } } - async fn next_swarm_event( - &mut self, - ) -> SwarmEvent<::ToSwarm, THandlerErr> { + async fn next_swarm_event(&mut self) -> SwarmEvent<::ToSwarm> { match futures::future::select( futures_timer::Delay::new(Duration::from_secs(10)), self.select_next_some(), diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 717674c9dd1..692ce9d1bde 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -7,6 +7,9 @@ See [PR 4076](https://github.com/libp2p/rust-libp2p/pull/4076). - Remove deprecated `PollParameters` from `NetworkBehaviour::poll` function. See [PR 4490](https://github.com/libp2p/rust-libp2p/pull/4490). +- Remove deprecated `ConnectionHandlerEvent::Close` and `ConnectionHandler::Error`. + `ConnectionHandler`s should not close connections directly as the connection might still be in use by other handlers. + See [PR 4755](https://github.com/libp2p/rust-libp2p/pull/4755). - Add `PeerCondition::DisconnectedAndNotDialing` variant, combining pre-existing conditions. This is the new default. A new dialing attempt is iniated _only if_ the peer is both considered disconnected and there is currently no ongoing dialing attempt. diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 7fbb72b9260..c25b14e75e3 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -290,16 +290,13 @@ pub enum ToSwarm { /// This address will be shared with all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrExpired`]. ExternalAddrExpired(Multiaddr), - /// Instructs the `Swarm` to initiate a graceful close of one or all connections - /// with the given peer. + /// Instructs the `Swarm` to initiate a graceful close of one or all connections with the given peer. /// - /// Note: Closing a connection via - /// [`ToSwarm::CloseConnection`] does not inform the - /// corresponding [`ConnectionHandler`]. - /// Closing a connection via a [`ConnectionHandler`] can be done - /// either in a collaborative manner across [`ConnectionHandler`]s - /// with [`ConnectionHandler::connection_keep_alive`] or directly with - /// [`ConnectionHandlerEvent::Close`](crate::ConnectionHandlerEvent::Close). + /// Closing a connection via [`ToSwarm::CloseConnection`] will poll [`ConnectionHandler::poll_close`] to completion. + /// In most cases, stopping to "use" a connection is enough to have it closed. + /// The keep-alive algorithm will close a connection automatically once all [`ConnectionHandler`]s are idle. + /// + /// Use this command if you want to close a connection _despite_ it still being in use by one or more handlers. CloseConnection { /// The peer to disconnect. peer_id: PeerId, diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index e1da71a0450..5c23ee099a3 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -264,7 +264,6 @@ where { type FromBehaviour = TInner::FromBehaviour; type ToBehaviour = TInner::ToBehaviour; - type Error = TInner::Error; type InboundProtocol = Either, SendWrapper>; type OutboundProtocol = TInner::OutboundProtocol; type OutboundOpenInfo = TInner::OutboundOpenInfo; @@ -299,12 +298,7 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { if let Some(inner) = self.inner.as_mut() { inner.poll(cx) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 35cc71d5354..15c49bb7bd5 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -238,7 +238,7 @@ where pub(crate) fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, ConnectionError>> { + ) -> Poll, ConnectionError>> { let Self { requested_substreams, muxing, @@ -283,9 +283,6 @@ where Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { return Poll::Ready(Ok(Event::Handler(event))); } - Poll::Ready(ConnectionHandlerEvent::Close(err)) => { - return Poll::Ready(Err(ConnectionError::Handler(err))); - } Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols( ProtocolSupport::Added(protocols), )) => { @@ -452,9 +449,7 @@ where } #[cfg(test)] - fn poll_noop_waker( - &mut self, - ) -> Poll, ConnectionError>> { + fn poll_noop_waker(&mut self) -> Poll, ConnectionError>> { Pin::new(self).poll(&mut Context::from_waker(futures::task::noop_waker_ref())) } } @@ -1112,7 +1107,7 @@ mod tests { #[derive(Default)] struct ConfigurableProtocolConnectionHandler { - events: Vec>, + events: Vec>, active_protocols: HashSet, local_added: Vec>, local_removed: Vec>, @@ -1147,7 +1142,6 @@ mod tests { impl ConnectionHandler for MockConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -1203,7 +1197,6 @@ mod tests { Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour, - Self::Error, >, > { if self.outbound_requested { @@ -1221,7 +1214,6 @@ mod tests { impl ConnectionHandler for ConfigurableProtocolConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type Error = Void; type InboundProtocol = ManyProtocolsUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -1280,7 +1272,6 @@ mod tests { Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour, - Self::Error, >, > { if let Some(event) = self.events.pop() { diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 5d5dda57868..33aa81c19a9 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -25,47 +25,36 @@ use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. #[derive(Debug)] -pub enum ConnectionError { +pub enum ConnectionError { /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), /// The connection keep-alive timeout expired. KeepAliveTimeout, - - /// The connection handler produced an error. - Handler(THandlerErr), } -impl fmt::Display for ConnectionError -where - THandlerErr: fmt::Display, -{ +impl fmt::Display for ConnectionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {err}"), ConnectionError::KeepAliveTimeout => { write!(f, "Connection closed due to expired keep-alive timeout.") } - ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {err}"), } } } -impl std::error::Error for ConnectionError -where - THandlerErr: std::error::Error + 'static, -{ +impl std::error::Error for ConnectionError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { ConnectionError::IO(err) => Some(err), ConnectionError::KeepAliveTimeout => None, - ConnectionError::Handler(err) => Some(err), } } } -impl From for ConnectionError { +impl From for ConnectionError { fn from(error: io::Error) -> Self { ConnectionError::IO(error) } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index cfa3fb7ea3c..9bcd1b446d3 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -132,7 +132,7 @@ where /// Receivers for events reported from established connections. established_connection_events: - SelectAll>>, + SelectAll>>, /// Receivers for [`NewConnection`] objects that are dropped. new_connection_dropped_listeners: FuturesUnordered>, @@ -226,7 +226,7 @@ impl fmt::Debug for Pool { /// Event that can happen on the `Pool`. #[derive(Debug)] -pub(crate) enum PoolEvent { +pub(crate) enum PoolEvent { /// A new connection has been established. ConnectionEstablished { id: ConnectionId, @@ -258,7 +258,7 @@ pub(crate) enum PoolEvent { connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option>, + error: Option, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, }, @@ -290,7 +290,7 @@ pub(crate) enum PoolEvent { id: ConnectionId, peer_id: PeerId, /// The produced event. - event: THandler::ToBehaviour, + event: ToBehaviour, }, /// The connection to a node has changed its address. @@ -548,7 +548,7 @@ where /// Polls the connection pool for events. #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))] - pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where THandler: ConnectionHandler + 'static, ::OutboundOpenInfo: Send, diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index f2c6928cd27..08674fd2ee5 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -66,7 +66,7 @@ pub(crate) enum PendingConnectionEvent { } #[derive(Debug)] -pub(crate) enum EstablishedConnectionEvent { +pub(crate) enum EstablishedConnectionEvent { /// A node we are connected to has changed its address. AddressChange { id: ConnectionId, @@ -77,7 +77,7 @@ pub(crate) enum EstablishedConnectionEvent { Notify { id: ConnectionId, peer_id: PeerId, - event: THandler::ToBehaviour, + event: ToBehaviour, }, /// A connection closed, possibly due to an error. /// @@ -86,7 +86,7 @@ pub(crate) enum EstablishedConnectionEvent { Closed { id: ConnectionId, peer_id: PeerId, - error: Option>, + error: Option, }, } @@ -171,7 +171,7 @@ pub(crate) async fn new_for_established_connection( peer_id: PeerId, mut connection: crate::connection::Connection, mut command_receiver: mpsc::Receiver>, - mut events: mpsc::Sender>, + mut events: mpsc::Sender>, ) where THandler: ConnectionHandler, { diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index c3e9c22a422..86df676443b 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -64,7 +64,6 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -82,12 +81,7 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { Poll::Pending } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index f0a46129250..be0ca67ab48 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -102,8 +102,6 @@ pub trait ConnectionHandler: Send + 'static { type FromBehaviour: fmt::Debug + Send + 'static; /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`]. type ToBehaviour: fmt::Debug + Send + 'static; - /// The type of errors returned by [`ConnectionHandler::poll`]. - type Error: error::Error + fmt::Debug + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. type InboundProtocol: InboundUpgradeSend; /// The outbound upgrade for the protocol(s) used by the handler. @@ -149,12 +147,7 @@ pub trait ConnectionHandler: Send + 'static { &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, >; /// Gracefully close the [`ConnectionHandler`]. @@ -541,21 +534,12 @@ impl SubstreamProtocol { /// Event produced by a handler. #[derive(Debug, Clone, PartialEq, Eq)] #[non_exhaustive] -pub enum ConnectionHandlerEvent { +pub enum ConnectionHandlerEvent { /// Request a new outbound substream to be opened with the remote. OutboundSubstreamRequest { /// The protocol(s) to apply on the substream. protocol: SubstreamProtocol, }, - - /// Close the connection for the given reason. - /// - /// Note this will affect all [`ConnectionHandler`]s handling this - /// connection, in other words it will close the connection for all - /// [`ConnectionHandler`]s. To signal that one has no more need for the - /// connection, while allowing other [`ConnectionHandler`]s to continue using - /// the connection, return false in [`ConnectionHandler::connection_keep_alive`]. - Close(TErr), /// We learned something about the protocols supported by the remote. ReportRemoteProtocols(ProtocolSupport), @@ -572,15 +556,15 @@ pub enum ProtocolSupport { } /// Event produced by a handler. -impl - ConnectionHandlerEvent +impl + ConnectionHandlerEvent { /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a /// `TOutboundOpenInfo` to something else. pub fn map_outbound_open_info( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TOutboundOpenInfo) -> I, { @@ -593,7 +577,6 @@ impl ConnectionHandlerEvent::NotifyBehaviour(val) => { ConnectionHandlerEvent::NotifyBehaviour(val) } - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) } @@ -602,10 +585,7 @@ impl /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) /// to something else. - pub fn map_protocol( - self, - map: F, - ) -> ConnectionHandlerEvent + pub fn map_protocol(self, map: F) -> ConnectionHandlerEvent where F: FnOnce(TConnectionUpgrade) -> I, { @@ -618,7 +598,6 @@ impl ConnectionHandlerEvent::NotifyBehaviour(val) => { ConnectionHandlerEvent::NotifyBehaviour(val) } - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) } @@ -629,7 +608,7 @@ impl pub fn map_custom( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TCustom) -> I, { @@ -640,29 +619,6 @@ impl ConnectionHandlerEvent::NotifyBehaviour(val) => { ConnectionHandlerEvent::NotifyBehaviour(map(val)) } - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), - ConnectionHandlerEvent::ReportRemoteProtocols(support) => { - ConnectionHandlerEvent::ReportRemoteProtocols(support) - } - } - } - - /// If this is a `Close` event, maps the content to something else. - pub fn map_close( - self, - map: F, - ) -> ConnectionHandlerEvent - where - F: FnOnce(TErr) -> I, - { - match self { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } - } - ConnectionHandlerEvent::NotifyBehaviour(val) => { - ConnectionHandlerEvent::NotifyBehaviour(val) - } - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) } diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 093900135b8..b48b7cdcb15 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -80,7 +80,6 @@ where { type FromBehaviour = Either; type ToBehaviour = Either; - type Error = Either; type InboundProtocol = Either, SendWrapper>; type OutboundProtocol = Either, SendWrapper>; @@ -119,22 +118,15 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { let event = match self { Either::Left(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Left) - .map_close(Either::Left) .map_protocol(|p| Either::Left(SendWrapper(p))) .map_outbound_open_info(Either::Left), Either::Right(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Right) - .map_close(Either::Right) .map_protocol(|p| Either::Right(SendWrapper(p))) .map_outbound_open_info(Either::Right), }; diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index e3458eb5451..bd45eee4d97 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -52,7 +52,6 @@ where { type FromBehaviour = TNewIn; type ToBehaviour = TConnectionHandler::ToBehaviour; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -76,12 +75,7 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { self.inner.poll(cx) } diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index cc06a4c50c8..8ef8bad61b3 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -47,7 +47,6 @@ where { type FromBehaviour = TConnectionHandler::FromBehaviour; type ToBehaviour = TNewOut; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -69,18 +68,12 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { self.inner.poll(cx).map(|ev| match ev { ConnectionHandlerEvent::NotifyBehaviour(ev) => { ConnectionHandlerEvent::NotifyBehaviour((self.map)(ev)) } - ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err), ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } } diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 89d4d36fadc..fc1cd750763 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -111,7 +111,6 @@ where { type FromBehaviour = (K, ::FromBehaviour); type ToBehaviour = (K, ::ToBehaviour); - type Error = ::Error; type InboundProtocol = Upgrade::InboundProtocol>; type OutboundProtocol = ::OutboundProtocol; type InboundOpenInfo = Info::InboundOpenInfo>; @@ -241,12 +240,7 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { // Calling `gen_range(0, 0)` (see below) would panic, so we have return early to avoid // that situation. diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index a611bc5073c..b1fc41e9098 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -115,7 +115,6 @@ where { type FromBehaviour = TOutbound; type ToBehaviour = Result>; - type Error = void::Void; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); @@ -133,12 +132,7 @@ where &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { if !self.events_out.is_empty() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 90e6522404e..23b9adcfd90 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -42,7 +42,6 @@ impl PendingConnectionHandler { impl ConnectionHandler for PendingConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type Error = Void; type InboundProtocol = PendingUpgrade; type OutboundProtocol = PendingUpgrade; type OutboundOpenInfo = Void; @@ -60,12 +59,7 @@ impl ConnectionHandler for PendingConnectionHandler { &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { Poll::Pending } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 957ba43fbe7..fc470ff803e 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -181,7 +181,6 @@ where { type FromBehaviour = Either; type ToBehaviour = Either; - type Error = Either; type InboundProtocol = SelectUpgrade< SendWrapper<::InboundProtocol>, SendWrapper<::InboundProtocol>, @@ -219,20 +218,12 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { match self.proto1.poll(cx) { Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Left(event))); } - Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event))); - } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol @@ -252,9 +243,6 @@ where event, ))); } - Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event))); - } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: protocol diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 1aa6bfb30cb..462dc718b86 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -167,13 +167,10 @@ pub type THandlerInEvent = as ConnectionHandle /// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. pub type THandlerOutEvent = as ConnectionHandler>::ToBehaviour; -/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -pub type THandlerErr = as ConnectionHandler>::Error; - /// Event generated by the `Swarm`. #[derive(Debug)] #[non_exhaustive] -pub enum SwarmEvent { +pub enum SwarmEvent { /// Event generated by the `NetworkBehaviour`. Behaviour(TBehaviourOutEvent), /// A connection to the given peer has been opened. @@ -207,7 +204,7 @@ pub enum SwarmEvent { num_established: u32, /// Reason for the disconnection, if it was not a successful /// active close. - cause: Option>, + cause: Option, }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// @@ -304,7 +301,7 @@ pub enum SwarmEvent { ExternalAddrExpired { address: Multiaddr }, } -impl SwarmEvent { +impl SwarmEvent { /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour` variant, otherwise fail. #[allow(clippy::result_large_err)] pub fn try_into_behaviour_event(self) -> Result { @@ -349,7 +346,7 @@ where /// can be polled again. pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, - pending_swarm_events: VecDeque>>, + pending_swarm_events: VecDeque>, } impl Unpin for Swarm where TBehaviour: NetworkBehaviour {} @@ -631,12 +628,8 @@ where /// /// Returns `Ok(())` if there was one or more established connections to the peer. /// - /// Note: Closing a connection via [`Swarm::disconnect_peer_id`] does - /// not inform the corresponding [`ConnectionHandler`]. - /// Closing a connection via a [`ConnectionHandler`] can be done either in a - /// collaborative manner across [`ConnectionHandler`]s - /// with [`ConnectionHandler::connection_keep_alive`] or directly with - /// [`ConnectionHandlerEvent::Close`]. + /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll [`ConnectionHandler::poll_close`] to completion. + /// Use this function if you want to close a connection _despite_ it still being in use by one or more handlers. #[allow(clippy::result_unit_err)] pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> { let was_connected = self.pool.is_connected(peer_id); @@ -687,7 +680,7 @@ where &mut self.behaviour } - fn handle_pool_event(&mut self, event: PoolEvent>) { + fn handle_pool_event(&mut self, event: PoolEvent>) { match event { PoolEvent::ConnectionEstablished { peer_id, @@ -1194,7 +1187,7 @@ where fn poll_next_event( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll> { // We use a `this` variable because the compiler can't mutably borrow multiple times // across a `Deref`. let this = &mut *self; @@ -1369,7 +1362,7 @@ impl futures::Stream for Swarm where TBehaviour: NetworkBehaviour, { - type Item = SwarmEvent, THandlerErr>; + type Item = SwarmEvent>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().poll_next_event(cx).map(Some) diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index 305e33c1804..4efe8d17e49 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -96,7 +96,6 @@ impl NetworkBehaviour for Behaviour { impl ConnectionHandler for HandlerWithState { type FromBehaviour = Void; type ToBehaviour = u64; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -114,12 +113,7 @@ impl ConnectionHandler for HandlerWithState { &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { Poll::Pending }