Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(swarm)!: refactor ListenUpgradeError and DialUpgradeError #3307

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ mod network {
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel};
use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_core::UpgradeError;
use std::collections::{hash_map, HashMap, HashSet};
use std::iter;

Expand Down Expand Up @@ -403,10 +404,7 @@ mod network {

async fn handle_event(
&mut self,
event: SwarmEvent<
ComposedEvent,
EitherError<ConnectionHandlerUpgrErr<io::Error>, io::Error>,
>,
event: SwarmEvent<ComposedEvent, EitherError<UpgradeError<io::Error>, io::Error>>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

- Bump MSRV to 1.65.0.

- Update to `libp2p-dcutr` `v0.9.0`.
- Update to `libp2p-dcutr` `v0.9.0` and introduce `dcutr::EventType::DirectConnectionUpgradeTimedOut`.

- Update to `libp2p-ping` `v0.42.0`.

- Update to `libp2p-kad` `v0.43.0`.

- Update to `libp2p-relay` `v0.15.0`.
- Update to `libp2p-relay` `v0.15.0` and introduce `relay::EventType::CircuitReqOutboundConectTimedOut`.

- Update to `libp2p-identify` `v0.42.0`.

Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/src/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum EventType {
RemoteInitiatedDirectConnectionUpgrade,
DirectConnectionUpgradeSucceeded,
DirectConnectionUpgradeFailed,
DirectConnectionUpgradeTimedOut,
}

impl From<&libp2p_dcutr::Event> for EventType {
Expand All @@ -73,6 +74,9 @@ impl From<&libp2p_dcutr::Event> for EventType {
remote_peer_id: _,
error: _,
} => EventType::DirectConnectionUpgradeFailed,
libp2p_dcutr::Event::DirectConnectionUpgradeTimedout { .. } => {
EventType::DirectConnectionUpgradeTimedOut
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
libp2p_identify::Event::Error { .. } => {
self.error.inc();
}
libp2p_identify::Event::Timeout => {
self.error.inc();
}
libp2p_identify::Event::Pushed { .. } => {
self.pushed.inc();
}
Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum EventType {
CircuitReqDenyFailed,
CircuitReqOutboundConnectFailed,
CircuitReqAccepted,
CircuitReqTimedOut,
CircuitReqAcceptFailed,
CircuitClosed,
}
Expand All @@ -78,6 +79,9 @@ impl From<&libp2p_relay::Event> for EventType {
libp2p_relay::Event::CircuitReqReceiveFailed { .. } => {
EventType::CircuitReqReceiveFailed
}
libp2p_relay::Event::CircuitReqOutboundConectTimedOut { .. } => {
EventType::CircuitReqTimedOut
}
libp2p_relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied,
libp2p_relay::Event::CircuitReqOutboundConnectFailed { .. } => {
EventType::CircuitReqOutboundConnectFailed
Expand Down
4 changes: 3 additions & 1 deletion protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

- Update to `libp2p-core` `v0.39.0`.

- Update to `libp2p-swarm` `v0.42.0`.
- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout` introduction and consequential changes.
With that introduce `Event::DirectConnectionUpgradeTimedout`. See [PR 3307].

- Declare `InboundUpgradeError` and `OutboundUpgradeError` as type aliases instead of renames.
This is a workaround for a missing feature in `cargo semver-checks`. See [PR 3213].
Expand All @@ -15,6 +16,7 @@
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[issue 2217]: https://github.com/libp2p/rust-libp2p/issues/2217
[PR 3214]: https://github.com/libp2p/rust-libp2p/pull/3214
[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307

# 0.8.0

Expand Down
18 changes: 15 additions & 3 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::protocol;
use either::Either;
use libp2p_core::connection::{ConnectedPoint, ConnectionId};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::UpgradeError;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerUpgrErr, ExternalAddresses, IntoConnectionHandler,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ConnectionHandler, ExternalAddresses, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
Expand All @@ -56,14 +57,17 @@ pub enum Event {
remote_peer_id: PeerId,
error: Error,
},
DirectConnectionUpgradeTimedout {
remote_peer_id: PeerId,
},
}

#[derive(Debug, Error)]
pub enum Error {
#[error("Failed to dial peer.")]
Dial,
#[error("Failed to establish substream: {0}.")]
Handler(ConnectionHandlerUpgrErr<void::Void>),
Handler(UpgradeError<void::Void>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, we emitted an error (and thus closed the connection) when we ran into a timeout. Why are we changing this behaviour now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the behaviour the same? In the sense that the connection was closed because of a timeout and we are bubbling it up? But after reading and replying on #3307 (comment) I now see that Timeout makes sense being a variant on the Error itself of DirectConnectionUpgradeFailed rather than a variant of Event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the behaviour the same? In the sense that the connection was closed because of a timeout and we are bubbling it up?

Unless I am mistaken, the timeout refers to a stream upgrade. The connection can/should still be intact at this stage. Something like PendingUpgrade should trigger this timeout on the remote but the connection underneath works perfectly fine.

It does raise the question whether we should close an entire connection at all just because one protocol failed to upgrade a stream. Just disabling that protocol (and handler) and letting the collaborative keep-alive eventually close the connection seems like a better idea at first sight.

We may want to do this in a separate PR though, either before or after we merge this.

}

pub struct Behaviour {
Expand Down Expand Up @@ -241,6 +245,14 @@ impl NetworkBehaviour for Behaviour {
.into(),
);
}
Either::Left(handler::relayed::Event::InboundNegotiationTimedout) => {
self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeTimedout {
remote_peer_id: event_source,
})
.into(),
);
}
Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => {
self.queued_actions.push_back(
NetworkBehaviourAction::Dial {
Expand Down
9 changes: 4 additions & 5 deletions protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@

use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_core::UpgradeError;
use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use libp2p_swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol};
use std::task::{Context, Poll};
use void::Void;

Expand All @@ -52,7 +50,7 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = void::Void;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<std::io::Error>;
type Error = UpgradeError<std::io::Error>;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
Expand Down Expand Up @@ -103,6 +101,7 @@ impl ConnectionHandler for Handler {
ConnectionEvent::FullyNegotiatedInbound(_)
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::DialTimeout(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
}
Expand Down
71 changes: 25 additions & 46 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use libp2p_swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol};
use std::collections::VecDeque;
use std::fmt;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -83,11 +80,12 @@ pub enum Event {
remote_addr: Multiaddr,
},
InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
error: UpgradeError<void::Void>,
},
InboundNegotiationTimedout,
InboundConnectNegotiated(Vec<Multiaddr>),
OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr<void::Void>,
error: UpgradeError<void::Void>,
},
OutboundConnectNegotiated {
remote_addrs: Vec<Multiaddr>,
Expand All @@ -109,6 +107,9 @@ impl fmt::Debug for Event {
.debug_struct("Event::InboundNegotiationFailed")
.field("error", error)
.finish(),
Event::InboundNegotiationTimedout => {
f.debug_struct("Event::InboundNegotiationTimedout").finish()
}
Event::InboundConnectNegotiated(addrs) => f
.debug_tuple("Event::InboundConnectNegotiated")
.field(addrs)
Expand All @@ -133,7 +134,7 @@ pub struct Handler {
endpoint: ConnectedPoint,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<
ConnectionHandlerUpgrErr<
UpgradeError<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>,
>,
Expand Down Expand Up @@ -220,41 +221,23 @@ impl Handler {
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
ConnectionHandlerUpgrErr::Timer => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timer,
},
));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
UpgradeError::Select(NegotiationError::Failed) => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.keep_alive = KeepAlive::No;
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
error: UpgradeError::Select(NegotiationError::Failed),
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| {
e.map_err(|e| match e {
EitherError::A(e) => EitherError::A(e),
EitherError::B(v) => void::unreachable(v),
})
self.pending_error = Some(error.map_err(|e| match e {
EitherError::A(e) => EitherError::A(e),
EitherError::B(v) => void::unreachable(v),
}));
}
}
Expand All @@ -270,29 +253,20 @@ impl Handler {
self.keep_alive = KeepAlive::No;

match error {
ConnectionHandlerUpgrErr::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
UpgradeError::Select(NegotiationError::Failed) => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
error: UpgradeError::Select(NegotiationError::Failed),
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B)));
self.pending_error = Some(error.map_err(EitherError::B));
}
}
}
Expand All @@ -301,7 +275,7 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
type Error = UpgradeError<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
Expand Down Expand Up @@ -392,9 +366,9 @@ impl ConnectionHandler for Handler {
));
}
Err(e) => {
return Poll::Ready(ConnectionHandlerEvent::Close(
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))),
))
return Poll::Ready(ConnectionHandlerEvent::Close(UpgradeError::Apply(
EitherError::A(e),
)))
}
}
}
Expand Down Expand Up @@ -424,6 +398,11 @@ impl ConnectionHandler for Handler {
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::DialTimeout(_) => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundNegotiationTimedout,
));
}
ConnectionEvent::AddressChange(_) => {}
}
}
Expand Down
5 changes: 4 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

- Update to `libp2p-core` `v0.39.0`.

- Update to `libp2p-swarm` `v0.42.0`.
- Update to `libp2p-swarm` `v0.42.0`. Update to the `libp2p_swarm::handler::ConnectionEvent` `DialTimeout`
introduction and consequential changes. See [PR 3307].

[PR 3307]: https://github.com/libp2p/rust-libp2p/pull/3307

# 0.43.0

Expand Down
Loading