diff --git a/Cargo.lock b/Cargo.lock index 31d333f29de..437a5b6e936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2541,6 +2541,7 @@ dependencies = [ name = "libp2p-perf" version = "0.1.0" dependencies = [ + "anyhow", "clap 4.1.6", "either", "env_logger 0.10.0", @@ -2555,6 +2556,7 @@ dependencies = [ "libp2p-yamux", "log", "rand 0.8.5", + "thiserror", "void", ] diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 31695151b80..cdc0c8211fc 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +anyhow = "1" clap = { version = "4.1.6", features = ["derive"] } either = "1.8.0" env_logger = "0.10.0" @@ -23,6 +24,7 @@ libp2p-swarm = { version = "0.42.0", path = "../../swarm", features = ["macros", libp2p-tcp = { version = "0.39.0", path = "../../transports/tcp", features = ["async-io"] } libp2p-yamux = { version = "0.43.0", path = "../../muxers/yamux" } log = "0.4" +thiserror = "1.0" void = "1" [dev-dependencies] diff --git a/protocols/perf/src/bin/perf-client.rs b/protocols/perf/src/bin/perf-client.rs index 752f17e143e..f9c16a0ba15 100644 --- a/protocols/perf/src/bin/perf-client.rs +++ b/protocols/perf/src/bin/perf-client.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use anyhow::{bail, Result}; use clap::Parser; use futures::{executor::block_on, future::Either, StreamExt}; use libp2p_core::{ @@ -35,7 +36,7 @@ struct Opts { server_address: Multiaddr, } -fn main() { +fn main() -> Result<()> { env_logger::init(); let opts = Opts::parse(); @@ -77,13 +78,26 @@ fn main() { .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) .build(); + swarm.dial(opts.server_address).unwrap(); + let server_peer_id = block_on(async { + loop { + match swarm.next().await.unwrap() { + SwarmEvent::ConnectionEstablished { peer_id, .. } => return Ok(peer_id), + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + bail!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + e => panic!("{e:?}"), + } + } + })?; + swarm.behaviour_mut().perf( - opts.server_address.clone(), + server_peer_id, RunParams { to_send: 10 * 1024 * 1024, to_receive: 10 * 1024 * 1024, }, - ); + )?; let stats = block_on(async { loop { @@ -121,4 +135,6 @@ fn main() { receive_time, receive_bandwidth_mebibit_second ); + + Ok(()) } diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 96fff00bad3..754d9229986 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -21,14 +21,14 @@ //! [`NetworkBehaviour`] of the libp2p perf protocol. use std::{ - collections::{HashMap, VecDeque}, + collections::{HashSet, VecDeque}, task::{Context, Poll}, }; use either::Either; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{ - derive_prelude::ConnectionEstablished, dial_opts::DialOpts, dummy, ConnectionId, FromSwarm, + derive_prelude::ConnectionEstablished, dummy, ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent, THandlerOutEvent, }; @@ -42,9 +42,10 @@ pub enum Event { #[derive(Default)] pub struct Behaviour { - pending_run: HashMap, /// Queue of actions to return when polled. queued_events: VecDeque>>, + /// Set of connected peers. + connected: HashSet, } impl Behaviour { @@ -52,18 +53,28 @@ impl Behaviour { Self::default() } - pub fn perf(&mut self, server: Multiaddr, params: RunParams) { - let opts: DialOpts = server.into(); - let connection_id = opts.connection_id(); - - self.pending_run.insert(connection_id, params); + pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result<(), PerfError> { + if !self.connected.contains(&server) { + return Err(PerfError::NotConnected); + } - // TODO: What if we are already connected? self.queued_events - .push_back(NetworkBehaviourAction::Dial { opts }); + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: server, + handler: NotifyHandler::Any, + event: Either::Left(crate::client::handler::Command::Start { params }), + }); + + return Ok(()); } } +#[derive(thiserror::Error, Debug)] +pub enum PerfError { + #[error("Not connected to peer")] + NotConnected, +} + impl NetworkBehaviour for Behaviour { type ConnectionHandler = Either; type OutEvent = Event; @@ -92,20 +103,24 @@ impl NetworkBehaviour for Behaviour { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, - connection_id, + connection_id: _, endpoint: _, failed_addresses: _, other_established: _, - }) => self - .queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection_id), - event: Either::Left(crate::client::handler::Command::Start { - params: self.pending_run.remove(&connection_id).unwrap(), - }), - }), - FromSwarm::ConnectionClosed(_) => todo!(), + }) => { + self.connected.insert(peer_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id: _, + endpoint: _, + handler: _, + remaining_established, + }) => { + if remaining_established == 0 { + assert!(self.connected.remove(&peer_id)); + } + } FromSwarm::AddressChange(_) => todo!(), FromSwarm::DialFailure(_) => todo!(), FromSwarm::ListenFailure(_) => todo!(),