Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add option to skip av-store requests in availability-recovery-subsystem #7131

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// Do not request data from the availability store.
/// This is the useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,

fast_path: bool,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
Expand Down Expand Up @@ -147,6 +153,9 @@ struct RecoveryParams {

/// Metrics to report
metrics: Metrics,

/// Do not request data from availability-store
bypass_availability_store: bool,
}

/// Source the availability data either by means
Expand Down Expand Up @@ -384,6 +393,7 @@ impl RequestChunksFromValidators {
metrics.on_chunk_request_succeeded();
gum::trace!(
target: LOG_TARGET,

skunert marked this conversation as resolved.
Show resolved Hide resolved
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Received valid chunk",
Expand Down Expand Up @@ -467,7 +477,7 @@ impl RequestChunksFromValidators {
let metrics = &params.metrics;

// First query the store for any chunks we've got.
{
if !params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
Expand Down Expand Up @@ -668,7 +678,7 @@ where
{
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
{
if !self.params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
self.sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(
Expand Down Expand Up @@ -856,6 +866,7 @@ async fn launch_recovery_task<Context>(
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
Expand All @@ -867,6 +878,7 @@ async fn launch_recovery_task<Context>(
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store,
};

let phase = backing_group
Expand Down Expand Up @@ -906,6 +918,7 @@ async fn handle_recover<Context>(
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
Expand Down Expand Up @@ -949,6 +962,7 @@ async fn handle_recover<Context>(
receipt,
backing_group,
response_sender,
bypass_availability_store,
metrics,
)
.await,
Expand Down Expand Up @@ -977,26 +991,35 @@ async fn query_full_data<Context>(

#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
/// `AvailabilityStoreSubsystem` subsystem.
pub fn with_availability_store_skip(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
/// request data from backers.
pub fn with_fast_path(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, req_receiver, metrics }
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, req_receiver, metrics }
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { fast_path, mut req_receiver, metrics } = self;
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
Expand Down Expand Up @@ -1025,6 +1048,7 @@ impl AvailabilityRecoverySubsystem {
session_index,
maybe_backing_group.filter(|_| fast_path),
response_sender,
bypass_availability_store,
&metrics,
).await {
gum::warn!(
Expand All @@ -1041,6 +1065,14 @@ impl AvailabilityRecoverySubsystem {
in_req = recv_req => {
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
Ok(req) => {
if bypass_availability_store {
gum::debug!(
target: LOG_TARGET,
"Skipping request to availability-store.",
);
let _ = req.send_response(None.into());
continue
}
match query_full_data(&mut ctx, req.payload.candidate_hash).await {
Ok(res) => {
let _ = req.send_response(res.into());
Expand Down