Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Draft: Block relay POC (proposal 2) #46

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
block_relay: None,
})?;

if config.offchain_worker.enabled {
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ pub fn new_full_base(
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
block_relay: None,
})?;

if config.offchain_worker.enabled {
Expand Down
56 changes: 56 additions & 0 deletions client/network/sync/src/block_relay_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Block relay protocol related definitions.

use crate::service::network::NetworkServiceHandle;
use futures::channel::oneshot;
use libp2p::PeerId;
use sc_network_common::request_responses::{ProtocolConfig, RequestFailure};
use sp_runtime::traits::{Block as BlockT};
use std::sync::Arc;

/// The serving side of the block relay protocol. It runs a single instance
/// of the server task that processes the incoming protocol messages.
#[async_trait::async_trait]
pub trait BlockServer<Block: BlockT>: Send {
/// Starts the protocol processing.
async fn run(&mut self);
}

/// The client side stub to download blocks from peers. This is a handle that can
/// be used to initiate concurrent downloads.
#[async_trait::async_trait]
pub trait BlockDownloader: Send + Sync {
/// Performs the protocol specific sequence to fetch the block from the peer.
/// Input: request is serialized schema::v1::BlockRequest.
/// Output: if the protocol succeeds, serialized schema::v1::BlockResponse is returned.
async fn download_block(
&self,
who: PeerId,
request: Vec<u8>,
network: NetworkServiceHandle,
) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>;
}

/// Block relay specific params for network creation.
pub struct BlockRelayParams<Block: BlockT> {
pub server: Box<dyn BlockServer<Block>>,
pub downloader: Arc<dyn BlockDownloader>,
pub request_response_config: ProtocolConfig,
}


68 changes: 63 additions & 5 deletions client/network/sync/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
//! Helper for handling (i.e. answering) block requests from a remote peer via the
//! `crate::request_responses::RequestResponsesBehaviour`.

use crate::block_relay_protocol::{BlockServer, BlockDownloader, BlockRelayParams};
use crate::service::network::NetworkServiceHandle;
use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction};
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use libp2p::PeerId;
use log::debug;
use log::{debug, info};
use lru::LruCache;
use prost::Message;
use sc_client_api::BlockBackend;
use sc_network_common::{
config::ProtocolId,
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
protocol::ProtocolName,
request_responses::{
IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure
},
sync::message::BlockAttributes,
};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -149,7 +154,7 @@ where
fork_id: Option<&str>,
client: Arc<Client>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
) -> BlockRelayParams<B> {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
Expand All @@ -169,11 +174,20 @@ where
NonZeroUsize::new(num_peer_hint.max(1) * 2).expect("cache capacity is not zero");
let seen_requests = LruCache::new(capacity);

(Self { client, request_receiver, seen_requests }, protocol_config)
BlockRelayParams {
server: Box::new(Self { client, request_receiver, seen_requests }),
downloader: Arc::new(FullBlockDownloader::new(protocol_config.name.clone())),
request_response_config: protocol_config,

}
}

/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
async fn process_requests(&mut self) {
info!(
target: LOG_TARGET,
"BlockRequestHandler::process_requests(): started"
);
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;

Expand Down Expand Up @@ -447,6 +461,17 @@ where
}
}

#[async_trait::async_trait]
impl<B, Client> BlockServer<B> for BlockRequestHandler<B, Client>
where
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
{
async fn run(&mut self) {
self.process_requests().await;
}
}

#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
#[error("Failed to decode request: {0}.")]
Expand All @@ -464,3 +489,36 @@ enum HandleRequestError {
#[error("Failed to send response.")]
SendResponse,
}

pub struct FullBlockDownloader {
protocol_name: ProtocolName,
}

impl FullBlockDownloader {
fn new(protocol_name: ProtocolName) -> Self {
Self {
protocol_name
}
}
}

/// The full block download implementation.
#[async_trait::async_trait]
impl BlockDownloader for FullBlockDownloader {
async fn download_block(
&self,
who: PeerId,
request: Vec<u8>,
network: NetworkServiceHandle,
) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
network.start_request(
who,
self.protocol_name.clone(),
request,
tx,
IfDisconnected::ImmediateError,
);
rx.await
}
}
35 changes: 17 additions & 18 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it.

pub mod block_relay_protocol;
pub mod block_request_handler;
pub mod blocks;
pub mod mock;
Expand All @@ -42,6 +43,7 @@ pub mod warp_request_handler;

use crate::{
blocks::BlockCollection,
block_relay_protocol::BlockDownloader,
schema::v1::{StateRequest, StateResponse},
service::chain_sync::{ChainSyncInterfaceHandle, ToServiceCommand},
state::StateSync,
Expand Down Expand Up @@ -333,8 +335,8 @@ pub struct ChainSync<B: BlockT, Client> {
network_service: service::network::NetworkServiceHandle,
/// Protocol name used for block announcements
block_announce_protocol_name: ProtocolName,
/// Protocol name used to send out block requests
block_request_protocol_name: ProtocolName,
/// Block downloader stub
block_downloader: Arc<dyn BlockDownloader>,
/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,
/// Protocol name used to send out warp sync requests
Expand Down Expand Up @@ -1402,23 +1404,20 @@ where
}

fn send_block_request(&mut self, who: PeerId, request: BlockRequest<B>) {
let (tx, rx) = oneshot::channel();
let opaque_req = self.create_opaque_block_request(&request);

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
}

match self.encode_block_request(&opaque_req) {
Ok(data) => {
self.network_service.start_request(
who,
self.block_request_protocol_name.clone(),
data,
tx,
IfDisconnected::ImmediateError,
);
// Send the request even if the peer is not known. This would cause
// a failure to be returned. This simulates the existing behavior.
let network = self.network_service.clone();
let downloader = self.block_downloader.clone();
self.pending_responses.push(Box::pin(async move {
(
who,
PeerRequest::Block(request),
downloader.download_block(who, data, network).await
)
}));
},
Err(err) => {
log::warn!(
Expand Down Expand Up @@ -1456,7 +1455,7 @@ where
metrics_registry: Option<&Registry>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_request_protocol_name: ProtocolName,
block_downloader: Arc<dyn BlockDownloader>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
force_synced: bool,
Expand Down Expand Up @@ -1498,7 +1497,7 @@ where
gap_sync: None,
service_rx,
network_service,
block_request_protocol_name,
block_downloader,
state_request_protocol_name,
warp_sync_params,
warp_sync_protocol_name,
Expand Down
14 changes: 6 additions & 8 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,12 +847,10 @@ where

let fork_id = Some(String::from("test-fork-id"));

let block_request_protocol_config = {
let (handler, protocol_config) =
BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
self.spawn_task(handler.run().boxed());
protocol_config
};
let mut block_relay_params = BlockRequestHandler::new(&protocol_id, None, client.clone(), 50);
self.spawn_task(Box::pin(async move {
block_relay_params.server.run().await;
}));

let state_request_protocol_config = {
let (handler, protocol_config) =
Expand Down Expand Up @@ -920,7 +918,7 @@ where
None,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
block_relay_params.downloader,
state_request_protocol_config.name.clone(),
Some(warp_protocol_config.name.clone()),
network_config.force_synced,
Expand All @@ -941,7 +939,7 @@ where
metrics_registry: None,
block_announce_config,
request_response_protocol_configs: [
block_request_protocol_config,
block_relay_params.request_response_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_protocol_config,
Expand Down
34 changes: 22 additions & 12 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use sc_network_common::{
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_relay_protocol::BlockRelayParams,
block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider,
state_request_handler::StateRequestHandler,
warp_request_handler::RequestHandler as WarpSyncRequestHandler, ChainSync,
Expand Down Expand Up @@ -761,6 +762,9 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>,
/// Optional warp sync params.
pub warp_sync_params: Option<WarpSyncParams<TBl>>,
/// User specified block relay params. If not specified, the default
/// block request handler will be used.
pub block_relay: Option<BlockRelayParams<TBl>>,
}
/// Build the network service, the network status sinks and an RPC sender.
pub fn build_network<TBl, TExPool, TImpQu, TCl>(
Expand Down Expand Up @@ -796,6 +800,7 @@ where
import_queue,
block_announce_validator_builder,
warp_sync_params,
block_relay,
} = params;

let mut request_response_protocol_configs = Vec::new();
Expand All @@ -820,18 +825,23 @@ where
Box::new(DefaultBlockAnnounceValidator)
};

let block_request_protocol_config = {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay {
Some(params) => (params.server, params.downloader, params.request_response_config),
None => {
// Custom protocol was not specified, use the default block handler.
let params = BlockRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
(params.server, params.downloader, params.request_response_config)
}
};
spawn_handle.spawn("block-request-handler", Some("networking"), async move {
block_server.run().await;
});

let state_request_protocol_config = {
// Allow both outgoing and incoming requests.
Expand Down Expand Up @@ -893,7 +903,7 @@ where
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
block_downloader,
state_request_protocol_config.name.clone(),
warp_sync_protocol_config.as_ref().map(|config| config.name.clone()),
config.network.force_synced,
Expand Down