Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

mod subsystem-util #1376

Merged
merged 32 commits into from
Jul 14, 2020
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
aee251a
Add subsystem-util crate.
coriolinus Jul 8, 2020
1719606
copy utility functions for requesting runtime data; generalize
coriolinus Jul 8, 2020
bc5efd0
convert subsystem-util from crate to module in subsystem
coriolinus Jul 8, 2020
145dd58
make runtime request sender type generic
coriolinus Jul 8, 2020
e76239d
candidate backing subsystem uses util for api requests
coriolinus Jul 8, 2020
be2be35
add struct Validator representing the local validator
coriolinus Jul 8, 2020
32e5c18
add alternate constructor for better efficiency
coriolinus Jul 9, 2020
5d8a8af
refactor candidate backing to use utility methods
coriolinus Jul 9, 2020
447ab4a
fix test breakage caused by reordering tests
coriolinus Jul 9, 2020
1d59e66
restore test which accidentally got deleted during merge
coriolinus Jul 9, 2020
fa46a8a
start extracting jobs management into helper traits + structs
coriolinus Jul 9, 2020
36b3367
use util::{JobHandle, Jobs} in CandidateBackingSubsystem
coriolinus Jul 10, 2020
91e301b
implement generic job-manager subsystem impl
coriolinus Jul 10, 2020
3a47236
add hash-extraction helper to messages
coriolinus Jul 10, 2020
38110d5
fix errors caused by improper rebase
coriolinus Jul 10, 2020
b5d28ce
doc improvement
coriolinus Jul 10, 2020
93fe700
simplify conversion from overseer communication to job message
coriolinus Jul 10, 2020
cca7d4b
document fn hash for all messages
coriolinus Jul 10, 2020
841579d
rename fn hash() -> fn relay_parent
coriolinus Jul 10, 2020
38ee247
gracefully shut down running futures on Conclude
coriolinus Jul 10, 2020
8bb08b8
ensure we're validating with the proper validator index
coriolinus Jul 10, 2020
fa595c2
rename: handle_unhashed_msg -> handle_orphan_msg
coriolinus Jul 13, 2020
940c111
impl Stream for Jobs<Spawner, Job>
coriolinus Jul 13, 2020
11772f5
add missing documentation for public items
coriolinus Jul 13, 2020
dc9e12c
use pin-project to eliminate unsafe code from this codebase
coriolinus Jul 13, 2020
802981a
rename SenderMessage -> FromJob
coriolinus Jul 13, 2020
a5639e3
reenvision the subsystem requests as an extension trait
coriolinus Jul 13, 2020
e7f5276
Revert "reenvision the subsystem requests as an extension trait"
coriolinus Jul 13, 2020
0eaca1b
apply suggested futuresunordered join_all impl
coriolinus Jul 14, 2020
863d733
CandidateValidationMessage variants have no top-level relay parents
coriolinus Jul 14, 2020
1e14746
rename handle_orphan_msg -> handle_unanchored_msg
coriolinus Jul 14, 2020
e3f76a4
make most node-core-backing types private
coriolinus Jul 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions node/subsystem/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use futures::{
future::Either,
prelude::*,
select,
task::{Spawn, SpawnError, SpawnExt},
stream::Stream,
task::{self, Spawn, SpawnError, SpawnExt},
};
use futures_timer::Delay;
use keystore::KeyStorePtr;
Expand All @@ -42,6 +43,7 @@ use sp_core::Pair;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
time::Duration,
};
Expand Down Expand Up @@ -296,7 +298,7 @@ impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other
/// subsystems.
pub trait JobTrait {
pub trait JobTrait: Unpin {
/// Message type to the job. Typically a subset of AllMessages.
type ToJob: 'static + ToJobTrait + Send;
/// Message type from the job. Typically a subset of AllMessages.
Expand Down Expand Up @@ -413,25 +415,67 @@ impl<Spawner: Spawn, Job: JobTrait> Jobs<Spawner, Job> {
Ok(())
}

/// Get the next message from any of the underlying jobs.
async fn next(&mut self) -> Option<Job::FromJob> {
self.outgoing_msgs.next().await.and_then(|(e, _)| match e {
StreamYield::Item(e) => Some(e),
_ => None,
})
/// Get the pin projection for the `outgoing_msgs` field
///
/// From the [pin docs]:
///
/// > It is actually up to the author of the data structure to decide whether the pinned
/// > projection for a particular field turns `Pin<&mut Struct>` into `Pin<&mut Field>`
/// > or `&mut Field`. There are some constraints, though, and the most important constraint
/// > is _consistency_: every field can be _either_ projected to a pinned reference, _or_
/// > have pinning removed as part of the projection. If both are done for the same field,
/// > that will likely be unsound!
///
/// In this case, pinning is structural.
///
/// ## Considerations
///
/// 1. The struct must only be `Unpin` if all the structural fields are `Unpin`: ✔
/// 2. The destructor of the struct must not move structural fields out of its argument: ✔
/// 3. Uphold the `Drop` guarantee: once the struct is pinned, the memory which contains
/// the content is not overwritten or deallocated without calling the content's destructors.
/// I.e. you may not free or reuse the storage without calling `drop`. ✔
/// 4. You must not offer any other operations (i.e. `take`) which could lead to data being
/// moved out of the structural fields when your type is pinned: ✔
///
/// [pin docs]: https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning
fn pin_get_outgoing_msgs(self: Pin<&mut Self>) -> Pin<&mut StreamUnordered<mpsc::Receiver<Job::FromJob>>> {
coriolinus marked this conversation as resolved.
Show resolved Hide resolved
// This is ok because `self.outgoing_msgs` is pinned when `self` is.
unsafe { self.map_unchecked_mut(|s| &mut s.outgoing_msgs) }
}
}

// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
// we just abort them all. Still better than letting them dangle.
impl<Spawner, Job: JobTrait> Drop for Jobs<Spawner, Job> {
fn drop(&mut self) {
for job_handle in self.running.values() {
job_handle.abort_handle.abort();
// `new_unchecked` is ok because we know this value is never used again after being dropped
inner_drop(unsafe { Pin::new_unchecked(self) });
fn inner_drop<Spawner, Job: JobTrait>(slf: Pin<&mut Jobs<Spawner, Job>>) {
for job_handle in slf.running.values() {
job_handle.abort_handle.abort();
}
}
}
}

impl<Spawner, Job> Stream for Jobs<Spawner, Job>
where
Spawner: Spawn,
Job: JobTrait,
{
type Item = Job::FromJob;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
self.pin_get_outgoing_msgs()
.poll_next(cx)
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None,
}))
}
}

/// A basic implementation of a subsystem.
///
/// This struct is responsible for handling message traffic between
Expand All @@ -447,7 +491,7 @@ pub struct JobManager<Spawner, Context, Job: JobTrait> {

impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
Spawner: Spawn + Clone + Send,
Spawner: Spawn + Clone + Send + Unpin,
Context: SubsystemContext,
Job: JobTrait,
Job::RunArgs: Clone,
Expand Down Expand Up @@ -552,7 +596,7 @@ where

impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
where
Spawner: Spawn + Send + Clone + 'static,
Spawner: Spawn + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<Job::ToJob>,
Job: JobTrait + Send,
Expand Down