Skip to content

Commit

Permalink
feat: track total size of buffered blocks (#2838)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed May 25, 2023
1 parent 6d0d210 commit ba4776a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
10 changes: 10 additions & 0 deletions crates/interfaces/src/p2p/bodies/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ impl BlockResponse {
}
}

/// Returns the total number of bytes of all transactions input data in the block
pub fn size(&self) -> usize {
match self {
BlockResponse::Full(block) => {
block.body.iter().map(|tx| tx.transaction.input().len()).sum()
}
BlockResponse::Empty(_) => 0,
}
}

/// Return the block number
pub fn block_number(&self) -> BlockNumber {
self.header().number
Expand Down
33 changes: 27 additions & 6 deletions crates/net/downloaders/src/bodies/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ where
self.metrics.in_flight_requests.set(0.);
self.metrics.buffered_responses.set(0.);
self.metrics.buffered_blocks.set(0.);
self.metrics.buffered_blocks_size_bytes.set(0.);
self.metrics.queued_blocks.set(0.);
}

Expand All @@ -221,18 +222,21 @@ where
fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse> {
let resp = self.buffered_responses.pop()?;
self.metrics.buffered_responses.decrement(1.);
self.num_buffered_blocks -= resp.0.len();
self.num_buffered_blocks -= resp.len();
self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64);
self.metrics.buffered_blocks_size_bytes.decrement(resp.size() as f64);
Some(resp)
}

/// Adds a new response to the internal buffer
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse>) {
self.num_buffered_blocks += response.len();
self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64);
let response = OrderedBodiesResponse(response);
let size = response.iter().map(|b| b.size()).sum::<usize>();
let response = OrderedBodiesResponse { resp: response, size };
self.buffered_responses.push(response);
self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
self.metrics.buffered_blocks_size_bytes.increment(size as f64);
}

/// Returns a response if it's first block number matches the next expected.
Expand All @@ -244,7 +248,7 @@ where
if next_block_range.contains(&expected) {
return self.pop_buffered_response().map(|buffered| {
buffered
.0
.resp
.into_iter()
.skip_while(|b| b.block_number() < expected)
.take_while(|b| self.download_range.contains(&b.block_number()))
Expand Down Expand Up @@ -425,23 +429,40 @@ where
}

#[derive(Debug)]
struct OrderedBodiesResponse(Vec<BlockResponse>);
struct OrderedBodiesResponse {
resp: Vec<BlockResponse>,
/// The total size of the response in bytes
size: usize,
}

impl OrderedBodiesResponse {
/// Returns the block number of the first element
///
/// # Panics
/// If the response vec is empty.
fn first_block_number(&self) -> u64 {
self.0.first().expect("is not empty").block_number()
self.resp.first().expect("is not empty").block_number()
}

/// Returns the range of the block numbers in the response
///
/// # Panics
/// If the response vec is empty.
fn block_range(&self) -> RangeInclusive<u64> {
self.first_block_number()..=self.0.last().expect("is not empty").block_number()
self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
}

#[inline]
fn len(&self) -> usize {
self.resp.len()
}

/// Returns the size of the response in bytes
///
/// See [BlockResponse::size]
#[inline]
fn size(&self) -> usize {
self.size
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/net/downloaders/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct DownloaderMetrics {
/// These are bodies that have been received, but not cannot be committed yet because they're
/// not contiguous
pub buffered_blocks: Gauge,
/// Total amount of memory used by the buffered blocks in bytes
pub buffered_blocks_size_bytes: Gauge,
/// The number blocks that are contiguous and are queued for insertion into the db.
pub queued_blocks: Gauge,
/// The number of out-of-order requests sent by the downloader.
Expand Down

0 comments on commit ba4776a

Please sign in to comment.