Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): don't have ConnectionHandlers close connections #4755

Merged
merged 15 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use async_std::io;
use either::Either;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;

Expand Down Expand Up @@ -208,10 +206,7 @@ impl EventLoop {
}
}

async fn handle_event(
&mut self,
event: SwarmEvent<BehaviourEvent, Either<void::Void, io::Error>>,
) {
async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
match event {
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
kad::Event::OutboundQueryProgressed {
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ impl Recorder<libp2p_relay::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
Expand Down
10 changes: 4 additions & 6 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
match event {
libp2p_swarm::SwarmEvent::Behaviour(_) => {}
libp2p_swarm::SwarmEvent::ConnectionEstablished {
Expand Down Expand Up @@ -366,15 +366,13 @@ struct ConnectionClosedLabels {
enum ConnectionError {
Io,
KeepAliveTimeout,
Handler,
}

impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
}
Expand Down
10 changes: 1 addition & 9 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::collections::VecDeque;
use std::io;
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;

#[derive(Debug)]
pub enum Command {
Expand All @@ -63,7 +62,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -182,7 +180,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Command;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -228,12 +225,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Return queued events.
if let Some(event) = self.queued_events.pop_front() {
Expand Down
10 changes: 1 addition & 9 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use void::Void;

/// The event emitted by the Handler. This informs the behaviour of various events created
/// by the handler.
Expand Down Expand Up @@ -220,7 +219,6 @@ impl EnabledHandler {
<Handler as ConnectionHandler>::OutboundProtocol,
<Handler as ConnectionHandler>::OutboundOpenInfo,
<Handler as ConnectionHandler>::ToBehaviour,
<Handler as ConnectionHandler>::Error,
>,
> {
if !self.peer_kind_sent {
Expand Down Expand Up @@ -389,7 +387,6 @@ impl EnabledHandler {
impl ConnectionHandler for Handler {
type FromBehaviour = HandlerIn;
type ToBehaviour = HandlerEvent;
type Error = Void;
type InboundOpenInfo = ();
type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -431,12 +428,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
match self {
Handler::Enabled(handler) => handler.poll(cx),
Expand Down
8 changes: 2 additions & 6 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_swarm::{
use log::{warn, Level};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration};
use std::{task::Context, task::Poll, time::Duration};

const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
Expand All @@ -57,7 +57,6 @@ pub struct Handler {
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
io::Error,
>; 4],
>,

Expand Down Expand Up @@ -278,7 +277,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = InEvent;
type ToBehaviour = Event;
type Error = io::Error;
type InboundProtocol =
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
Expand Down Expand Up @@ -315,9 +313,7 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event>> {
if let Some(event) = self.events.pop() {
return Poll::Ready(event);
}
Expand Down
12 changes: 3 additions & 9 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = HandlerIn;
type ToBehaviour = HandlerEvent;
type Error = io::Error; // TODO: better error type?
type InboundProtocol = Either<ProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -706,12 +705,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
match &mut self.protocol_status {
Some(status) if !status.reported => {
Expand Down Expand Up @@ -847,7 +841,7 @@ impl Handler {
}

impl futures::Stream for OutboundSubstreamState {
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand Down Expand Up @@ -979,7 +973,7 @@ impl futures::Stream for OutboundSubstreamState {
}

impl futures::Stream for InboundSubstreamState {
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand Down
10 changes: 1 addition & 9 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use libp2p_swarm::{
},
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
};
use void::Void;

use crate::client::{RunError, RunId};
use crate::{RunParams, RunUpdate};
Expand All @@ -59,7 +58,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -87,7 +85,6 @@ impl Default for Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Command;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -157,12 +154,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
Expand Down
8 changes: 1 addition & 7 deletions protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl Default for Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
Expand Down Expand Up @@ -119,12 +118,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
loop {
match self.inbound.poll_unpin(cx) {
Expand Down
11 changes: 2 additions & 9 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Result<Duration, Failure>;
type Error = Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand All @@ -228,14 +227,8 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
ReadyUpgrade<StreamProtocol>,
(),
Result<Duration, Failure>,
Self::Error,
>,
> {
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
{
match self.state {
State::Inactive { reported: true } => {
return Poll::Pending; // nothing to do on this connection
Expand Down
9 changes: 1 addition & 8 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -482,7 +481,6 @@ type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
impl ConnectionHandler for Handler {
type FromBehaviour = In;
type ToBehaviour = Event;
type Error = void::Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type InboundOpenInfo = ();
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
Expand Down Expand Up @@ -592,12 +590,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Return queued events.
if let Some(event) = self.queued_events.pop_front() {
Expand Down
9 changes: 1 addition & 8 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ pub struct Handler {
<Handler as ConnectionHandler>::OutboundProtocol,
<Handler as ConnectionHandler>::OutboundOpenInfo,
<Handler as ConnectionHandler>::ToBehaviour,
<Handler as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -230,7 +229,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = In;
type ToBehaviour = Event;
type Error = void::Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type InboundOpenInfo = ();
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
Expand Down Expand Up @@ -274,12 +272,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
loop {
debug_assert_eq!(
Expand Down
4 changes: 1 addition & 3 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ where
{
type FromBehaviour = OutboundMessage<TCodec>;
type ToBehaviour = Event<TCodec>;
type Error = void::Void;
type InboundProtocol = Protocol<TCodec::Protocol>;
type OutboundProtocol = Protocol<TCodec::Protocol>;
type OutboundOpenInfo = ();
Expand All @@ -389,8 +388,7 @@ where
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour, Self::Error>>
{
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour>> {
match self.worker_streams.poll_unpin(cx) {
Poll::Ready((_, Ok(Ok(event)))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
Expand Down
Loading