diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 9d6092ef84e5..f028080d10e5 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -101,6 +101,12 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100); /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { + /// Do not request data from the availability store. + /// This is the useful for nodes where the + /// availability-store subsystem is not expected to run, + /// such as collators. + bypass_availability_store: bool, + fast_path: bool, /// Receiver for available data requests. req_receiver: IncomingRequestReceiver, @@ -147,6 +153,9 @@ struct RecoveryParams { /// Metrics to report metrics: Metrics, + + /// Do not request data from availability-store + bypass_availability_store: bool, } /// Source the availability data either by means @@ -467,7 +476,7 @@ impl RequestChunksFromValidators { let metrics = ¶ms.metrics; // First query the store for any chunks we've got. - { + if !params.bypass_availability_store { let (tx, rx) = oneshot::channel(); sender .send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx)) @@ -668,7 +677,7 @@ where { async fn run(mut self) -> Result { // First just see if we have the data available locally. - { + if !self.params.bypass_availability_store { let (tx, rx) = oneshot::channel(); self.sender .send_message(AvailabilityStoreMessage::QueryAvailableData( @@ -856,6 +865,7 @@ async fn launch_recovery_task( receipt: CandidateReceipt, backing_group: Option, response_sender: oneshot::Sender>, + bypass_availability_store: bool, metrics: &Metrics, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -867,6 +877,7 @@ async fn launch_recovery_task( candidate_hash, erasure_root: receipt.descriptor.erasure_root, metrics: metrics.clone(), + bypass_availability_store, }; let phase = backing_group @@ -906,6 +917,7 @@ async fn handle_recover( session_index: SessionIndex, backing_group: Option, response_sender: oneshot::Sender>, + bypass_availability_store: bool, metrics: &Metrics, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -949,6 +961,7 @@ async fn handle_recover( receipt, backing_group, response_sender, + bypass_availability_store, metrics, ) .await, @@ -977,13 +990,22 @@ async fn query_full_data( #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] impl AvailabilityRecoverySubsystem { + /// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the + /// `AvailabilityStoreSubsystem` subsystem. + pub fn with_availability_store_skip( + req_receiver: IncomingRequestReceiver, + metrics: Metrics, + ) -> Self { + Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics } + } + /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to /// request data from backers. pub fn with_fast_path( req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { fast_path: true, req_receiver, metrics } + Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics } } /// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks @@ -991,12 +1013,12 @@ impl AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { fast_path: false, req_receiver, metrics } + Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics } } async fn run(self, mut ctx: Context) -> SubsystemResult<()> { let mut state = State::default(); - let Self { fast_path, mut req_receiver, metrics } = self; + let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self; loop { let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); @@ -1025,6 +1047,7 @@ impl AvailabilityRecoverySubsystem { session_index, maybe_backing_group.filter(|_| fast_path), response_sender, + bypass_availability_store, &metrics, ).await { gum::warn!( @@ -1041,6 +1064,14 @@ impl AvailabilityRecoverySubsystem { in_req = recv_req => { match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? { Ok(req) => { + if bypass_availability_store { + gum::debug!( + target: LOG_TARGET, + "Skipping request to availability-store.", + ); + let _ = req.send_response(None.into()); + continue + } match query_full_data(&mut ctx, req.payload.candidate_hash).await { Ok(res) => { let _ = req.send_response(res.into());