Commit 87dff01
docs
Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
sandreim committed Oct 19, 2023
1 parent 1f524da commit 87dff01
Showing 1 changed file with 25 additions and 5 deletions.
polkadot/node/subsystem-util/src/worker_pool/mod.rs (30 changes: 25 additions & 5 deletions)
@@ -159,6 +159,21 @@ pub enum WorkerPoolMessage<Config: WorkerConfig> {
DeleteJobs(Vec<JobId>),
}

+/// A specialized worker pool implementation that has the following characteristics:
+/// - the pool accepts work items, which are the building blocks of jobs.
+/// - jobs can be `long lived`, meaning they span multiple work items, or `short lived`, meaning
+///   just a single work item.
+/// - the `Job` trait is used to classify work items and assign them a `JobId`
+/// - short lived jobs are broadcast to all workers in the pool
+/// - all work items of a job are forwarded to only one worker
+/// - jobs are distributed in a round robin fashion, to ensure fair distribution of load
+///
+/// This design is suitable for use in subsystems, for example when dealing with many network
+/// messages. Each worker has its own local state, but all need to know about newly imported
+/// or finalized blocks. This can easily be implemented as `short lived` jobs broadcast to
+/// all workers. The `long lived` jobs should typically make up 99% of the worker load, while 1%
+/// should be `short lived`, similar to control/data interfaces.
pub struct WorkerPool<Config: WorkerConfig> {
// Per worker context mapping. Values are indices in `worker_handles`.
job_per_worker: HashMap<JobId, usize>,
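To make the dispatch rules above concrete, here is a minimal, self-contained sketch of the routing decision: items without a `JobId` are broadcast, items of a known job stay on their worker, and new jobs are assigned round-robin. The type and field names are stand-ins for illustration, not the crate's actual API.

```rust
use std::collections::HashMap;

type JobId = u64; // stand-in for the crate's `JobId` type

struct RoundRobinRouter {
    // Per job worker assignment; values are worker indices.
    job_per_worker: HashMap<JobId, usize>,
    num_workers: usize,
    next_worker: usize,
}

impl RoundRobinRouter {
    /// Returns `Some(worker_index)` for job-bound items, `None` for
    /// items that must be broadcast to every worker.
    fn route(&mut self, job_id: Option<JobId>) -> Option<usize> {
        let job_id = job_id?; // no `JobId` means broadcast
        if let Some(&idx) = self.job_per_worker.get(&job_id) {
            return Some(idx); // long lived job: stays on its worker
        }
        // New job: pick the next worker in round robin order.
        let idx = self.next_worker;
        self.next_worker = (self.next_worker + 1) % self.num_workers;
        self.job_per_worker.insert(job_id, idx);
        Some(idx)
    }
}
```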
@@ -262,7 +277,7 @@ impl<Config: WorkerConfig + Sized> WorkerPool<Config> {

/// Prune specified jobs and notify workers.
pub async fn delete_jobs(&mut self, jobs: Vec<JobId>) {
-// We need to split the contexts per worker.
+// We need to split the jobs per worker.
let mut prunable_per_worker_jobs = vec![Vec::new(); self.worker_handles.len()];
let num_deleted = jobs.len();
for job in jobs {
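The bucketing visible above (`prunable_per_worker_jobs`, one `Vec` per worker) batches deletions so each worker receives a single message rather than one per job. A sketch of that idea, with `JobId` again a stand-in type:

```rust
use std::collections::HashMap;

type JobId = u64; // stand-in for the crate's `JobId` type

/// Group jobs by the worker that owns them so each worker can be sent
/// a single batched delete message.
fn split_jobs_per_worker(
    jobs: Vec<JobId>,
    job_per_worker: &HashMap<JobId, usize>,
    num_workers: usize,
) -> Vec<Vec<JobId>> {
    let mut per_worker_jobs = vec![Vec::new(); num_workers];
    for job in jobs {
        if let Some(&worker_index) = job_per_worker.get(&job) {
            per_worker_jobs[worker_index].push(job);
        }
    }
    per_worker_jobs
}
```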
@@ -298,7 +313,7 @@ impl<Config: WorkerConfig + Sized> WorkerPool<Config> {
if let Some(worker_handle) = self.find_worker_for_job(&job_id) {
worker_handle.new_job(job_id, state).await;
} else {
-// The work requires a new `Job`` and `self.next_worker` should be suitable.
+// The work requires a new `Job` and `self.next_worker` should be suitable.
//
// TODO: If needed we might want to define more methods to choose a worker if
// `Job` can provide additional information. TODO: Handle blocking due to queue
@@ -326,7 +341,7 @@ impl<Config: WorkerConfig + Sized> WorkerPool<Config> {
let job_id = if let Some(job_id) = work_item.id() {
job_id
} else {
-// Work items not associated top a specific `Job`` are broadcasted to all workers.
+// Work items not associated to a specific `Job` are broadcast to all workers.
let broadcast_futures = self
.worker_handles
.iter()
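The broadcast path builds one send future per worker handle and awaits them all, so back-pressure from any single worker queue is respected. A self-contained sketch using `futures` channels; the channel-based handles and the `WorkItem` type are assumptions for illustration, not the crate's types:

```rust
use futures::{channel::mpsc, future::join_all, SinkExt};

#[derive(Clone)]
struct WorkItem; // hypothetical stand-in for the crate's work item type

/// Send a copy of `item` to every worker queue and wait for all sends
/// to complete before returning.
async fn broadcast(worker_queues: &mut [mpsc::Sender<WorkItem>], item: WorkItem) {
    let sends = worker_queues.iter_mut().map(|queue| queue.send(item.clone()));
    join_all(sends).await;
}
```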
@@ -356,8 +371,13 @@ impl<Config: WorkerConfig + Sized> WorkerPool<Config> {
&self.worker_handles[self.next_worker]
}

-// Default main loop implementation
-fn run_main_loop(mut self) {
+/// Starts the main worker pool loop task. Calling this function is a requirement for using
+/// the `WorkerPoolHandler` API.
+///
+/// In principle it is better to use `WorkerPool` directly, but that is restricted to a single
+/// thread owning the `WorkerPool` instance. If multiple work producers are needed, then
+/// using `WorkerPoolHandler` is more appropriate.
+pub fn run_main_loop(mut self) {
let worker_loop = async move {
loop {
if let Some(worker_message) = self.next().await {
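The loop is driven by `self.next().await`, which suggests the pool itself drains a stream of messages that `WorkerPoolHandler` clones can feed from other tasks or threads. A minimal sketch of that pattern with a hypothetical message enum (not the crate's `WorkerPoolMessage`):

```rust
use futures::{channel::mpsc, StreamExt};

// Hypothetical stand-in for the pool's message type.
enum PoolMessage {
    QueueWorkItem(String),
    DeleteJobs(Vec<u64>),
}

/// Drain handler messages until every sender is dropped, then exit.
async fn run_main_loop(mut receiver: mpsc::Receiver<PoolMessage>) {
    while let Some(message) = receiver.next().await {
        match message {
            PoolMessage::QueueWorkItem(_item) => { /* route or broadcast the item */ }
            PoolMessage::DeleteJobs(_jobs) => { /* prune jobs and notify workers */ }
        }
    }
}
```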
