From 76abab9e20c29d03f787f39565de38dff42b0f80 Mon Sep 17 00:00:00 2001 From: Anton Date: Mon, 19 Dec 2022 11:26:19 +0400 Subject: [PATCH] feat: log bandwidth on substream instead of socket level (#3180) Previously, the `with_bandwidth_logging` extension to `Transport` would track the bytes sent and received on a socket level. This however only works in conjunction with `Transport` upgrades where a separate multiplexer is negotiated on top of a regular stream. With QUIC and WebRTC landing, this no longer works as those `Transport`s bring their own multiplexing stack. To still allow for tracking of bandwidth, we refactor the `with_bandwidth_logging` extension to count the bytes send on all substreams opened through a `StreamMuxer`. This works, regardless of the underlying transport technology. It does omit certain layers. However, there isn't necessarily a "correct" layer to count bandwidth on because you can always go down another layer (IP, Ethernet, etc). Closes #3157. --- CHANGELOG.md | 5 +- Cargo.toml | 4 ++ src/bandwidth.rs | 149 +++++++++++++++---------------------------- src/transport_ext.rs | 66 +++++++++++++++++-- 4 files changed, 120 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8df2d37c210..511120d1a57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,8 +47,11 @@ # 0.51.0 [unreleased] +- Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)). + - `BandwidthLogging::new` now requires a 2nd argument: `Arc` + - Remove `BandwidthFuture` + - Rename `BandwidthConnecLogging` to `InstrumentedStream` - Remove `SimpleProtocol` due to being unused. See [`libp2p::core::upgrade`](https://docs.rs/libp2p/0.50.0/libp2p/core/upgrade/index.html) for alternatives. See [PR 3191]. - - Update individual crates. - Update to [`libp2p-dcutr` `v0.9.0`](protocols/dcutr/CHANGELOG.md#090). diff --git a/Cargo.toml b/Cargo.toml index eaed637a2d8..0ec146a9c07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,10 @@ env_logger = "0.10.0" clap = { version = "4.0.13", features = ["derive"] } tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] } +libp2p-mplex = { version = "0.38.0", path = "muxers/mplex" } +libp2p-noise = { version = "0.41.0", path = "transports/noise" } +libp2p-tcp = { version = "0.38.0", path = "transports/tcp", features = ["tokio"] } + [workspace] members = [ "core", diff --git a/src/bandwidth.rs b/src/bandwidth.rs index a58eec95ddb..dc696ce07e2 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -18,20 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - core::{ - transport::{TransportError, TransportEvent}, - Transport, - }, - Multiaddr, -}; +use crate::core::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::{ io::{IoSlice, IoSliceMut}, prelude::*, ready, }; -use libp2p_core::transport::ListenerId; use std::{ convert::TryFrom as _, io, @@ -43,121 +36,86 @@ use std::{ task::{Context, Poll}, }; -/// Wraps around a `Transport` and counts the number of bytes that go through all the opened -/// connections. +/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened +/// streams. #[derive(Clone)] #[pin_project::pin_project] -pub struct BandwidthLogging { +pub(crate) struct BandwidthLogging { #[pin] - inner: TInner, + inner: SMInner, sinks: Arc, } -impl BandwidthLogging { - /// Creates a new [`BandwidthLogging`] around the transport. - pub fn new(inner: TInner) -> (Self, Arc) { - let sink = Arc::new(BandwidthSinks { - inbound: AtomicU64::new(0), - outbound: AtomicU64::new(0), - }); - - let trans = BandwidthLogging { - inner, - sinks: sink.clone(), - }; - - (trans, sink) +impl BandwidthLogging { + /// Creates a new [`BandwidthLogging`] around the stream muxer. + pub(crate) fn new(inner: SMInner, sinks: Arc) -> Self { + Self { inner, sinks } } } -impl Transport for BandwidthLogging +impl StreamMuxer for BandwidthLogging where - TInner: Transport, + SMInner: StreamMuxer, { - type Output = BandwidthConnecLogging; - type Error = TInner::Error; - type ListenerUpgrade = BandwidthFuture; - type Dial = BandwidthFuture; + type Substream = InstrumentedStream; + type Error = SMInner::Error; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); - match this.inner.poll(cx) { - Poll::Ready(event) => { - let event = event.map_upgrade({ - let sinks = this.sinks.clone(); - |inner| BandwidthFuture { inner, sinks } - }); - Poll::Ready(event) - } - Poll::Pending => Poll::Pending, - } - } - - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.inner.listen_on(addr) + this.inner.poll(cx) } - fn remove_listener(&mut self, id: ListenerId) -> bool { - self.inner.remove_listener(id) - } - - fn dial(&mut self, addr: Multiaddr) -> Result> { - let sinks = self.sinks.clone(); - self.inner - .dial(addr) - .map(move |fut| BandwidthFuture { inner: fut, sinks }) - } - - fn dial_as_listener( - &mut self, - addr: Multiaddr, - ) -> Result> { - let sinks = self.sinks.clone(); - self.inner - .dial_as_listener(addr) - .map(move |fut| BandwidthFuture { inner: fut, sinks }) - } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_inbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) } -} - -/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth -/// counter. -#[pin_project::pin_project] -pub struct BandwidthFuture { - #[pin] - inner: TInner, - sinks: Arc, -} - -impl Future for BandwidthFuture { - type Output = Result, TInner::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this = self.project(); - let inner = ready!(this.inner.try_poll(cx)?); - let logged = BandwidthConnecLogging { + let inner = ready!(this.inner.poll_outbound(cx)?); + let logged = InstrumentedStream { inner, sinks: this.sinks.clone(), }; Poll::Ready(Ok(logged)) } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } } -/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`]. +/// Allows obtaining the average bandwidth of the streams. pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, } impl BandwidthSinks { - /// Returns the total number of bytes that have been downloaded on all the connections spawned - /// through the [`BandwidthLogging`]. + /// Returns a new [`BandwidthSinks`]. + pub(crate) fn new() -> Arc { + Arc::new(Self { + inbound: AtomicU64::new(0), + outbound: AtomicU64::new(0), + }) + } + + /// Returns the total number of bytes that have been downloaded on all the streams. /// /// > **Note**: This method is by design subject to race conditions. The returned value should /// > only ever be used for statistics purposes. @@ -165,8 +123,7 @@ impl BandwidthSinks { self.inbound.load(Ordering::Relaxed) } - /// Returns the total number of bytes that have been uploaded on all the connections spawned - /// through the [`BandwidthLogging`]. + /// Returns the total number of bytes that have been uploaded on all the streams. /// /// > **Note**: This method is by design subject to race conditions. The returned value should /// > only ever be used for statistics purposes. @@ -175,15 +132,15 @@ impl BandwidthSinks { } } -/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. +/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. #[pin_project::pin_project] -pub struct BandwidthConnecLogging { +pub(crate) struct InstrumentedStream { #[pin] - inner: TInner, + inner: SMInner, sinks: Arc, } -impl AsyncRead for BandwidthConnecLogging { +impl AsyncRead for InstrumentedStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -213,7 +170,7 @@ impl AsyncRead for BandwidthConnecLogging { } } -impl AsyncWrite for BandwidthConnecLogging { +impl AsyncWrite for InstrumentedStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/transport_ext.rs b/src/transport_ext.rs index fa8926c8380..2a4c30f17e3 100644 --- a/src/transport_ext.rs +++ b/src/transport_ext.rs @@ -20,22 +20,74 @@ //! Provides the `TransportExt` trait. -use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport}; +use crate::core::{ + muxing::{StreamMuxer, StreamMuxerBox}, + transport::Boxed, + PeerId, +}; +use crate::{ + bandwidth::{BandwidthLogging, BandwidthSinks}, + Transport, +}; use std::sync::Arc; /// Trait automatically implemented on all objects that implement `Transport`. Provides some /// additional utilities. pub trait TransportExt: Transport { - /// Adds a layer on the `Transport` that logs all trafic that passes through the sockets + /// Adds a layer on the `Transport` that logs all trafic that passes through the streams /// created by it. /// - /// This method returns an `Arc` that can be used to retreive the total number - /// of bytes transferred through the sockets. - fn with_bandwidth_logging(self) -> (BandwidthLogging, Arc) + /// This method returns an `Arc` that can be used to retrieve the total number + /// of bytes transferred through the streams. + /// + /// # Example + /// + /// ``` + /// use libp2p_mplex as mplex; + /// use libp2p_noise as noise; + /// use libp2p_tcp as tcp; + /// use libp2p::{ + /// core::upgrade, + /// identity, + /// TransportExt, + /// Transport, + /// }; + /// + /// let id_keys = identity::Keypair::generate_ed25519(); + /// + /// let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) + /// .upgrade(upgrade::Version::V1) + /// .authenticate( + /// noise::NoiseAuthenticated::xx(&id_keys) + /// .expect("Signing libp2p-noise static DH keypair failed."), + /// ) + /// .multiplex(mplex::MplexConfig::new()) + /// .boxed(); + /// + /// let (transport, sinks) = transport.with_bandwidth_logging(); + /// ``` + fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where - Self: Sized, + Self: Sized + Send + Unpin + 'static, + Self::Dial: Send + 'static, + Self::ListenerUpgrade: Send + 'static, + Self::Error: Send + Sync, + Self::Output: Into<(PeerId, S)>, + S: StreamMuxer + Send + 'static, + S::Substream: Send + 'static, + S::Error: Send + Sync + 'static, { - BandwidthLogging::new(self) + let sinks = BandwidthSinks::new(); + let sinks_copy = sinks.clone(); + let transport = Transport::map(self, |output, _| { + let (peer_id, stream_muxer_box) = output.into(); + ( + peer_id, + StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)), + ) + }) + .boxed(); + (transport, sinks) } }