Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Revert loop prevention (#4472)
Browse files Browse the repository at this point in the history
* Provisioner: Only include and sign bitfields on fresh leaves.
  • Loading branch information
eskimor authored Dec 13, 2021
1 parent c82f4a7 commit 853eaa6
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 47 deletions.
8 changes: 4 additions & 4 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use polkadot_subsystem::{
DisputeCoordinatorMessage, ImportStatementsResult, ProvisionableData, ProvisionerMessage,
RuntimeApiRequest, StatementDistributionMessage, ValidationFailed,
},
overseer, PerLeafSpan, Stage, SubsystemSender,
overseer, ActivatedLeaf, PerLeafSpan, Stage, SubsystemSender,
};
use sp_keystore::SyncCryptoStorePtr;
use statement_table::{
Expand Down Expand Up @@ -1180,13 +1180,13 @@ impl util::JobTrait for CandidateBackingJob {
const NAME: &'static str = "candidate-backing-job";

fn run<S: SubsystemSender>(
parent: Hash,
span: Arc<jaeger::Span>,
leaf: ActivatedLeaf,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let parent = leaf.hash;
async move {
macro_rules! try_runtime_api {
($x: expr) => {
Expand All @@ -1208,7 +1208,7 @@ impl util::JobTrait for CandidateBackingJob {
}
}

let span = PerLeafSpan::new(span, "backing");
let span = PerLeafSpan::new(leaf.span, "backing");
let _span = span.child("runtime-apis");

let (validators, groups, session_index, cores) = futures::try_join!(
Expand Down
26 changes: 17 additions & 9 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use polkadot_node_subsystem::{
AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
PerLeafSpan, SubsystemSender,
ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::{
self as util,
Expand All @@ -43,7 +43,7 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use std::{iter::FromIterator, pin::Pin, sync::Arc, time::Duration};
use std::{iter::FromIterator, pin::Pin, time::Duration};
use wasm_timer::{Delay, Instant};

#[cfg(test)]
Expand Down Expand Up @@ -237,23 +237,31 @@ impl JobTrait for BitfieldSigningJob {

/// Run a job for the parent block indicated
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
leaf: ActivatedLeaf,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
let span = PerLeafSpan::new(span, "bitfield-signing");
if let LeafStatus::Stale = leaf.status {
tracing::debug!(
target: LOG_TARGET,
hash = ?leaf.hash,
block_number = ?leaf.number,
"Stale leaf - don't sign bitfields."
);
return Ok(())
}

let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await
{
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
Expand All @@ -270,7 +278,7 @@ impl JobTrait for BitfieldSigningJob {
let span_availability = span.child("availability");

let bitfield = match construct_availability_bitfield(
relay_parent,
leaf.hash,
&span_availability,
validator.index(),
sender.subsystem_sender(),
Expand Down Expand Up @@ -311,7 +319,7 @@ impl JobTrait for BitfieldSigningJob {

sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(
relay_parent,
leaf.hash,
signed_bitfield,
))
.await;
Expand Down
32 changes: 18 additions & 14 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use polkadot_node_subsystem::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
},
PerLeafSpan, SubsystemSender,
ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::{
self as util, request_availability_cores, request_persisted_validation_data, JobSender,
Expand All @@ -43,7 +43,7 @@ use polkadot_primitives::v1::{
DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
use std::{collections::BTreeMap, pin::Pin};
use thiserror::Error;

mod metrics;
Expand Down Expand Up @@ -92,7 +92,7 @@ impl InherentAfter {

/// A per-relay-parent job for the provisioning subsystem.
pub struct ProvisioningJob {
relay_parent: Hash,
leaf: ActivatedLeaf,
receiver: mpsc::Receiver<ProvisionerMessage>,
backed_candidates: Vec<CandidateReceipt>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
Expand Down Expand Up @@ -156,15 +156,15 @@ impl JobTrait for ProvisioningJob {
//
// this function is in charge of creating and executing the job's main loop
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
leaf: ActivatedLeaf,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = ProvisioningJob::new(relay_parent, metrics, receiver);
let span = leaf.span.clone();
let job = ProvisioningJob::new(leaf, metrics, receiver);

job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner"))
.await
Expand All @@ -175,12 +175,12 @@ impl JobTrait for ProvisioningJob {

impl ProvisioningJob {
fn new(
relay_parent: Hash,
leaf: ActivatedLeaf,
metrics: Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
) -> Self {
Self {
relay_parent,
leaf,
receiver,
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
Expand Down Expand Up @@ -236,7 +236,7 @@ impl ProvisioningJob {
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
) {
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.leaf,
&self.signed_bitfields,
&self.backed_candidates,
return_senders,
Expand Down Expand Up @@ -291,23 +291,27 @@ type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
async fn send_inherent_data(
relay_parent: Hash,
leaf: &ActivatedLeaf,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut impl SubsystemSender,
metrics: &Metrics,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, from_job)
let availability_cores = request_availability_cores(leaf.hash, from_job)
.await
.await
.map_err(|err| Error::CanceledAvailabilityCores(err))??;

let disputes = select_disputes(from_job, metrics).await?;
let bitfields = select_availability_bitfields(&availability_cores, bitfields);
// Only include bitfields on fresh leaves. On chain reversions, we want to make sure that
// there will be at least one block, which cannot get disputed, so the chain can make progress.
let bitfields = match leaf.status {
LeafStatus::Fresh => select_availability_bitfields(&availability_cores, bitfields),
LeafStatus::Stale => Vec::new(),
};
let candidates =
select_candidates(&availability_cores, &bitfields, candidates, relay_parent, from_job)
.await?;
select_candidates(&availability_cores, &bitfields, candidates, leaf.hash, from_job).await?;

let inherent_data =
ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
Expand Down
7 changes: 6 additions & 1 deletion node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use futures::{
use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, SubsystemContext,
messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
};

use super::{Metrics, LOG_TARGET};
Expand Down Expand Up @@ -97,6 +97,11 @@ impl Requester {
{
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
Expand Down
24 changes: 10 additions & 14 deletions node/subsystem-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use polkadot_node_subsystem::{
messages::{
AllMessages, BoundToRelayParent, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
},
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
SubsystemSender,
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem,
SubsystemContext, SubsystemSender,
};

pub use overseer::{
Expand All @@ -48,7 +48,7 @@ use futures::{
};
use parity_scale_codec::Encode;
use pin_project::pin_project;
use polkadot_node_jaeger as jaeger;

use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
Expand All @@ -64,7 +64,6 @@ use std::{
fmt,
marker::Unpin,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -514,8 +513,7 @@ pub trait JobTrait: Unpin + Sized {
///
/// The job should be ended when `receiver` returns `None`.
fn run<S: SubsystemSender>(
parent: Hash,
span: Arc<jaeger::Span>,
leaf: ActivatedLeaf,
run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<Self::ToJob>,
Expand Down Expand Up @@ -563,22 +561,21 @@ where
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
fn spawn_job<Job, Sender>(
&mut self,
parent_hash: Hash,
span: Arc<jaeger::Span>,
leaf: ActivatedLeaf,
run_args: Job::RunArgs,
metrics: Job::Metrics,
sender: Sender,
) where
Job: JobTrait<ToJob = ToJob>,
Sender: SubsystemSender,
{
let hash = leaf.hash;
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);

let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(
parent_hash,
span,
leaf,
run_args,
metrics,
to_job_rx,
Expand All @@ -588,7 +585,7 @@ where
{
tracing::error!(
job = Job::NAME,
parent_hash = %parent_hash,
parent_hash = %hash,
err = ?e,
"job finished with an error",
);
Expand All @@ -608,7 +605,7 @@ where

let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx };

self.running.insert(parent_hash, handle);
self.running.insert(hash, handle);
}

/// Stop the job associated with this `parent_hash`.
Expand Down Expand Up @@ -710,8 +707,7 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
for activated in activated {
let sender = ctx.sender().clone();
jobs.spawn_job::<Job, _>(
activated.hash,
activated.span,
activated,
run_args.clone(),
metrics.clone(),
sender,
Expand Down
3 changes: 1 addition & 2 deletions node/subsystem-util/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ impl JobTrait for FakeCollatorProtocolJob {
//
// this function is in charge of creating and executing the job's main loop
fn run<S: SubsystemSender>(
_: Hash,
_: Arc<jaeger::Span>,
_: ActivatedLeaf,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<CollatorProtocolMessage>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ In particular this subsystem is responsible for:
- Respond to network requests requesting availability data by querying the
[Availability Store](../utility/availability-store.md).
- Request chunks from backing validators to put them in the local `Availability
Store` whenever we find an occupied core on the chain,
Store` whenever we find an occupied core on any fresh leaf,
this is to ensure availability by at least 2/3+ of all validators, this
happens after a candidate is backed.
- Fetch `PoV` from validators, when requested via `FetchPoV` message from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Upon receipt of an `ActiveLeavesUpdate`, launch bitfield signing job for each `a
Localized to a specific relay-parent `r`
If not running as a validator, do nothing.

- Begin by waiting a fixed period of time so availability distribution has the chance to make candidates available.
- For each fresh leaf, begin by waiting a fixed period of time so availability distribution has the chance to make candidates available.
- Determine our validator index `i`, the set of backed candidates pending availability in `r`, and which bit of the bitfield each corresponds to.
- Start with an empty bitfield. For each bit in the bitfield, if there is a candidate pending availability, query the [Availability Store](../utility/availability-store.md) for whether we have the availability chunk for our validator index. The `OccupiedCore` struct contains the candidate hash so the full candidate does not need to be fetched from runtime.
- For all chunks we have, set the corresponding bit in the bitfield.
Expand Down
2 changes: 1 addition & 1 deletion roadmap/implementers-guide/src/node/utility/provisioner.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The block author can choose 0 or 1 backed parachain candidates per parachain; th

### Signed Bitfields

[Signed bitfields](../../types/availability.md#signed-availability-bitfield) are attestations from a particular validator about which candidates it believes are available.
[Signed bitfields](../../types/availability.md#signed-availability-bitfield) are attestations from a particular validator about which candidates it believes are available. Those will only be provided on fresh leaves.

### Misbehavior Reports

Expand Down

0 comments on commit 853eaa6

Please sign in to comment.