Skip to content

Commit

Permalink
feat: use MeteredSender for ActiveSession Sender Half (paradigmxyz#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
leruaa committed Feb 3, 2023
1 parent a55e7fd commit 13eea35
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions crates/metrics/common/src/metered_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::mpsc::{
};

/// Network throughput metrics
#[derive(Metrics)]
#[derive(Clone, Metrics)]
#[metrics(dynamic = true)]
struct MeteredSenderMetrics {
/// Number of messages sent
Expand All @@ -19,6 +19,7 @@ struct MeteredSenderMetrics {
}

/// Manages updating the network throughput metrics for a metered stream
#[derive(Debug)]
pub struct MeteredSender<T> {
/// The [`Sender`] that this wraps around
sender: Sender<T>,
Expand All @@ -34,7 +35,7 @@ impl<T> MeteredSender<T> {

/// Calls the underlying [`Sender`]'s `try_send`, incrementing the appropriate
/// metrics depending on the result.
pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
match self.sender.try_send(message) {
Ok(()) => {
self.metrics.messages_sent.increment(1);
Expand Down Expand Up @@ -62,3 +63,9 @@ impl<T> MeteredSender<T> {
}
}
}

impl<T> Clone for MeteredSender<T> {
fn clone(&self) -> Self {
Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
}
}
1 change: 1 addition & 0 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ reth-rlp-derive = { path = "../../rlp/rlp-derive" }
reth-tasks = { path = "../../tasks" }
reth-transaction-pool = { path = "../../transaction-pool" }
reth-provider = { path = "../../storage/provider"}
reth-metrics-common = { path = "../../metrics/common" }

# async/futures
futures = "0.3"
Expand Down
8 changes: 6 additions & 2 deletions crates/net/network/src/session/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_eth_wire::{
DisconnectReason, EthMessage, EthStream, P2PStream,
};
use reth_interfaces::p2p::error::RequestError;
use reth_metrics_common::metered_sender::MeteredSender;
use reth_net_common::bandwidth_meter::MeteredStream;
use reth_primitives::PeerId;
use std::{
Expand Down Expand Up @@ -74,7 +75,7 @@ pub(crate) struct ActiveSession {
/// Incoming commands from the manager
pub(crate) commands_rx: ReceiverStream<SessionCommand>,
/// Sink to send messages to the [`SessionManager`](super::SessionManager).
pub(crate) to_session: mpsc::Sender<ActiveSessionMessage>,
pub(crate) to_session: MeteredSender<ActiveSessionMessage>,
/// Incoming request to send to delegate to the remote peer.
pub(crate) request_tx: Fuse<ReceiverStream<PeerRequest>>,
/// All requests sent to the remote peer we're waiting on a response
Expand Down Expand Up @@ -744,7 +745,10 @@ mod tests {
remote_capabilities: Arc::clone(&capabilities),
session_id,
commands_rx: ReceiverStream::new(commands_rx),
to_session: self.active_session_tx.clone(),
to_session: MeteredSender::new(
self.active_session_tx.clone(),
"network_active_session",
),
request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
Expand Down
5 changes: 3 additions & 2 deletions crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_eth_wire::{
errors::EthStreamError,
DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream,
};
use reth_metrics_common::metered_sender::MeteredSender;
use reth_net_common::bandwidth_meter::{BandwidthMeter, MeteredStream};
use reth_primitives::{ForkFilter, ForkId, ForkTransition, PeerId, H256, U256};
use reth_tasks::TaskExecutor;
Expand Down Expand Up @@ -91,7 +92,7 @@ pub(crate) struct SessionManager {
///
/// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
/// clone of this sender half.
active_session_tx: mpsc::Sender<ActiveSessionMessage>,
active_session_tx: MeteredSender<ActiveSessionMessage>,
/// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
active_session_rx: ReceiverStream<ActiveSessionMessage>,
/// Used to measure inbound & outbound bandwidth across all managed streams
Expand Down Expand Up @@ -129,7 +130,7 @@ impl SessionManager {
active_sessions: Default::default(),
pending_sessions_tx,
pending_session_rx: ReceiverStream::new(pending_sessions_rx),
active_session_tx,
active_session_tx: MeteredSender::new(active_session_tx, "network_active_session"),
active_session_rx: ReceiverStream::new(active_session_rx),
bandwidth_meter,
}
Expand Down

0 comments on commit 13eea35

Please sign in to comment.