From 2149b032a95ceeecafab691f3eeaf1b1e065d8dd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:15:15 +0100 Subject: [PATCH 01/16] Move tests to `tests/` directory --- protocols/identify/src/behaviour.rs | 275 --------------------------- protocols/identify/tests/smoke.rs | 277 ++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+), 275 deletions(-) create mode 100644 protocols/identify/tests/smoke.rs diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index ec1c7596de0..2a4abb6ea1d 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -548,281 +548,6 @@ impl PeerCache { #[cfg(test)] mod tests { use super::*; - use futures::pin_mut; - use futures::prelude::*; - use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport}; - use libp2p_identity as identity; - use libp2p_identity::PeerId; - use libp2p_mplex::MplexConfig; - use libp2p_noise as noise; - use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; - use libp2p_tcp as tcp; - use std::time::Duration; - - fn transport() -> ( - identity::PublicKey, - transport::Boxed<(PeerId, StreamMuxerBox)>, - ) { - let id_keys = identity::Keypair::generate_ed25519(); - let pubkey = id_keys.public(); - let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&id_keys).unwrap()) - .multiplex(MplexConfig::new()) - .boxed(); - (pubkey, transport) - } - - #[test] - fn periodic_identify() { - let (mut swarm1, pubkey1) = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new( - Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) - .build(); - (swarm, pubkey) - }; - - let (mut swarm2, pubkey2) = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new( - Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) - .build(); - (swarm, pubkey) - }; - - swarm1 - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { - return address; - } - } - }); - swarm2.dial(listen_addr).unwrap(); - - // nb. Either swarm may receive the `Identified` event first, upon which - // it will permit the connection to be closed, as defined by - // `Handler::connection_keep_alive`. Hence the test succeeds if - // either `Identified` event arrives correctly. - async_std::task::block_on(async move { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - let swarm2_fut = swarm2.select_next_some(); - pin_mut!(swarm2_fut); - - match future::select(swarm1_fut, swarm2_fut) - .await - .factor_second() - .0 - { - future::Either::Left(SwarmEvent::Behaviour(Event::Received { - info, .. - })) => { - assert_eq!(info.public_key, pubkey2); - assert_eq!(info.protocol_version, "c"); - assert_eq!(info.agent_version, "d"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - return; - } - future::Either::Right(SwarmEvent::Behaviour(Event::Received { - info, .. - })) => { - assert_eq!(info.public_key, pubkey1); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert_eq!(info.listen_addrs.len(), 1); - return; - } - _ => {} - } - } - }) - } - - #[test] - fn identify_push() { - let _ = env_logger::try_init(); - - let (mut swarm1, pubkey1) = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone())); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) - .build(); - (swarm, pubkey) - }; - - let (mut swarm2, pubkey2) = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new( - Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) - .build(); - (swarm, pubkey) - }; - - Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { - return address; - } - } - }); - - swarm2.dial(listen_addr).unwrap(); - - async_std::task::block_on(async move { - loop { - let swarm1_fut = swarm1.select_next_some(); - let swarm2_fut = swarm2.select_next_some(); - - { - pin_mut!(swarm1_fut); - pin_mut!(swarm2_fut); - match future::select(swarm1_fut, swarm2_fut) - .await - .factor_second() - .0 - { - future::Either::Left(SwarmEvent::Behaviour(Event::Received { - info, - .. - })) => { - assert_eq!(info.public_key, pubkey2); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - return; - } - future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => { - // Once a connection is established, we can initiate an - // active push below. - } - _ => continue, - } - } - - swarm2 - .behaviour_mut() - .push(std::iter::once(pubkey1.to_peer_id())); - } - }) - } - - #[test] - fn discover_peer_after_disconnect() { - let _ = env_logger::try_init(); - - let mut swarm1 = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new( - Config::new("a".to_string(), pubkey.clone()) - // `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus - // closes the connection. At this point in time `swarm2` might not yet have - // identified `swarm1`. To give `swarm2` enough time, set an initial delay on - // `swarm1`. - .with_initial_delay(Duration::from_secs(10)), - ); - - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() - }; - - let mut swarm2 = { - let (pubkey, transport) = transport(); - let protocol = Behaviour::new( - Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), - ); - - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() - }; - - let swarm1_peer_id = *swarm1.local_peer_id(); - - let listener = swarm1 - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - match swarm1.select_next_some().await { - SwarmEvent::NewListenAddr { - address, - listener_id, - } if listener_id == listener => return address, - _ => {} - } - } - }); - - async_std::task::spawn(async move { - loop { - swarm1.next().await; - } - }); - - swarm2.dial(listen_addr).unwrap(); - - // Wait until we identified. - async_std::task::block_on(async { - loop { - if let SwarmEvent::Behaviour(Event::Received { .. }) = - swarm2.select_next_some().await - { - break; - } - } - }); - - swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); - - // Wait for connection to close. - async_std::task::block_on(async { - loop { - if let SwarmEvent::ConnectionClosed { peer_id, .. } = - swarm2.select_next_some().await - { - break peer_id; - } - } - }); - - // We should still be able to dial now! - swarm2.dial(swarm1_peer_id).unwrap(); - - let connected_peer = async_std::task::block_on(async { - loop { - if let SwarmEvent::ConnectionEstablished { peer_id, .. } = - swarm2.select_next_some().await - { - break peer_id; - } - } - }); - - assert_eq!(connected_peer, swarm1_peer_id); - } #[test] fn check_multiaddr_matches_peer_id() { diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs new file mode 100644 index 00000000000..88d871f4e31 --- /dev/null +++ b/protocols/identify/tests/smoke.rs @@ -0,0 +1,277 @@ +use futures::pin_mut; +use futures::prelude::*; +use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport}; +use libp2p_identify as identify; +use libp2p_identity as identity; +use libp2p_identity::PeerId; +use libp2p_mplex::MplexConfig; +use libp2p_noise as noise; +use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_tcp as tcp; +use std::time::Duration; + +fn transport() -> ( + identity::PublicKey, + transport::Boxed<(PeerId, StreamMuxerBox)>, +) { + let id_keys = identity::Keypair::generate_ed25519(); + let pubkey = id_keys.public(); + let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&id_keys).unwrap()) + .multiplex(MplexConfig::new()) + .boxed(); + (pubkey, transport) +} + +#[test] +fn periodic_identify() { + let (mut swarm1, pubkey1) = { + let (pubkey, transport) = transport(); + let protocol = identify::Behaviour::new( + identify::Config::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string()), + ); + let swarm = + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); + (swarm, pubkey) + }; + + let (mut swarm2, pubkey2) = { + let (pubkey, transport) = transport(); + let protocol = identify::Behaviour::new( + identify::Config::new("c".to_string(), pubkey.clone()) + .with_agent_version("d".to_string()), + ); + let swarm = + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); + (swarm, pubkey) + }; + + swarm1 + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + + let listen_addr = async_std::task::block_on(async { + loop { + let swarm1_fut = swarm1.select_next_some(); + pin_mut!(swarm1_fut); + if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { + return address; + } + } + }); + swarm2.dial(listen_addr).unwrap(); + + // nb. Either swarm may receive the `Identified` event first, upon which + // it will permit the connection to be closed, as defined by + // `Handler::connection_keep_alive`. Hence the test succeeds if + // either `Identified` event arrives correctly. + async_std::task::block_on(async move { + loop { + let swarm1_fut = swarm1.select_next_some(); + pin_mut!(swarm1_fut); + let swarm2_fut = swarm2.select_next_some(); + pin_mut!(swarm2_fut); + + match future::select(swarm1_fut, swarm2_fut) + .await + .factor_second() + .0 + { + future::Either::Left(SwarmEvent::Behaviour(identify::Event::Received { + info, + .. + })) => { + assert_eq!(info.public_key, pubkey2); + assert_eq!(info.protocol_version, "c"); + assert_eq!(info.agent_version, "d"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); + return; + } + future::Either::Right(SwarmEvent::Behaviour(identify::Event::Received { + info, + .. + })) => { + assert_eq!(info.public_key, pubkey1); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert_eq!(info.listen_addrs.len(), 1); + return; + } + _ => {} + } + } + }) +} + +#[test] +fn identify_push() { + let _ = env_logger::try_init(); + + let (mut swarm1, pubkey1) = { + let (pubkey, transport) = transport(); + let protocol = + identify::Behaviour::new(identify::Config::new("a".to_string(), pubkey.clone())); + let swarm = + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); + (swarm, pubkey) + }; + + let (mut swarm2, pubkey2) = { + let (pubkey, transport) = transport(); + let protocol = identify::Behaviour::new( + identify::Config::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string()), + ); + let swarm = + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); + (swarm, pubkey) + }; + + Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + + let listen_addr = async_std::task::block_on(async { + loop { + let swarm1_fut = swarm1.select_next_some(); + pin_mut!(swarm1_fut); + if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { + return address; + } + } + }); + + swarm2.dial(listen_addr).unwrap(); + + async_std::task::block_on(async move { + loop { + let swarm1_fut = swarm1.select_next_some(); + let swarm2_fut = swarm2.select_next_some(); + + { + pin_mut!(swarm1_fut); + pin_mut!(swarm2_fut); + match future::select(swarm1_fut, swarm2_fut) + .await + .factor_second() + .0 + { + future::Either::Left(SwarmEvent::Behaviour(identify::Event::Received { + info, + .. + })) => { + assert_eq!(info.public_key, pubkey2); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); + return; + } + future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => { + // Once a connection is established, we can initiate an + // active push below. + } + _ => continue, + } + } + + swarm2 + .behaviour_mut() + .push(std::iter::once(pubkey1.to_peer_id())); + } + }) +} + +#[test] +fn discover_peer_after_disconnect() { + let _ = env_logger::try_init(); + + let mut swarm1 = { + let (pubkey, transport) = transport(); + let protocol = identify::Behaviour::new( + identify::Config::new("a".to_string(), pubkey.clone()) + // `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus + // closes the connection. At this point in time `swarm2` might not yet have + // identified `swarm1`. To give `swarm2` enough time, set an initial delay on + // `swarm1`. + .with_initial_delay(Duration::from_secs(10)), + ); + + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() + }; + + let mut swarm2 = { + let (pubkey, transport) = transport(); + let protocol = identify::Behaviour::new( + identify::Config::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string()), + ); + + SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() + }; + + let swarm1_peer_id = *swarm1.local_peer_id(); + + let listener = swarm1 + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + + let listen_addr = async_std::task::block_on(async { + loop { + match swarm1.select_next_some().await { + SwarmEvent::NewListenAddr { + address, + listener_id, + } if listener_id == listener => return address, + _ => {} + } + } + }); + + async_std::task::spawn(async move { + loop { + swarm1.next().await; + } + }); + + swarm2.dial(listen_addr).unwrap(); + + // Wait until we identified. + async_std::task::block_on(async { + loop { + if let SwarmEvent::Behaviour(identify::Event::Received { .. }) = + swarm2.select_next_some().await + { + break; + } + } + }); + + swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); + + // Wait for connection to close. + async_std::task::block_on(async { + loop { + if let SwarmEvent::ConnectionClosed { peer_id, .. } = swarm2.select_next_some().await { + break peer_id; + } + } + }); + + // We should still be able to dial now! + swarm2.dial(swarm1_peer_id).unwrap(); + + let connected_peer = async_std::task::block_on(async { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = + swarm2.select_next_some().await + { + break peer_id; + } + } + }); + + assert_eq!(connected_peer, swarm1_peer_id); +} From 20a111cdc2e614c9018f2032fb8a8051741459f3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:27:24 +0100 Subject: [PATCH 02/16] Rewrite first identify tests using `libp2p-swarm-test` --- Cargo.lock | 1 + protocols/identify/Cargo.toml | 1 + protocols/identify/tests/smoke.rs | 92 +++++++++---------------------- 3 files changed, 27 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad9a1d5f15d..10a50a582c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2535,6 +2535,7 @@ dependencies = [ "libp2p-mplex", "libp2p-noise", "libp2p-swarm", + "libp2p-swarm-test", "libp2p-tcp", "libp2p-yamux", "log", diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index bfe75da8133..0d4bea7a9fb 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -34,6 +34,7 @@ libp2p-yamux = { path = "../../muxers/yamux" } libp2p-noise = { path = "../../transports/noise" } libp2p-swarm = { path = "../../swarm", features = ["async-std"] } libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-swarm-test = { path = "../../swarm-test" } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 88d871f4e31..69d37a0c337 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -7,7 +7,9 @@ use libp2p_identity::PeerId; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm_test::SwarmExt; use libp2p_tcp as tcp; +use std::iter; use std::time::Duration; fn transport() -> ( @@ -107,81 +109,37 @@ fn periodic_identify() { }) } -#[test] -fn identify_push() { +#[async_std::test] +async fn identify_push() { let _ = env_logger::try_init(); - let (mut swarm1, pubkey1) = { - let (pubkey, transport) = transport(); - let protocol = - identify::Behaviour::new(identify::Config::new("a".to_string(), pubkey.clone())); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); - (swarm, pubkey) - }; - - let (mut swarm2, pubkey2) = { - let (pubkey, transport) = transport(); - let protocol = identify::Behaviour::new( - identify::Config::new("a".to_string(), pubkey.clone()) + let mut swarm1 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + }); + let mut swarm2 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); - (swarm, pubkey) - }; - - Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { - return address; - } - } + ) }); - swarm2.dial(listen_addr).unwrap(); + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; - async_std::task::block_on(async move { - loop { - let swarm1_fut = swarm1.select_next_some(); - let swarm2_fut = swarm2.select_next_some(); + swarm2 + .behaviour_mut() + .push(iter::once(*swarm1.local_peer_id())); - { - pin_mut!(swarm1_fut); - pin_mut!(swarm2_fut); - match future::select(swarm1_fut, swarm2_fut) - .await - .factor_second() - .0 - { - future::Either::Left(SwarmEvent::Behaviour(identify::Event::Received { - info, - .. - })) => { - assert_eq!(info.public_key, pubkey2); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - return; - } - future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => { - // Once a connection is established, we can initiate an - // active push below. - } - _ => continue, - } - } - - swarm2 - .behaviour_mut() - .push(std::iter::once(pubkey1.to_peer_id())); + match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { + ([identify::Event::Received { info, .. }], [identify::Event::Pushed { .. }]) => { + assert_eq!(info.public_key.to_peer_id(), *swarm2.local_peer_id()); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); } - }) + other => panic!("Unexpected events: {other:?}"), + } } #[test] From dc8dfd8591ebdfbedff9e23f0162b3823ae17d53 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:34:22 +0100 Subject: [PATCH 03/16] Rewrite another test --- protocols/identify/tests/smoke.rs | 104 +++++++++--------------------- 1 file changed, 31 insertions(+), 73 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 69d37a0c337..92e553eaed2 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -10,7 +10,6 @@ use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use libp2p_tcp as tcp; use std::iter; -use std::time::Duration; fn transport() -> ( identity::PublicKey, @@ -142,94 +141,53 @@ async fn identify_push() { } } -#[test] -fn discover_peer_after_disconnect() { +#[async_std::test] +async fn discover_peer_after_disconnect() { let _ = env_logger::try_init(); - let mut swarm1 = { - let (pubkey, transport) = transport(); - let protocol = identify::Behaviour::new( - identify::Config::new("a".to_string(), pubkey.clone()) - // `swarm1` will set `KeepAlive::No` once it identified `swarm2` and thus - // closes the connection. At this point in time `swarm2` might not yet have - // identified `swarm1`. To give `swarm2` enough time, set an initial delay on - // `swarm1`. - .with_initial_delay(Duration::from_secs(10)), - ); - - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() - }; - - let mut swarm2 = { - let (pubkey, transport) = transport(); - let protocol = identify::Behaviour::new( - identify::Config::new("a".to_string(), pubkey.clone()) + let mut swarm1 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + }); + let mut swarm2 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), - ); - - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build() - }; - - let swarm1_peer_id = *swarm1.local_peer_id(); - - let listener = swarm1 - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - match swarm1.select_next_some().await { - SwarmEvent::NewListenAddr { - address, - listener_id, - } if listener_id == listener => return address, - _ => {} - } - } + ) }); - async_std::task::spawn(async move { - loop { - swarm1.next().await; - } - }); + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; - swarm2.dial(listen_addr).unwrap(); + let swarm1_peer_id = *swarm1.local_peer_id(); + async_std::task::spawn(swarm1.loop_on_next()); // Wait until we identified. - async_std::task::block_on(async { - loop { - if let SwarmEvent::Behaviour(identify::Event::Received { .. }) = - swarm2.select_next_some().await - { - break; - } - } - }); + swarm2 + .wait(|event| { + matches!( + event, + SwarmEvent::Behaviour(identify::Event::Received { .. }) + ) + .then_some(()) + }) + .await; swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); // Wait for connection to close. - async_std::task::block_on(async { - loop { - if let SwarmEvent::ConnectionClosed { peer_id, .. } = swarm2.select_next_some().await { - break peer_id; - } - } - }); + swarm2 + .wait(|event| matches!(event, SwarmEvent::ConnectionClosed { .. }).then_some(())) + .await; // We should still be able to dial now! swarm2.dial(swarm1_peer_id).unwrap(); - let connected_peer = async_std::task::block_on(async { - loop { - if let SwarmEvent::ConnectionEstablished { peer_id, .. } = - swarm2.select_next_some().await - { - break peer_id; - } - } - }); + let connected_peer = swarm2 + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => Some(peer_id), + _ => None, + }) + .await; assert_eq!(connected_peer, swarm1_peer_id); } From ff4114759080615cb270e06adeca1ff23ee6ba76 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:40:42 +0100 Subject: [PATCH 04/16] Rewrite final test --- protocols/identify/tests/smoke.rs | 134 +++++++++--------------------- 1 file changed, 41 insertions(+), 93 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 92e553eaed2..842a77a9ae4 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,111 +1,59 @@ -use futures::pin_mut; use futures::prelude::*; -use libp2p_core::{muxing::StreamMuxerBox, transport, upgrade, Transport}; use libp2p_identify as identify; -use libp2p_identity as identity; -use libp2p_identity::PeerId; -use libp2p_mplex::MplexConfig; -use libp2p_noise as noise; -use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; -use libp2p_tcp as tcp; use std::iter; -fn transport() -> ( - identity::PublicKey, - transport::Boxed<(PeerId, StreamMuxerBox)>, -) { - let id_keys = identity::Keypair::generate_ed25519(); - let pubkey = id_keys.public(); - let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&id_keys).unwrap()) - .multiplex(MplexConfig::new()) - .boxed(); - (pubkey, transport) -} - -#[test] -fn periodic_identify() { - let (mut swarm1, pubkey1) = { - let (pubkey, transport) = transport(); - let protocol = identify::Behaviour::new( - identify::Config::new("a".to_string(), pubkey.clone()) +#[async_std::test] +async fn periodic_identify() { + let mut swarm1 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); - (swarm, pubkey) - }; - - let (mut swarm2, pubkey2) = { - let (pubkey, transport) = transport(); - let protocol = identify::Behaviour::new( - identify::Config::new("c".to_string(), pubkey.clone()) + ) + }); + let swarm1_peer_id = *swarm1.local_peer_id(); + + let mut swarm2 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("c".to_string(), identity.public()) .with_agent_version("d".to_string()), - ); - let swarm = - SwarmBuilder::with_async_std_executor(transport, protocol, pubkey.to_peer_id()).build(); - (swarm, pubkey) - }; - - swarm1 - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let listen_addr = async_std::task::block_on(async { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - if let SwarmEvent::NewListenAddr { address, .. } = swarm1_fut.await { - return address; - } - } + ) }); - swarm2.dial(listen_addr).unwrap(); + let swarm2_peer_id = *swarm2.local_peer_id(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; // nb. Either swarm may receive the `Identified` event first, upon which // it will permit the connection to be closed, as defined by // `Handler::connection_keep_alive`. Hence the test succeeds if // either `Identified` event arrives correctly. - async_std::task::block_on(async move { - loop { - let swarm1_fut = swarm1.select_next_some(); - pin_mut!(swarm1_fut); - let swarm2_fut = swarm2.select_next_some(); - pin_mut!(swarm2_fut); - - match future::select(swarm1_fut, swarm2_fut) - .await - .factor_second() - .0 - { - future::Either::Left(SwarmEvent::Behaviour(identify::Event::Received { - info, - .. - })) => { - assert_eq!(info.public_key, pubkey2); - assert_eq!(info.protocol_version, "c"); - assert_eq!(info.agent_version, "d"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - return; - } - future::Either::Right(SwarmEvent::Behaviour(identify::Event::Received { - info, - .. - })) => { - assert_eq!(info.public_key, pubkey1); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert_eq!(info.listen_addrs.len(), 1); - return; - } - _ => {} + loop { + match future::select(swarm1.next_behaviour_event(), swarm2.next_behaviour_event()) + .await + .factor_second() + .0 + { + future::Either::Left(identify::Event::Received { info, .. }) => { + assert_eq!(info.public_key.to_peer_id(), swarm2_peer_id); + assert_eq!(info.protocol_version, "c"); + assert_eq!(info.agent_version, "d"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); + return; } + future::Either::Right(identify::Event::Received { info, .. }) => { + assert_eq!(info.public_key.to_peer_id(), swarm1_peer_id); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert_eq!(info.listen_addrs.len(), 1); + return; + } + _ => {} } - }) + } } #[async_std::test] From 336fcf38d0bee866901dd9908aac117aa24b864e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:51:11 +0100 Subject: [PATCH 05/16] Only listen on local interface in `libp2p-swarm-test` --- swarm-test/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 94bad497e8f..65842b0de40 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -318,7 +318,7 @@ where self.add_external_address(memory_multiaddr.clone(), AddressScore::Infinite); let tcp_addr_listener_id = self - .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); let tcp_multiaddr = self From 562a41f61baa9da4057fdcfdf1ef27e53c64d181 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:51:37 +0100 Subject: [PATCH 06/16] Assert listen and observed address --- protocols/identify/tests/smoke.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 842a77a9ae4..57e521b169e 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,4 +1,5 @@ use futures::prelude::*; +use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; @@ -22,7 +23,8 @@ async fn periodic_identify() { }); let swarm2_peer_id = *swarm2.local_peer_id(); - swarm1.listen().await; + let (swarm1_memory_listen, swarm1_tcp_listen_addr) = swarm1.listen().await; + let (swarm2_memory_listen, swarm2_tcp_listen_addr) = swarm2.listen().await; swarm2.connect(&mut swarm1).await; // nb. Either swarm may receive the `Identified` event first, upon which @@ -40,7 +42,14 @@ async fn periodic_identify() { assert_eq!(info.protocol_version, "c"); assert_eq!(info.agent_version, "d"); assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); + assert_eq!( + info.observed_addr, + swarm1_memory_listen.with(Protocol::P2p(swarm1_peer_id.into())) + ); + assert_eq!( + info.listen_addrs, + vec![swarm2_memory_listen, swarm2_tcp_listen_addr] + ); return; } future::Either::Right(identify::Event::Received { info, .. }) => { @@ -48,7 +57,14 @@ async fn periodic_identify() { assert_eq!(info.protocol_version, "a"); assert_eq!(info.agent_version, "b"); assert!(!info.protocols.is_empty()); - assert_eq!(info.listen_addrs.len(), 1); + assert_eq!( + info.observed_addr, + swarm2_memory_listen.with(Protocol::P2p(swarm2_peer_id.into())) + ); + assert_eq!( + info.listen_addrs, + vec![swarm1_memory_listen, swarm1_tcp_listen_addr] + ); return; } _ => {} From edea5afd779af3cf0e688d8c0be995372e20d71a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:51:49 +0100 Subject: [PATCH 07/16] Ensure listen addresses are unique --- protocols/identify/src/handler.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index c0bd9d928eb..52851664d82 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -39,7 +39,7 @@ use libp2p_swarm::{ }; use log::warn; use smallvec::SmallVec; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; /// Protocol handler for sending and receiving identification requests. @@ -89,7 +89,7 @@ pub struct Handler { #[derive(Debug)] pub struct InEvent { /// The addresses that the peer is listening on. - pub listen_addrs: Vec, + pub listen_addrs: HashSet, /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. pub supported_protocols: Vec, @@ -246,7 +246,7 @@ impl ConnectionHandler for Handler { public_key: self.public_key.clone(), protocol_version: self.protocol_version.clone(), agent_version: self.agent_version.clone(), - listen_addrs, + listen_addrs: Vec::from_iter(listen_addrs), protocols: supported_protocols, observed_addr: self.observed_addr.clone(), }; From dd3c7328c5107750eeb432ad772401d4651354c1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:52:41 +0100 Subject: [PATCH 08/16] Remove `correct_transfer` test This is now included in the smoke test --- protocols/identify/src/protocol.rs | 90 ------------------------------ 1 file changed, 90 deletions(-) diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 1a10b591278..c859fa730f4 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -274,97 +274,7 @@ pub enum UpgradeError { #[cfg(test)] mod tests { use super::*; - use futures::channel::oneshot; - use libp2p_core::{ - upgrade::{self, apply_inbound, apply_outbound}, - Transport, - }; use libp2p_identity as identity; - use libp2p_tcp as tcp; - - #[test] - fn correct_transfer() { - // We open a server and a client, send info from the server to the client, and check that - // they were successfully received. - let send_pubkey = identity::Keypair::generate_ed25519().public(); - let recv_pubkey = send_pubkey.clone(); - - let (tx, rx) = oneshot::channel(); - - let bg_task = async_std::task::spawn(async move { - let mut transport = tcp::async_io::Transport::default().boxed(); - - transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap(); - - let addr = transport - .next() - .await - .expect("some event") - .into_new_address() - .expect("listen address"); - tx.send(addr).unwrap(); - - let socket = transport - .next() - .await - .expect("some event") - .into_incoming() - .unwrap() - .0 - .await - .unwrap(); - - let sender = apply_inbound(socket, Identify).await.unwrap(); - - send( - sender, - Info { - public_key: send_pubkey, - protocol_version: "proto_version".to_owned(), - agent_version: "agent_version".to_owned(), - listen_addrs: vec![ - "/ip4/80.81.82.83/tcp/500".parse().unwrap(), - "/ip6/::1/udp/1000".parse().unwrap(), - ], - protocols: vec!["proto1".to_string(), "proto2".to_string()], - observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), - }, - ) - .await - .unwrap(); - }); - - async_std::task::block_on(async move { - let mut transport = tcp::async_io::Transport::default(); - - let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let info = apply_outbound(socket, Identify, upgrade::Version::V1) - .await - .unwrap(); - assert_eq!( - info.observed_addr, - "/ip4/100.101.102.103/tcp/5000".parse().unwrap() - ); - assert_eq!(info.public_key, recv_pubkey); - assert_eq!(info.protocol_version, "proto_version"); - assert_eq!(info.agent_version, "agent_version"); - assert_eq!( - info.listen_addrs, - &[ - "/ip4/80.81.82.83/tcp/500".parse().unwrap(), - "/ip6/::1/udp/1000".parse().unwrap() - ] - ); - assert_eq!( - info.protocols, - &["proto1".to_string(), "proto2".to_string()] - ); - - bg_task.await; - }); - } #[test] fn skip_invalid_multiaddr() { From 18012d7f81f1688b2af9e90d88f1fdb614dea99f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:53:24 +0100 Subject: [PATCH 09/16] Remove unnecessary dependencies --- Cargo.lock | 4 ---- protocols/identify/Cargo.toml | 5 ----- 2 files changed, 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10a50a582c7..45708e15632 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2532,12 +2532,8 @@ dependencies = [ "futures-timer", "libp2p-core", "libp2p-identity", - "libp2p-mplex", - "libp2p-noise", "libp2p-swarm", "libp2p-swarm-test", - "libp2p-tcp", - "libp2p-yamux", "log", "lru", "quick-protobuf", diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 0d4bea7a9fb..d07ce059712 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -29,11 +29,6 @@ either = "1.8.0" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" -libp2p-mplex = { path = "../../muxers/mplex" } -libp2p-yamux = { path = "../../muxers/yamux" } -libp2p-noise = { path = "../../transports/noise" } -libp2p-swarm = { path = "../../swarm", features = ["async-std"] } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } libp2p-swarm-test = { path = "../../swarm-test" } # Passing arguments to the docsrs builder in order to properly document cfg's. From 2c5c9d3208a3dbc5d7865d789720ead4ac4c889a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 15:55:46 +0100 Subject: [PATCH 10/16] Delete dead code These files are not included in the module tree. --- protocols/identify/src/mod.rs | 2 - protocols/identify/src/structs.rs | 67 ------------------------------- 2 files changed, 69 deletions(-) delete mode 100644 protocols/identify/src/mod.rs delete mode 100644 protocols/identify/src/structs.rs diff --git a/protocols/identify/src/mod.rs b/protocols/identify/src/mod.rs deleted file mode 100644 index e52c5a80bc0..00000000000 --- a/protocols/identify/src/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// Automatically generated mod.rs -pub mod structs; diff --git a/protocols/identify/src/structs.rs b/protocols/identify/src/structs.rs deleted file mode 100644 index 3be9b6f94ad..00000000000 --- a/protocols/identify/src/structs.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Automatically generated rust module for 'structs.proto' file - -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(non_camel_case_types)] -#![allow(unused_imports)] -#![allow(unknown_lints)] -#![allow(clippy::all)] -#![cfg_attr(rustfmt, rustfmt_skip)] - - -use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; -use quick_protobuf::sizeofs::*; -use super::*; - -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Debug, Default, PartialEq, Clone)] -pub struct Identify { - pub protocolVersion: Option, - pub agentVersion: Option, - pub publicKey: Option>, - pub listenAddrs: Vec>, - pub observedAddr: Option>, - pub protocols: Vec, -} - -impl<'a> MessageRead<'a> for Identify { - fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { - let mut msg = Self::default(); - while !r.is_eof() { - match r.next_tag(bytes) { - Ok(42) => msg.protocolVersion = Some(r.read_string(bytes)?.to_owned()), - Ok(50) => msg.agentVersion = Some(r.read_string(bytes)?.to_owned()), - Ok(10) => msg.publicKey = Some(r.read_bytes(bytes)?.to_owned()), - Ok(18) => msg.listenAddrs.push(r.read_bytes(bytes)?.to_owned()), - Ok(34) => msg.observedAddr = Some(r.read_bytes(bytes)?.to_owned()), - Ok(26) => msg.protocols.push(r.read_string(bytes)?.to_owned()), - Ok(t) => { r.read_unknown(bytes, t)?; } - Err(e) => return Err(e), - } - } - Ok(msg) - } -} - -impl MessageWrite for Identify { - fn get_size(&self) -> usize { - 0 - + self.protocolVersion.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) - + self.agentVersion.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) - + self.publicKey.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) - + self.listenAddrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::() - + self.observedAddr.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) - + self.protocols.iter().map(|s| 1 + sizeof_len((s).len())).sum::() - } - - fn write_message(&self, w: &mut Writer) -> Result<()> { - if let Some(ref s) = self.protocolVersion { w.write_with_tag(42, |w| w.write_string(&**s))?; } - if let Some(ref s) = self.agentVersion { w.write_with_tag(50, |w| w.write_string(&**s))?; } - if let Some(ref s) = self.publicKey { w.write_with_tag(10, |w| w.write_bytes(&**s))?; } - for s in &self.listenAddrs { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } - if let Some(ref s) = self.observedAddr { w.write_with_tag(34, |w| w.write_bytes(&**s))?; } - for s in &self.protocols { w.write_with_tag(26, |w| w.write_string(&**s))?; } - Ok(()) - } -} - From 0182be7a6209ffba0ca83012534511e95529817e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 17:43:15 +0100 Subject: [PATCH 11/16] Make tests more resilient --- protocols/identify/tests/smoke.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 57e521b169e..ddff6428891 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -37,7 +37,9 @@ async fn periodic_identify() { .factor_second() .0 { - future::Either::Left(identify::Event::Received { info, .. }) => { + future::Either::Left(identify::Event::Received { mut info, .. }) => { + info.listen_addrs.sort(); + assert_eq!(info.public_key.to_peer_id(), swarm2_peer_id); assert_eq!(info.protocol_version, "c"); assert_eq!(info.agent_version, "d"); @@ -48,11 +50,13 @@ async fn periodic_identify() { ); assert_eq!( info.listen_addrs, - vec![swarm2_memory_listen, swarm2_tcp_listen_addr] + vec![swarm2_tcp_listen_addr, swarm2_memory_listen] ); return; } - future::Either::Right(identify::Event::Received { info, .. }) => { + future::Either::Right(identify::Event::Received { mut info, .. }) => { + info.listen_addrs.sort(); + assert_eq!(info.public_key.to_peer_id(), swarm1_peer_id); assert_eq!(info.protocol_version, "a"); assert_eq!(info.agent_version, "b"); @@ -63,7 +67,7 @@ async fn periodic_identify() { ); assert_eq!( info.listen_addrs, - vec![swarm1_memory_listen, swarm1_tcp_listen_addr] + vec![swarm1_tcp_listen_addr, swarm1_memory_listen] ); return; } From d5bcef6e209a29485127f3a02aed59880c9ec087 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 17:50:16 +0100 Subject: [PATCH 12/16] Revert "Only listen on local interface in `libp2p-swarm-test`" This reverts commit 336fcf38d0bee866901dd9908aac117aa24b864e. --- swarm-test/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 65842b0de40..94bad497e8f 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -318,7 +318,7 @@ where self.add_external_address(memory_multiaddr.clone(), AddressScore::Infinite); let tcp_addr_listener_id = self - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) .unwrap(); let tcp_multiaddr = self From f3f5b6988d9952135aace456a4ad9ea38690015f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 17:52:17 +0100 Subject: [PATCH 13/16] Assert using contains to avoid needing to know other interfaces --- protocols/identify/tests/smoke.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index ddff6428891..eefd8e4ecc2 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -37,9 +37,7 @@ async fn periodic_identify() { .factor_second() .0 { - future::Either::Left(identify::Event::Received { mut info, .. }) => { - info.listen_addrs.sort(); - + future::Either::Left(identify::Event::Received { info, .. }) => { assert_eq!(info.public_key.to_peer_id(), swarm2_peer_id); assert_eq!(info.protocol_version, "c"); assert_eq!(info.agent_version, "d"); @@ -48,15 +46,11 @@ async fn periodic_identify() { info.observed_addr, swarm1_memory_listen.with(Protocol::P2p(swarm1_peer_id.into())) ); - assert_eq!( - info.listen_addrs, - vec![swarm2_tcp_listen_addr, swarm2_memory_listen] - ); + assert!(info.listen_addrs.contains(&swarm2_tcp_listen_addr)); + assert!(info.listen_addrs.contains(&swarm2_memory_listen)); return; } - future::Either::Right(identify::Event::Received { mut info, .. }) => { - info.listen_addrs.sort(); - + future::Either::Right(identify::Event::Received { info, .. }) => { assert_eq!(info.public_key.to_peer_id(), swarm1_peer_id); assert_eq!(info.protocol_version, "a"); assert_eq!(info.agent_version, "b"); @@ -65,10 +59,8 @@ async fn periodic_identify() { info.observed_addr, swarm2_memory_listen.with(Protocol::P2p(swarm2_peer_id.into())) ); - assert_eq!( - info.listen_addrs, - vec![swarm1_tcp_listen_addr, swarm1_memory_listen] - ); + assert!(info.listen_addrs.contains(&swarm1_tcp_listen_addr)); + assert!(info.listen_addrs.contains(&swarm1_memory_listen)); return; } _ => {} From ee682015d1c77817687a728cb0950887e04c7f9f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 May 2023 10:54:11 +0200 Subject: [PATCH 14/16] Fix flaky test --- protocols/identify/Cargo.toml | 1 + protocols/identify/tests/smoke.rs | 56 +++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 79ddf25ac44..414e32dc443 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -30,6 +30,7 @@ either = "1.8.0" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" libp2p-swarm-test = { path = "../../swarm-test" } +libp2p-swarm = { workspace = true, features = ["macros"] } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index eefd8e4ecc2..b6654f2bf9d 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,7 +1,7 @@ use futures::prelude::*; use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; -use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use std::iter; @@ -73,10 +73,10 @@ async fn identify_push() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - identify::Behaviour::new( + Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -85,19 +85,53 @@ async fn identify_push() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; + // First, let the periodic identify do its thing. + match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { + ( + [BehaviourEvent::Identify(identify::Event::Sent { .. }), BehaviourEvent::Identify(identify::Event::Received { .. })], + [BehaviourEvent::Identify(identify::Event::Sent { .. }), BehaviourEvent::Identify(identify::Event::Received { .. })], + ) => {} + other => panic!("Unexpected events: {other:?}"), + }; + + // Second, actively push. swarm2 .behaviour_mut() + .identify .push(iter::once(*swarm1.local_peer_id())); - match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ([identify::Event::Received { info, .. }], [identify::Event::Pushed { .. }]) => { - assert_eq!(info.public_key.to_peer_id(), *swarm2.local_peer_id()); - assert_eq!(info.protocol_version, "a"); - assert_eq!(info.agent_version, "b"); - assert!(!info.protocols.is_empty()); - assert!(info.listen_addrs.is_empty()); - } + let swarm1_received_info = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { + ( + [BehaviourEvent::Identify(identify::Event::Received { info, .. })], + [BehaviourEvent::Identify(identify::Event::Pushed { .. })], + ) => info, other => panic!("Unexpected events: {other:?}"), + }; + + assert_eq!( + swarm1_received_info.public_key.to_peer_id(), + *swarm2.local_peer_id() + ); + assert_eq!(swarm1_received_info.protocol_version, "a"); + assert_eq!(swarm1_received_info.agent_version, "b"); + assert!(!swarm1_received_info.protocols.is_empty()); + assert!(swarm1_received_info.listen_addrs.is_empty()); + + /// Combined behaviour to keep the connection alive after the periodic identify. + #[derive(libp2p_swarm::NetworkBehaviour)] + #[behaviour(prelude = "libp2p_swarm::derive_prelude")] + struct Behaviour { + identify: identify::Behaviour, + keep_alive: keep_alive::Behaviour, + } + + impl Behaviour { + fn new(config: identify::Config) -> Self { + Self { + identify: identify::Behaviour::new(config), + keep_alive: keep_alive::Behaviour::default(), + } + } } } From 499667f6667072f5932b805e4f21bd78762ed4fe Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 May 2023 11:08:39 +0200 Subject: [PATCH 15/16] Fix more flakiness --- protocols/identify/tests/smoke.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index b6654f2bf9d..51122a25338 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -88,9 +88,17 @@ async fn identify_push() { // First, let the periodic identify do its thing. match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { ( - [BehaviourEvent::Identify(identify::Event::Sent { .. }), BehaviourEvent::Identify(identify::Event::Received { .. })], - [BehaviourEvent::Identify(identify::Event::Sent { .. }), BehaviourEvent::Identify(identify::Event::Received { .. })], - ) => {} + [BehaviourEvent::Identify(e1), BehaviourEvent::Identify(e2)], + [BehaviourEvent::Identify(e3), BehaviourEvent::Identify(e4)], + ) => { + use identify::Event::{Received, Sent}; + + // These can be received in any order, hence assert them here instead of the pattern above. + assert!(matches!(e1, Received { .. } | Sent { .. })); + assert!(matches!(e2, Received { .. } | Sent { .. })); + assert!(matches!(e3, Received { .. } | Sent { .. })); + assert!(matches!(e4, Received { .. } | Sent { .. })); + } other => panic!("Unexpected events: {other:?}"), }; From f878e94eab1cb6f24a2f903af948dcb99d9ab7d0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 4 May 2023 10:04:06 +0200 Subject: [PATCH 16/16] Fix more flakiness --- protocols/identify/tests/smoke.rs | 43 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 51122a25338..9abe6a73b8a 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -124,23 +124,6 @@ async fn identify_push() { assert_eq!(swarm1_received_info.agent_version, "b"); assert!(!swarm1_received_info.protocols.is_empty()); assert!(swarm1_received_info.listen_addrs.is_empty()); - - /// Combined behaviour to keep the connection alive after the periodic identify. - #[derive(libp2p_swarm::NetworkBehaviour)] - #[behaviour(prelude = "libp2p_swarm::derive_prelude")] - struct Behaviour { - identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, - } - - impl Behaviour { - fn new(config: identify::Config) -> Self { - Self { - identify: identify::Behaviour::new(config), - keep_alive: keep_alive::Behaviour::default(), - } - } - } } #[async_std::test] @@ -148,10 +131,10 @@ async fn discover_peer_after_disconnect() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - identify::Behaviour::new( + Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -168,7 +151,7 @@ async fn discover_peer_after_disconnect() { .wait(|event| { matches!( event, - SwarmEvent::Behaviour(identify::Event::Received { .. }) + SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) ) .then_some(()) }) @@ -193,3 +176,23 @@ async fn discover_peer_after_disconnect() { assert_eq!(connected_peer, swarm1_peer_id); } + +/// Combined behaviour to keep the connection alive after the periodic identify. +/// +/// The identify implementation sets `keep_alive` to `No` once it has done its thing. +/// This can result in unexpected connection closures if one peer is faster than the other. +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct Behaviour { + identify: identify::Behaviour, + keep_alive: keep_alive::Behaviour, +} + +impl Behaviour { + fn new(config: identify::Config) -> Self { + Self { + identify: identify::Behaviour::new(config), + keep_alive: keep_alive::Behaviour::default(), + } + } +}