Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): make stream uprade errors more ergonomic #3882

Merged
merged 41 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
7443d0a
Don't report inbound errors on stream upgrades to handler
thomaseizinger Mar 13, 2023
0a29bae
Don't report timeout
thomaseizinger Mar 13, 2023
fad8b5b
Remove unnecessary comment
thomaseizinger Mar 13, 2023
04bee57
Reorder code
thomaseizinger Mar 13, 2023
dfa099c
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Mar 15, 2023
982718c
Log timeout of inbound stream
thomaseizinger Mar 15, 2023
931d4d1
Remove `ConnectionHandlerUpgrErr::Timer` variant
thomaseizinger Mar 15, 2023
76e0ac7
Remove `libpp2::relay::Event::CircuitReqReceiveFailed`
thomaseizinger Mar 15, 2023
b38841f
Remove `libp2p::relay::Event::InboundCircuitReqFailed`
thomaseizinger Mar 15, 2023
cea2a3d
Remove `libp2p_request_response::InboundFailure::UnsupportedProtocols`
thomaseizinger Mar 15, 2023
27bb8ec
Add unreleased tag
thomaseizinger Mar 15, 2023
1026e4b
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Mar 21, 2023
4010813
Add changelog entry
thomaseizinger Mar 21, 2023
f433dc7
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Mar 22, 2023
6d43cff
Fix perf protocol
thomaseizinger Mar 22, 2023
6248fd2
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Mar 24, 2023
40a3d3d
Remove wrong changelog entry
thomaseizinger Mar 24, 2023
ff67637
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Mar 29, 2023
c1d2c37
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Apr 3, 2023
028f37b
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Apr 11, 2023
e0ad03b
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Apr 12, 2023
c0363dd
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger Apr 16, 2023
9a3e6bf
Merge branch 'feat/no-report-inbound-error' of github.com:libp2p/rust…
thomaseizinger Apr 16, 2023
7f47dda
Fix compile errors
thomaseizinger Apr 16, 2023
7ff7f85
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger May 1, 2023
8a5274b
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger May 2, 2023
165e4be
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger May 2, 2023
a2c5944
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger May 4, 2023
5c89fd7
Fix compile errors
thomaseizinger May 4, 2023
956b814
Update changelog
thomaseizinger May 4, 2023
d53aca9
Merge branch 'master' into feat/no-report-inbound-error
thomaseizinger May 5, 2023
5947b19
Remove duplication
thomaseizinger May 5, 2023
bfea0b0
Collapse match arms
thomaseizinger May 5, 2023
9c9eaaa
Flatten `ConnectionHandlerUpgrErr`
thomaseizinger May 5, 2023
b5d3fd5
Deprecate `ConnectionHandlerUpgrErr` in favor of `StreamUpgradeError`
thomaseizinger May 5, 2023
d84a0f5
Update swarm/CHANGELOG.md
thomaseizinger May 5, 2023
6ee31f1
Merge branch 'master' into 3759-encapsulate-multistream-select
thomaseizinger May 8, 2023
e975725
Update swarm/CHANGELOG.md
thomaseizinger May 8, 2023
4227abb
Merge branch '3759-encapsulate-multistream-select' of github.com:libp…
thomaseizinger May 8, 2023
ba88222
Merge branch 'master' into 3759-encapsulate-multistream-select
thomaseizinger May 8, 2023
a46084b
Sort and deduplicate links
thomaseizinger May 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use libp2p::{
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent},
swarm::{NetworkBehaviour, StreamUpgradeError, Swarm, SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
};

Expand Down Expand Up @@ -216,7 +216,7 @@ impl EventLoop {

async fn handle_event(
&mut self,
event: SwarmEvent<ComposedEvent, Either<ConnectionHandlerUpgrErr<io::Error>, io::Error>>,
event: SwarmEvent<ComposedEvent, Either<StreamUpgradeError<io::Error>, io::Error>>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
Expand Down
4 changes: 0 additions & 4 deletions misc/metrics/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ enum EventType {
ReservationReqDenied,
ReservationReqDenyFailed,
ReservationTimedOut,
CircuitReqReceiveFailed,
CircuitReqDenied,
CircuitReqDenyFailed,
CircuitReqOutboundConnectFailed,
Expand All @@ -75,9 +74,6 @@ impl From<&libp2p_relay::Event> for EventType {
EventType::ReservationReqDenyFailed
}
libp2p_relay::Event::ReservationTimedOut { .. } => EventType::ReservationTimedOut,
libp2p_relay::Event::CircuitReqReceiveFailed { .. } => {
EventType::CircuitReqReceiveFailed
}
libp2p_relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied,
libp2p_relay::Event::CircuitReqOutboundConnectFailed { .. } => {
EventType::CircuitReqOutboundConnectFailed
Expand Down
4 changes: 2 additions & 2 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
use libp2p_swarm::{
ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters,
ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError,
THandlerInEvent, ToSwarm,
};
use std::collections::{HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub enum Error {
#[error("Failed to dial peer.")]
Dial,
#[error("Failed to establish substream: {0}.")]
Handler(ConnectionHandlerUpgrErr<Void>),
Handler(StreamUpgradeError<Void>),
}

pub struct Behaviour {
Expand Down
5 changes: 2 additions & 3 deletions protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
};
use std::task::{Context, Poll};
use void::Void;
Expand All @@ -42,7 +41,7 @@ pub struct Handler {
impl ConnectionHandler for Handler {
type InEvent = void::Void;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<std::io::Error>;
type Error = StreamUpgradeError<std::io::Error>;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
Expand Down
74 changes: 18 additions & 56 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use futures::future;
use futures::future::{BoxFuture, FutureExt};
use instant::Instant;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_core::ConnectedPoint;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
};
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -82,11 +81,11 @@ pub enum Event {
remote_addr: Multiaddr,
},
InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
error: StreamUpgradeError<void::Void>,
},
InboundConnectNegotiated(Vec<Multiaddr>),
OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
error: StreamUpgradeError<void::Void>,
},
OutboundConnectNegotiated {
remote_addrs: Vec<Multiaddr>,
Expand Down Expand Up @@ -127,7 +126,7 @@ pub struct Handler {
endpoint: ConnectedPoint,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<
ConnectionHandlerUpgrErr<
StreamUpgradeError<
Either<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>,
>,
Expand Down Expand Up @@ -212,45 +211,10 @@ impl Handler {
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
ConnectionHandlerUpgrErr::Timer => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timer,
},
));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.keep_alive = KeepAlive::No;
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| {
e.map_err(|e| match e {
Either::Left(e) => Either::Left(e),
Either::Right(v) => void::unreachable(v),
})
}));
}
}
self.pending_error = Some(StreamUpgradeError::Apply(match error {
Either::Left(e) => Either::Left(e),
Either::Right(v) => void::unreachable(v),
}));
}

fn on_dial_upgrade_error(
Expand All @@ -263,29 +227,27 @@ impl Handler {
self.keep_alive = KeepAlive::No;

match error {
ConnectionHandlerUpgrErr::Timeout => {
StreamUpgradeError::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timeout,
error: StreamUpgradeError::Timeout,
},
));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
StreamUpgradeError::NegotiationFailed => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
error: StreamUpgradeError::NegotiationFailed,
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(Either::Right)));
self.pending_error = Some(error.map_upgrade_err(Either::Right));
}
}
}
Expand All @@ -294,7 +256,7 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
type Error = StreamUpgradeError<
Either<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = Either<protocol::inbound::Upgrade, DeniedUpgrade>;
Expand Down Expand Up @@ -385,9 +347,9 @@ impl ConnectionHandler for Handler {
));
}
Err(e) => {
return Poll::Ready(ConnectionHandlerEvent::Close(
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))),
))
return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply(
Either::Left(e),
)))
}
}
}
Expand Down
20 changes: 7 additions & 13 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use futures::future::Either;
use futures::prelude::*;
use futures::StreamExt;
use instant::Instant;
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
Expand Down Expand Up @@ -526,20 +526,17 @@ impl ConnectionHandler for Handler {
handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer,
error: StreamUpgradeError::Timeout,
..
}) => {
log::debug!("Dial upgrade error: Protocol negotiation timeout");
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
error: StreamUpgradeError::Apply(e),
..
}) => void::unreachable(e),
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error:
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
error: StreamUpgradeError::NegotiationFailed,
..
}) => {
// The protocol is not supported
Expand All @@ -551,10 +548,7 @@ impl ConnectionHandler for Handler {
});
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error:
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)),
error: StreamUpgradeError::Io(e),
..
}) => {
log::debug!("Protocol negotiation failed: {e}")
Expand Down
6 changes: 3 additions & 3 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, ExternalAddresses,
ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol,
AddressScore, ConnectionDenied, DialError, ExternalAddresses, ListenAddresses,
NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, StreamUpgradeError,
THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
Expand Down Expand Up @@ -495,7 +495,7 @@ pub enum Event {
/// The peer with whom the error originated.
peer_id: PeerId,
/// The error that occurred.
error: ConnectionHandlerUpgrErr<UpgradeError>,
error: StreamUpgradeError<UpgradeError>,
},
}

Expand Down
18 changes: 5 additions & 13 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, StreamProtocol, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol,
StreamUpgradeError, SubstreamProtocol,
};
use log::warn;
use smallvec::SmallVec;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub enum Event {
/// We received a request for identification.
Identify,
/// Failed to identify the remote, or to reply to an identification request.
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
IdentificationError(StreamUpgradeError<UpgradeError>),
}

impl Handler {
Expand Down Expand Up @@ -205,13 +205,7 @@ impl Handler {
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
use libp2p_core::upgrade::UpgradeError;

let err = err.map_upgrade_err(|e| match e {
UpgradeError::Select(e) => UpgradeError::Select(e),
UpgradeError::Apply(Either::Left(ioe)) => UpgradeError::Apply(ioe),
UpgradeError::Apply(Either::Right(ioe)) => UpgradeError::Apply(ioe),
});
let err = err.map_upgrade_err(|e| e.into_inner());
self.events
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError(
err,
Expand Down Expand Up @@ -317,9 +311,7 @@ impl ConnectionHandler for Handler {
Event::Identification(peer_id),
)),
Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom(
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
)),
Event::IdentificationError(StreamUpgradeError::Apply(err)),
)),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
Expand Down
10 changes: 5 additions & 5 deletions protocols/kad/src/handler_priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError,
SubstreamProtocol,
};
use log::trace;
use std::collections::VecDeque;
Expand Down Expand Up @@ -325,7 +325,7 @@ pub enum KademliaHandlerEvent<TUserData> {
#[derive(Debug)]
pub enum KademliaHandlerQueryErr {
/// Error while trying to perform the query.
Upgrade(ConnectionHandlerUpgrErr<io::Error>),
Upgrade(StreamUpgradeError<io::Error>),
/// Received an answer that doesn't correspond to the request.
UnexpectedMessage,
/// I/O error in the substream.
Expand Down Expand Up @@ -361,8 +361,8 @@ impl error::Error for KademliaHandlerQueryErr {
}
}

impl From<ConnectionHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
fn from(err: ConnectionHandlerUpgrErr<io::Error>) -> Self {
impl From<StreamUpgradeError<io::Error>> for KademliaHandlerQueryErr {
fn from(err: StreamUpgradeError<io::Error>) -> Self {
KademliaHandlerQueryErr::Upgrade(err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions protocols/perf/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use std::{
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr,
ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent,
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm,
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use void::Void;
Expand All @@ -41,7 +41,7 @@ use super::{RunId, RunParams, RunStats};
#[derive(Debug)]
pub struct Event {
pub id: RunId,
pub result: Result<RunStats, ConnectionHandlerUpgrErr<Void>>,
pub result: Result<RunStats, StreamUpgradeError<Void>>,
}

#[derive(Default)]
Expand Down
Loading