Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename downloaders #1108

Merged
merged 5 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Command {
) -> reth_downloaders::headers::task::TaskDownloader {
let headers_conf = &config.stages.headers;
headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
headers::reverse_headers::ReverseHeadersDownloaderBuilder::default()
.request_limit(headers_conf.downloader_batch_size)
.stream_batch_size(headers_conf.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
Expand All @@ -253,7 +253,7 @@ impl Command {
) -> reth_downloaders::bodies::task::TaskDownloader {
let bodies_conf = &config.stages.bodies;
bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
bodies::bodies::BodiesDownloaderBuilder::default()
.with_stream_batch_size(bodies_conf.downloader_stream_batch_size)
.with_request_limit(bodies_conf.downloader_request_limit)
.with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses)
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
NetworkOpts,
};
use reth_consensus::beacon::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;

use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Command {
let fetch_client = Arc::new(network.fetch_client().await?);

let mut stage = BodyStage {
downloader: ConcurrentDownloaderBuilder::default()
downloader: BodiesDownloaderBuilder::default()
.with_stream_batch_size(num_blocks as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies";
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ConcurrentDownloader<B: BodiesClient, DB> {
pub struct BodiesDownloader<B: BodiesClient, DB> {
/// The bodies client
client: Arc<B>,
/// The consensus client
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct ConcurrentDownloader<B: BodiesClient, DB> {
metrics: DownloaderMetrics,
}

impl<B, DB> ConcurrentDownloader<B, DB>
impl<B, DB> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -241,7 +241,7 @@ where
}
}

impl<B, DB> BodyDownloader for ConcurrentDownloader<B, DB>
impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -331,7 +331,7 @@ where
}
}

impl<B, DB> Stream for ConcurrentDownloader<B, DB>
impl<B, DB> Stream for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -452,8 +452,8 @@ impl Ord for OrderedBodiesResponse {
}
}

/// Builder for [ConcurrentDownloader].
pub struct ConcurrentDownloaderBuilder {
/// Builder for [BodiesDownloader].
pub struct BodiesDownloaderBuilder {
/// The batch size of non-empty blocks per one request
request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
Expand All @@ -464,7 +464,7 @@ pub struct ConcurrentDownloaderBuilder {
concurrent_requests_range: RangeInclusive<usize>,
}

impl Default for ConcurrentDownloaderBuilder {
impl Default for BodiesDownloaderBuilder {
fn default() -> Self {
Self {
request_limit: 200,
Expand All @@ -475,7 +475,7 @@ impl Default for ConcurrentDownloaderBuilder {
}
}

impl ConcurrentDownloaderBuilder {
impl BodiesDownloaderBuilder {
/// Set request batch size on the downloader.
pub fn with_request_limit(mut self, request_limit: u64) -> Self {
self.request_limit = request_limit;
Expand Down Expand Up @@ -509,7 +509,7 @@ impl ConcurrentDownloaderBuilder {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
db: Arc<DB>,
) -> ConcurrentDownloader<B, DB>
) -> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand All @@ -522,7 +522,7 @@ impl ConcurrentDownloaderBuilder {
} = self;
let metrics = DownloaderMetrics::new(BODIES_DOWNLOADER_SCOPE);
let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
ConcurrentDownloader {
BodiesDownloader {
client,
consensus,
db,
Expand Down Expand Up @@ -566,7 +566,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let mut downloader = ConcurrentDownloaderBuilder::default().build(
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let mut downloader = ConcurrentDownloaderBuilder::default()
let mut downloader = BodiesDownloaderBuilder::default()
.with_stream_batch_size(stream_batch_size)
.with_request_limit(request_limit)
.build(client.clone(), Arc::new(TestConsensus::default()), db);
Expand Down Expand Up @@ -631,9 +631,11 @@ mod tests {
insert_headers(&db, &headers);

let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let mut downloader = ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(100)
.build(client.clone(), Arc::new(TestConsensus::default()), db);
let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
);

// Set and download the first range
downloader.set_download_range(0..100).expect("failed to set download range");
Expand Down
2 changes: 1 addition & 1 deletion crates/net/downloaders/src/bodies/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A naive concurrent downloader.
pub mod concurrent;
pub mod bodies;

/// TODO:
pub mod task;
Expand Down
10 changes: 5 additions & 5 deletions crates/net/downloaders/src/bodies/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ impl TaskDownloader {
///
/// ```
/// use std::sync::Arc;
/// use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
/// use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
/// use reth_downloaders::bodies::task::TaskDownloader;
/// use reth_interfaces::consensus::Consensus;
/// use reth_interfaces::p2p::bodies::client::BodiesClient;
/// use reth_db::database::Database;
/// fn t<B: BodiesClient + 'static, DB: Database + 'static>(client: Arc<B>, consensus:Arc<dyn Consensus>, db: Arc<DB>) {
/// let downloader = ConcurrentDownloaderBuilder::default().build(
/// let downloader = BodiesDownloaderBuilder::default().build(
/// client,
/// consensus,
/// db
Expand Down Expand Up @@ -133,7 +133,7 @@ mod tests {
use super::*;
use crate::{
bodies::{
concurrent::ConcurrentDownloaderBuilder,
bodies::BodiesDownloaderBuilder,
test_utils::{insert_headers, zip_blocks},
},
test_utils::{generate_bodies, TestBodiesClient},
Expand All @@ -155,7 +155,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let downloader = ConcurrentDownloaderBuilder::default().build(
let downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let downloader = ConcurrentDownloaderBuilder::default().build(
let downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down
2 changes: 1 addition & 1 deletion crates/net/downloaders/src/headers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A Linear downloader implementation.
pub mod linear;
pub mod reverse_headers;

/// A downloader implementation that spawns a downloader to a task
pub mod task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers";
/// the batches of headers that this downloader yields will start at the chain tip and move towards
/// the local head: falling block numbers.
#[must_use = "Stream does nothing unless polled"]
pub struct LinearDownloader<H: HeadersClient> {
pub struct ReverseHeadersDownloader<H: HeadersClient> {
/// Consensus client used to validate headers
consensus: Arc<dyn Consensus>,
/// Client used to download headers.
Expand Down Expand Up @@ -86,15 +86,15 @@ pub struct LinearDownloader<H: HeadersClient> {
metrics: DownloaderMetrics,
}

// === impl LinearDownloader ===
// === impl ReverseHeadersDownloader ===

impl<H> LinearDownloader<H>
impl<H> ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
/// Convenience method to create a [LinearDownloadBuilder] without importing it
pub fn builder() -> LinearDownloadBuilder {
LinearDownloadBuilder::default()
/// Convenience method to create a [ReverseHeadersDownloaderBuilder] without importing it
pub fn builder() -> ReverseHeadersDownloaderBuilder {
ReverseHeadersDownloaderBuilder::default()
}

/// Returns the block number the local node is at.
Expand Down Expand Up @@ -501,7 +501,7 @@ where
}
}

impl<H> HeaderDownloader for LinearDownloader<H>
impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand Down Expand Up @@ -577,7 +577,7 @@ where
}
}

impl<H> Stream for LinearDownloader<H>
impl<H> Stream for ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand Down Expand Up @@ -793,10 +793,10 @@ impl SyncTargetBlock {
}
}

/// The builder for [LinearDownloader] with
/// The builder for [ReverseHeadersDownloader] with
/// some default settings
#[derive(Debug)]
pub struct LinearDownloadBuilder {
pub struct ReverseHeadersDownloaderBuilder {
/// The batch size per one request
request_limit: u64,
/// Batch size for headers
Expand All @@ -809,7 +809,7 @@ pub struct LinearDownloadBuilder {
max_buffered_responses: usize,
}

impl Default for LinearDownloadBuilder {
impl Default for ReverseHeadersDownloaderBuilder {
fn default() -> Self {
Self {
request_limit: 1_000,
Expand All @@ -821,7 +821,7 @@ impl Default for LinearDownloadBuilder {
}
}

impl LinearDownloadBuilder {
impl ReverseHeadersDownloaderBuilder {
/// Set the request batch size.
///
/// This determines the `limit` for a [GetHeaders](reth_eth_wire::GetBlockHeaders) requests, the
Expand All @@ -833,17 +833,18 @@ impl LinearDownloadBuilder {

/// Set the stream batch size
///
/// This determines the number of headers the [LinearDownloader] will yield on `Stream::next`.
/// This will be the amount of headers the headers stage will commit at a time.
/// This determines the number of headers the [ReverseHeadersDownloader] will yield on
/// `Stream::next`. This will be the amount of headers the headers stage will commit at a
/// time.
pub fn stream_batch_size(mut self, size: usize) -> Self {
self.stream_batch_size = size;
self
}

/// Set the min amount of concurrent requests.
///
/// If there's capacity the [LinearDownloader] will keep at least this many requests active at a
/// time.
/// If there's capacity the [ReverseHeadersDownloader] will keep at least this many requests
/// active at a time.
pub fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
self.min_concurrent_requests = min_concurrent_requests;
self
Expand All @@ -861,16 +862,20 @@ impl LinearDownloadBuilder {
///
/// This essentially determines how much memory the downloader can use for buffering responses
/// that arrive out of order. The total number of buffered headers is `request_limit *
/// max_buffered_responses`. If the [LinearDownloader]'s buffered responses exceeds this
/// max_buffered_responses`. If the [ReverseHeadersDownloader]'s buffered responses exceeds this
/// threshold it waits until there's capacity again before sending new requests.
pub fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
self.max_buffered_responses = max_buffered_responses;
self
}

/// Build [LinearDownloader] with provided consensus
/// Build [ReverseHeadersDownloader] with provided consensus
/// and header client implementations
pub fn build<H>(self, consensus: Arc<dyn Consensus>, client: Arc<H>) -> LinearDownloader<H>
pub fn build<H>(
self,
consensus: Arc<dyn Consensus>,
client: Arc<H>,
) -> ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand All @@ -881,7 +886,7 @@ impl LinearDownloadBuilder {
max_concurrent_requests,
max_buffered_responses,
} = self;
LinearDownloader {
ReverseHeadersDownloader {
consensus,
client,
local_head: None,
Expand Down Expand Up @@ -940,7 +945,7 @@ mod tests {

let genesis = SealedHeader::default();

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));
Expand Down Expand Up @@ -968,7 +973,7 @@ mod tests {

let header = SealedHeader::default();

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(header.clone());
downloader.update_sync_target(SyncTarget::Tip(H256::random()));
Expand Down Expand Up @@ -1008,7 +1013,7 @@ mod tests {

let batch_size = 99;
let start = 1000;
let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.request_limit(batch_size)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
Expand Down Expand Up @@ -1057,7 +1062,7 @@ mod tests {
let p1 = child_header(&p2);
let p0 = child_header(&p1);

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
.request_limit(3)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Expand Down Expand Up @@ -1089,7 +1094,7 @@ mod tests {
let p0 = child_header(&p1);

let client = Arc::new(TestHeadersClient::default());
let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(1)
.request_limit(1)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Expand Down
Loading