Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(identify): keep connection alive while we are using it #3876

Merged
merged 23 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2149b03
Move tests to `tests/` directory
thomaseizinger Apr 28, 2023
20a111c
Rewrite first identify tests using `libp2p-swarm-test`
thomaseizinger Apr 28, 2023
dc8dfd8
Rewrite another test
thomaseizinger Apr 28, 2023
ff41147
Rewrite final test
thomaseizinger Apr 28, 2023
336fcf3
Only listen on local interface in `libp2p-swarm-test`
thomaseizinger Apr 28, 2023
562a41f
Assert listen and observed address
thomaseizinger Apr 28, 2023
edea5af
Ensure listen addresses are unique
thomaseizinger Apr 28, 2023
dd3c732
Remove `correct_transfer` test
thomaseizinger Apr 28, 2023
18012d7
Remove unnecessary dependencies
thomaseizinger Apr 28, 2023
2c5c9d3
Delete dead code
thomaseizinger Apr 28, 2023
0182be7
Make tests more resilient
thomaseizinger Apr 28, 2023
d5bcef6
Revert "Only listen on local interface in `libp2p-swarm-test`"
thomaseizinger Apr 28, 2023
f3f5b69
Assert using contains to avoid needing to know other interfaces
thomaseizinger Apr 28, 2023
2f1fcff
Merge branch 'master' into refactor/better-identify-tests
thomaseizinger May 2, 2023
f332d69
Merge branch 'master' into refactor/better-identify-tests
thomaseizinger May 2, 2023
ee68201
Fix flaky test
thomaseizinger May 3, 2023
499667f
Fix more flakiness
thomaseizinger May 3, 2023
4f29de5
Merge branch 'master' into refactor/better-identify-tests
thomaseizinger May 4, 2023
f878e94
Fix more flakiness
thomaseizinger May 4, 2023
3a36bd4
Keep connection alive while we are working on it
thomaseizinger May 4, 2023
e668f51
Merge branch 'master' into feat/identify-keep-connection-alive
thomaseizinger May 8, 2023
aee7383
Add changelog entry
thomaseizinger May 8, 2023
af8b439
Merge branch 'master' into feat/identify-keep-connection-alive
mergify[bot] May 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ either = "1.8.0"
[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10"
libp2p-mplex = { workspace = true }
libp2p-yamux = { workspace = true }
libp2p-noise = { workspace = true }
libp2p-swarm = { workspace = true, features = ["async-std"] }
libp2p-tcp = { workspace = true, features = ["async-io"] }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-swarm = { workspace = true, features = ["macros"] }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
273 changes: 0 additions & 273 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,279 +561,6 @@ impl PeerCache {
#[cfg(test)]
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport};
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent};
use libp2p_tcp as tcp;
use std::time::Duration;

fn transport() -> (PublicKey, transport::Boxed<(PeerId, StreamMuxerBox)>) {
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&id_keys).unwrap())
.multiplex(MplexConfig::new())
.boxed();
(pubkey, transport)
}

#[test]
fn periodic_identify() {
let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});
swarm2.dial(listen_addr).unwrap();

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.select_next_some();
pin_mut!(swarm2_fut);

match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert_eq!(info.listen_addrs.len(), 1);
return;
}
_ => {}
}
}
})
}

#[test]
fn identify_push() {
let _ = env_logger::try_init();

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone()));
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});

swarm2.dial(listen_addr).unwrap();

async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
let swarm2_fut = swarm2.select_next_some();

{
pin_mut!(swarm1_fut);
pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info,
..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => {
// Once a connection is established, we can initiate an
// active push below.
}
_ => continue,
}
}

swarm2
.behaviour_mut()
.push(std::iter::once(pubkey1.to_peer_id()));
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
#[allow(deprecated)]
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone())
// `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus
// closes the connection. At this point in time `swarm2` might not yet have
// identified `swarm1`. To give `swarm2` enough time, set an initial delay on
// `swarm1`.
.with_initial_delay(Duration::from_secs(10)),
);

SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);

SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial(listen_addr).unwrap();

// Wait until we identified.
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(Event::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// Wait for connection to close.
async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionClosed { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

// We should still be able to dial now!
swarm2.dial(swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}

#[test]
fn check_multiaddr_matches_peer_id() {
Expand Down
26 changes: 16 additions & 10 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

/// Protocol handler for sending and receiving identification requests.
Expand All @@ -64,9 +64,6 @@ pub struct Handler {
/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,

/// Whether the handler should keep the connection alive.
keep_alive: KeepAlive,

/// The interval of `trigger_next_identify`, i.e. the recurrent delay.
interval: Duration,

Expand All @@ -89,7 +86,7 @@ pub struct Handler {
#[derive(Debug)]
pub struct InEvent {
/// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>,
pub listen_addrs: HashSet<Multiaddr>,

/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub supported_protocols: Vec<StreamProtocol>,
Expand Down Expand Up @@ -132,7 +129,6 @@ impl Handler {
reply_streams: VecDeque::new(),
pending_replies: FuturesUnordered::new(),
trigger_next_identify: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
interval,
public_key,
protocol_version,
Expand Down Expand Up @@ -190,7 +186,6 @@ impl Handler {
.push(ConnectionHandlerEvent::Custom(Event::Identified(
remote_info,
)));
self.keep_alive = KeepAlive::No;
}
future::Either::Right(()) => self
.events
Expand All @@ -216,7 +211,6 @@ impl Handler {
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError(
err,
)));
self.keep_alive = KeepAlive::No;
self.trigger_next_identify.reset(self.interval);
}
}
Expand Down Expand Up @@ -246,7 +240,7 @@ impl ConnectionHandler for Handler {
public_key: self.public_key.clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs,
listen_addrs: Vec::from_iter(listen_addrs),
protocols: supported_protocols,
observed_addr: self.observed_addr.clone(),
};
Expand Down Expand Up @@ -274,7 +268,19 @@ impl ConnectionHandler for Handler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
if self.inbound_identify_push.is_some() {
return KeepAlive::Yes;
}

if !self.pending_replies.is_empty() {
return KeepAlive::Yes;
}

if !self.reply_streams.is_empty() {
return KeepAlive::Yes;
}

KeepAlive::No
}

fn poll(
Expand Down
2 changes: 0 additions & 2 deletions protocols/identify/src/mod.rs

This file was deleted.

Loading