Skip to content

Commit

Permalink
feat: log bandwidth on substream instead of socket level (#3180)
Browse files Browse the repository at this point in the history
Previously, the `with_bandwidth_logging` extension to `Transport` would track the bytes sent and received on a socket level. This however only works in conjunction with `Transport` upgrades where a separate multiplexer is negotiated on top of a regular stream.

With QUIC and WebRTC landing, this no longer works as those `Transport`s bring their own multiplexing stack. To still allow for tracking of bandwidth, we refactor the `with_bandwidth_logging` extension to count the bytes send on all substreams opened through a `StreamMuxer`. This works, regardless of the underlying transport technology. It does omit certain layers. However, there isn't necessarily a "correct" layer to count bandwidth on because you can always go down another layer (IP, Ethernet, etc).

Closes #3157.
  • Loading branch information
melekes authored Dec 19, 2022
1 parent 160ddc5 commit 76abab9
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 104 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@

# 0.51.0 [unreleased]

- Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)).
- `BandwidthLogging::new` now requires a 2nd argument: `Arc<BandwidthSinks>`
- Remove `BandwidthFuture`
- Rename `BandwidthConnecLogging` to `InstrumentedStream`
- Remove `SimpleProtocol` due to being unused. See [`libp2p::core::upgrade`](https://docs.rs/libp2p/0.50.0/libp2p/core/upgrade/index.html) for alternatives. See [PR 3191].

- Update individual crates.
- Update to [`libp2p-dcutr` `v0.9.0`](protocols/dcutr/CHANGELOG.md#090).

Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ env_logger = "0.10.0"
clap = { version = "4.0.13", features = ["derive"] }
tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] }

libp2p-mplex = { version = "0.38.0", path = "muxers/mplex" }
libp2p-noise = { version = "0.41.0", path = "transports/noise" }
libp2p-tcp = { version = "0.38.0", path = "transports/tcp", features = ["tokio"] }

[workspace]
members = [
"core",
Expand Down
149 changes: 53 additions & 96 deletions src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
core::{
transport::{TransportError, TransportEvent},
Transport,
},
Multiaddr,
};
use crate::core::muxing::{StreamMuxer, StreamMuxerEvent};

use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
ready,
};
use libp2p_core::transport::ListenerId;
use std::{
convert::TryFrom as _,
io,
Expand All @@ -43,130 +36,94 @@ use std::{
task::{Context, Poll},
};

/// Wraps around a `Transport` and counts the number of bytes that go through all the opened
/// connections.
/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened
/// streams.
#[derive(Clone)]
#[pin_project::pin_project]
pub struct BandwidthLogging<TInner> {
pub(crate) struct BandwidthLogging<SMInner> {
#[pin]
inner: TInner,
inner: SMInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner> BandwidthLogging<TInner> {
/// Creates a new [`BandwidthLogging`] around the transport.
pub fn new(inner: TInner) -> (Self, Arc<BandwidthSinks>) {
let sink = Arc::new(BandwidthSinks {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
});

let trans = BandwidthLogging {
inner,
sinks: sink.clone(),
};

(trans, sink)
impl<SMInner> BandwidthLogging<SMInner> {
/// Creates a new [`BandwidthLogging`] around the stream muxer.
pub(crate) fn new(inner: SMInner, sinks: Arc<BandwidthSinks>) -> Self {
Self { inner, sinks }
}
}

impl<TInner> Transport for BandwidthLogging<TInner>
impl<SMInner> StreamMuxer for BandwidthLogging<SMInner>
where
TInner: Transport,
SMInner: StreamMuxer,
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
type ListenerUpgrade = BandwidthFuture<TInner::ListenerUpgrade>;
type Dial = BandwidthFuture<TInner::Dial>;
type Substream = InstrumentedStream<SMInner::Substream>;
type Error = SMInner::Error;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(event) => {
let event = event.map_upgrade({
let sinks = this.sinks.clone();
|inner| BandwidthFuture { inner, sinks }
});
Poll::Ready(event)
}
Poll::Pending => Poll::Pending,
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
this.inner.poll(cx)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
self.inner.remove_listener(id)
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial_as_listener(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.poll_inbound(cx)?);
let logged = InstrumentedStream {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}
}

/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
/// counter.
#[pin_project::pin_project]
pub struct BandwidthFuture<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.try_poll(cx)?);
let logged = BandwidthConnecLogging {
let inner = ready!(this.inner.poll_outbound(cx)?);
let logged = InstrumentedStream {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_close(cx)
}
}

/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`].
/// Allows obtaining the average bandwidth of the streams.
pub struct BandwidthSinks {
inbound: AtomicU64,
outbound: AtomicU64,
}

impl BandwidthSinks {
/// Returns the total number of bytes that have been downloaded on all the connections spawned
/// through the [`BandwidthLogging`].
/// Returns a new [`BandwidthSinks`].
pub(crate) fn new() -> Arc<Self> {
Arc::new(Self {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
})
}

/// Returns the total number of bytes that have been downloaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
pub fn total_inbound(&self) -> u64 {
self.inbound.load(Ordering::Relaxed)
}

/// Returns the total number of bytes that have been uploaded on all the connections spawned
/// through the [`BandwidthLogging`].
/// Returns the total number of bytes that have been uploaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
Expand All @@ -175,15 +132,15 @@ impl BandwidthSinks {
}
}

/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it.
/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it.
#[pin_project::pin_project]
pub struct BandwidthConnecLogging<TInner> {
pub(crate) struct InstrumentedStream<SMInner> {
#[pin]
inner: TInner,
inner: SMInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
impl<SMInner: AsyncRead> AsyncRead for InstrumentedStream<SMInner> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -213,7 +170,7 @@ impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
}
}

impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
impl<SMInner: AsyncWrite> AsyncWrite for InstrumentedStream<SMInner> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
66 changes: 59 additions & 7 deletions src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,74 @@

//! Provides the `TransportExt` trait.

use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport};
use crate::core::{
muxing::{StreamMuxer, StreamMuxerBox},
transport::Boxed,
PeerId,
};
use crate::{
bandwidth::{BandwidthLogging, BandwidthSinks},
Transport,
};
use std::sync::Arc;

/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
pub trait TransportExt: Transport {
/// Adds a layer on the `Transport` that logs all trafic that passes through the sockets
/// Adds a layer on the `Transport` that logs all trafic that passes through the streams
/// created by it.
///
/// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the total number
/// of bytes transferred through the sockets.
fn with_bandwidth_logging(self) -> (BandwidthLogging<Self>, Arc<BandwidthSinks>)
/// This method returns an `Arc<BandwidthSinks>` that can be used to retrieve the total number
/// of bytes transferred through the streams.
///
/// # Example
///
/// ```
/// use libp2p_mplex as mplex;
/// use libp2p_noise as noise;
/// use libp2p_tcp as tcp;
/// use libp2p::{
/// core::upgrade,
/// identity,
/// TransportExt,
/// Transport,
/// };
///
/// let id_keys = identity::Keypair::generate_ed25519();
///
/// let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
/// .upgrade(upgrade::Version::V1)
/// .authenticate(
/// noise::NoiseAuthenticated::xx(&id_keys)
/// .expect("Signing libp2p-noise static DH keypair failed."),
/// )
/// .multiplex(mplex::MplexConfig::new())
/// .boxed();
///
/// let (transport, sinks) = transport.with_bandwidth_logging();
/// ```
fn with_bandwidth_logging<S>(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>)
where
Self: Sized,
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
Self::Output: Into<(PeerId, S)>,
S: StreamMuxer + Send + 'static,
S::Substream: Send + 'static,
S::Error: Send + Sync + 'static,
{
BandwidthLogging::new(self)
let sinks = BandwidthSinks::new();
let sinks_copy = sinks.clone();
let transport = Transport::map(self, |output, _| {
let (peer_id, stream_muxer_box) = output.into();
(
peer_id,
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
)
})
.boxed();
(transport, sinks)
}
}

Expand Down

0 comments on commit 76abab9

Please sign in to comment.