Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Apply suggestions from #1364 code review
Browse files Browse the repository at this point in the history
- use CoreState, not CoreOccupied
- query for availability chunks, not the whole PoV
- create a stub `fn availability_cores`
  • Loading branch information
coriolinus committed Jul 24, 2020
1 parent 06ea025 commit 474c221
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
45 changes: 21 additions & 24 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
},
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreOccupied, Hash};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{convert::TryFrom, pin::Pin, time::Duration};
use wasm_timer::{Delay, Instant};

Expand Down Expand Up @@ -127,15 +127,17 @@ pub enum Error {
Multiple(Vec<Error>),
}

// this function exists mainly to collect a bunch of potential error points into one.
// if there is a candidate pending availability, query the Availability Store
// for whether we have the availability chunk for our validator index.
async fn get_core_availability(
relay_parent: Hash,
idx: usize,
core: Option<CoreOccupied>,
core: CoreState,
validator_idx: ValidatorIndex,
sender: &mpsc::Sender<FromJob>,
) -> Result<bool, Error> {
use messages::{
AvailabilityStoreMessage::QueryDataAvailability,
AvailabilityStoreMessage::QueryChunkAvailability,
RuntimeApiRequest::CandidatePendingAvailability,
};
use FromJob::{AvailabilityStore, RuntimeApi};
Expand All @@ -144,8 +146,7 @@ async fn get_core_availability(
// we have to (cheaply) clone this sender so we can mutate it to actually send anything
let mut sender = sender.clone();

// REVIEW: is it safe to ignore parathreads here, or do they also figure in the availability mapping?
if let Some(CoreOccupied::Parachain) = core {
if let CoreState::Occupied(_) = core {
let (tx, rx) = oneshot::channel();
sender
.send(RuntimeApi(Request(
Expand All @@ -159,8 +160,9 @@ async fn get_core_availability(
};
let (tx, rx) = oneshot::channel();
sender
.send(AvailabilityStore(QueryDataAvailability(
.send(AvailabilityStore(QueryChunkAvailability(
committed_candidate_receipt.descriptor.pov_hash,
validator_idx,
tx,
)))
.await?;
Expand All @@ -169,6 +171,11 @@ async fn get_core_availability(
Ok(false)
}

// delegates to the v1 runtime API
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
unimplemented!()
}

// the way this function works is not intuitive:
//
// - get the scheduler roster so we have a list of cores, in order.
Expand All @@ -177,29 +184,19 @@ async fn get_core_availability(
// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_)
async fn construct_availability_bitfield(
relay_parent: Hash,
validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJob>,
) -> Result<AvailabilityBitfield, Error> {
use futures::lock::Mutex;

use messages::RuntimeApiRequest::ValidatorGroups;
use FromJob::RuntimeApi;
use RuntimeApiMessage::Request;

// request the validator groups so we can get the scheduler roster
let (tx, rx) = oneshot::channel();
sender
.send(RuntimeApi(Request(relay_parent, ValidatorGroups(tx))))
.await?;
// get the set of availability cores from the runtime
let availability_cores = get_availability_cores(relay_parent, sender).await?;

// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
let sender = &*sender;

// wait for the scheduler roster
let scheduler_roster = rx.await?;

// prepare outputs
let out =
Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; scheduler_roster.availability_cores.len()));
let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
// any need to ever wait to lock this mutex.
Expand All @@ -213,9 +210,9 @@ async fn construct_availability_bitfield(
//
// In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why
// we need the mutexes and explicit references above.
stream::iter(scheduler_roster.availability_cores.into_iter().enumerate())
stream::iter(availability_cores.into_iter().enumerate())
.for_each_concurrent(None, |(idx, core)| async move {
let availability = match get_core_availability(relay_parent, idx, core, sender).await {
let availability = match get_core_availability(relay_parent, idx, core, validator_idx, sender).await {
Ok(availability) => availability,
Err(err) => {
errs_ref.lock().await.push(err);
Expand Down Expand Up @@ -264,7 +261,7 @@ impl JobTrait for BitfieldSigningJob {
// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

let bitfield = construct_availability_bitfield(relay_parent, &mut sender).await?;
let bitfield = construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await?;
let signed_bitfield = validator.sign(bitfield);

// make an anonymous scope to contain some use statements to simplify creating the outbound message
Expand Down
10 changes: 9 additions & 1 deletion node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,21 @@ pub enum AvailabilityStoreMessage {

/// Query whether a `AvailableData` exists within the AV Store.
///
/// This is useful in cases like bitfield signing, when existence
/// This is useful in cases when existence
/// matters, but we don't want to necessarily pass around multiple
/// megabytes of data to get a single bit of information.
QueryDataAvailability(Hash, oneshot::Sender<bool>),

/// Query an `ErasureChunk` from the AV store.
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),

/// Query whether an `ErasureChunk` exists within the AV Store.
///
/// This is useful in cases like bitfield signing, when existence
/// matters, but we don't want to necessarily pass around large
/// quantities of data to get a single bit of information.
QueryChunkAvailability(Hash, ValidatorIndex, oneshot::Sender<bool>),

/// Store an `ErasureChunk` in the AV store.
///
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
Expand All @@ -267,6 +274,7 @@ impl AvailabilityStoreMessage {
Self::QueryAvailableData(hash, _) => Some(*hash),
Self::QueryDataAvailability(hash, _) => Some(*hash),
Self::QueryChunk(hash, _, _) => Some(*hash),
Self::QueryChunkAvailability(hash, _, _) => Some(*hash),
Self::StoreChunk(hash, _, _, _) => Some(*hash),
Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash),
}
Expand Down

0 comments on commit 474c221

Please sign in to comment.