Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(identify): bring tests up to workspace standard #3851

Merged
merged 20 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ either = "1.8.0"
[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10"
libp2p-mplex = { workspace = true }
libp2p-yamux = { workspace = true }
libp2p-noise = { workspace = true }
libp2p-swarm = { workspace = true, features = ["async-std"] }
libp2p-tcp = { workspace = true, features = ["async-io"] }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-swarm = { workspace = true, features = ["macros"] }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
273 changes: 0 additions & 273 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,279 +561,6 @@ impl PeerCache {
#[cfg(test)]
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport};
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent};
use libp2p_tcp as tcp;
use std::time::Duration;

fn transport() -> (PublicKey, transport::Boxed<(PeerId, StreamMuxerBox)>) {
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&id_keys).unwrap())
.multiplex(MplexConfig::new())
.boxed();
(pubkey, transport)
}

#[test]
fn periodic_identify() {
let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});
swarm2.dial(listen_addr).unwrap();

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.select_next_some();
pin_mut!(swarm2_fut);

match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::Behaviour(Event::Received {
info, ..
})) => {
assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert_eq!(info.listen_addrs.len(), 1);
return;
}
_ => {}
}
}
})
}

#[test]
fn identify_push() {
let _ = env_logger::try_init();

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone()));
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

let (mut swarm2, pubkey2) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm =
SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
.build();
(swarm, pubkey)
};

Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await {
return address;
}
}
});

swarm2.dial(listen_addr).unwrap();

async_std::task::block_on(async move {
loop {
let swarm1_fut = swarm1.select_next_some();
let swarm2_fut = swarm2.select_next_some();

{
pin_mut!(swarm1_fut);
pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut)
.await
.factor_second()
.0
{
future::Either::Left(SwarmEvent::Behaviour(Event::Received {
info,
..
})) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => {
// Once a connection is established, we can initiate an
// active push below.
}
_ => continue,
}
}

swarm2
.behaviour_mut()
.push(std::iter::once(pubkey1.to_peer_id()));
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
#[allow(deprecated)]
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone())
// `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus
// closes the connection. At this point in time `swarm2` might not yet have
// identified `swarm1`. To give `swarm2` enough time, set an initial delay on
// `swarm1`.
.with_initial_delay(Duration::from_secs(10)),
);

SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);

SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build()
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial(listen_addr).unwrap();

// Wait until we identified.
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(Event::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// Wait for connection to close.
async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionClosed { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

// We should still be able to dial now!
swarm2.dial(swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}

#[test]
fn check_multiaddr_matches_peer_id() {
Expand Down
6 changes: 3 additions & 3 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

/// Protocol handler for sending and receiving identification requests.
Expand Down Expand Up @@ -89,7 +89,7 @@ pub struct Handler {
#[derive(Debug)]
pub struct InEvent {
/// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>,
pub listen_addrs: HashSet<Multiaddr>,

/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub supported_protocols: Vec<StreamProtocol>,
Expand Down Expand Up @@ -246,7 +246,7 @@ impl ConnectionHandler for Handler {
public_key: self.public_key.clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs,
listen_addrs: Vec::from_iter(listen_addrs),
protocols: supported_protocols,
observed_addr: self.observed_addr.clone(),
};
Expand Down
2 changes: 0 additions & 2 deletions protocols/identify/src/mod.rs

This file was deleted.

Loading