Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

dyn overseer channel capacity #5454

Merged
merged 8 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ pub struct RunCmd {
/// telemetry, if telemetry is enabled.
#[clap(long)]
pub no_hardware_benchmarks: bool,

/// Overseer message capacity override.
///
/// **Dangerous!** Do not touch unless explicitly adviced to.
#[clap(long)]
pub overseer_channel_capacity_override: Option<usize>,
}

#[allow(missing_docs)]
Expand Down
1 change: 1 addition & 0 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ where
None,
false,
overseer_gen,
cli.run.overseer_channel_capacity_override,
hwbench,
)
.map(|full| full.task_manager)
Expand Down
81 changes: 73 additions & 8 deletions node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
/// Specify the the initialization function for a subsystem
Expand All @@ -171,6 +174,10 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,


channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -207,6 +214,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: self. #baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -254,6 +264,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand All @@ -272,6 +285,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#to_keep_baggage_name: self. #to_keep_baggage_name,
)*
spawner: self.spawner,

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
Expand Down Expand Up @@ -359,20 +375,25 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}

impl ::std::default::Default for #connector {
fn default() -> Self {
/// Create a new connector with non-default event channel capacity.
pub fn with_event_capacity(event_channel_capacity: usize) -> Self {
let (events_tx, events_rx) = #support_crate ::metered::channel::<
#event
>(SIGNAL_CHANNEL_CAPACITY);
>(event_channel_capacity);

Self {
handle: events_tx,
consumer: events_rx,
}
}
}

impl ::std::default::Default for #connector {
fn default() -> Self {
Self::with_event_capacity(SIGNAL_CHANNEL_CAPACITY)
}
}
});

ts.extend(quote!{
Expand All @@ -385,6 +406,11 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#baggage_name: #baggage_passthrough_state_generics,
)*
spawner: InitStateSpawner,
// user provided runtime overrides,
// if `None`, the `overlord(message_capacity=123,..)` is used
// or the default value.
channel_capacity: Option<usize>,
signal_capacity: Option<usize>,
ordian marked this conversation as resolved.
Show resolved Hide resolved
}
});

Expand All @@ -406,6 +432,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#field_name: Missing::<#field_type>::default(),
)*
spawner: Missing::<S>::default(),

channel_capacity: None,
signal_capacity: None,
}
}
}
Expand All @@ -419,18 +448,48 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#spawner_where_clause
{
/// The `spawner` to use for spawning tasks.
pub fn spawner(self, spawner: S) -> #builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
pub fn spawner(self, spawner: S) -> #builder<
Init<S>,
#( #subsystem_passthrough_state_generics, )*
#( #baggage_passthrough_state_generics, )*
>
{
#builder {
#(
#field_name: self. #field_name,
)*
spawner: Init::<S>::Value(spawner),

channel_capacity: self.channel_capacity,
signal_capacity: self.signal_capacity,
}
}
}
});

// message and signal channel capacity
drahnr marked this conversation as resolved.
Show resolved Hide resolved
ts.extend(quote! {
impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
#builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
where
#spawner_where_clause,
{
/// Set the interconnecting signal channel capacity.
pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
{
self.signal_capacity = Some(capacity);
self
}

/// Set the interconnecting message channel capacities.
drahnr marked this conversation as resolved.
Show resolved Hide resolved
pub fn message_channel_capacity(mut self, capacity: usize) -> Self
{
self.channel_capacity = Some(capacity);
self
}
}
});

ts.extend(quote! {
/// Type used to represent a builder where all fields are initialized and the overseer could be constructed.
pub type #initialized_builder<#initialized_builder_generics> = #builder<Init<S>, #( Init<#field_type>, )*>;
Expand All @@ -446,7 +505,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Complete the construction and create the overseer type.
pub fn build(self)
-> ::std::result::Result<(#overseer_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty> {
let connector = #connector ::default();
let connector = #connector ::with_event_capacity(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);
self.build_with_connector(connector)
}

Expand All @@ -470,7 +531,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
=
#support_crate ::metered::channel::<
MessagePacket< #consumes >
>(CHANNEL_CAPACITY);
>(
self.channel_capacity.unwrap_or(CHANNEL_CAPACITY)
);
)*

#(
Expand Down Expand Up @@ -510,7 +573,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select(
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
);

// Generate subsystem name based on overseer field name.
let subsystem_string = String::from(stringify!(#subsystem_name));
Expand Down
10 changes: 10 additions & 0 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ pub fn new_full<RuntimeApi, ExecutorDispatch, OverseerGenerator>(
program_path: Option<std::path::PathBuf>,
overseer_enable_anyways: bool,
overseer_gen: OverseerGenerator,
overseer_message_channel_capacity_override: Option<usize>,
hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<NewFull<Arc<FullClient<RuntimeApi, ExecutorDispatch>>>, Error>
where
Expand Down Expand Up @@ -1038,6 +1039,7 @@ where
chain_selection_config,
dispute_coordinator_config,
pvf_checker_enabled,
overseer_message_channel_capacity_override,
},
)
.map_err(|e| {
Expand Down Expand Up @@ -1326,6 +1328,7 @@ pub fn build_full(
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
overseer_enable_anyways: bool,
overseer_gen: impl OverseerGen,
overseer_message_channel_override: Option<usize>,
hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<NewFull<Client>, Error> {
#[cfg(feature = "rococo-native")]
Expand All @@ -1343,6 +1346,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Rococo))
Expand All @@ -1360,6 +1364,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Kusama))
Expand All @@ -1377,6 +1382,7 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override,
hwbench,
)
.map(|full| full.with_client(Client::Westend))
Expand All @@ -1394,6 +1400,10 @@ pub fn build_full(
None,
overseer_enable_anyways,
overseer_gen,
overseer_message_channel_override.map(|capacity| {
gum::warn!("Channel capacity should _never_ be tampered with on polkadot!");
capacity
}),
hwbench,
)
.map(|full| full.with_client(Client::Polkadot))
Expand Down
10 changes: 9 additions & 1 deletion node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ where
pub dispute_coordinator_config: DisputeCoordinatorConfig,
/// Enable PVF pre-checking
pub pvf_checker_enabled: bool,
/// Overseer channel capacity override.
pub overseer_message_channel_capacity_override: Option<usize>,
}

/// Obtain a prepared `OverseerBuilder`, that is initialized
Expand Down Expand Up @@ -138,6 +140,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
chain_selection_config,
dispute_coordinator_config,
pvf_checker_enabled,
overseer_message_channel_capacity_override,
}: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<
InitializedOverseerBuilder<
Expand Down Expand Up @@ -292,7 +295,12 @@ where
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.metrics(metrics)
.spawner(spawner);
Ok(builder)

if let Some(capacity) = overseer_message_channel_capacity_override {
Ok(builder.message_channel_capacity(capacity))
} else {
Ok(builder)
}
}

/// Trait for the `fn` generating the overseer.
Expand Down
1 change: 1 addition & 0 deletions node/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub fn new_full(
false,
polkadot_service::RealOverseerGen,
None,
None,
)
}

Expand Down
3 changes: 2 additions & 1 deletion parachain/test-parachains/adder/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ fn main() -> Result<()> {
config,
polkadot_service::IsCollator::Yes(collator.collator_key()),
None,
true,
false,
None,
None,
false,
polkadot_service::RealOverseerGen,
None,
None,
)
.map_err(|e| e.to_string())?;
let mut overseer_handle = full_node
Expand Down
3 changes: 2 additions & 1 deletion parachain/test-parachains/undying/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ fn main() -> Result<()> {
config,
polkadot_service::IsCollator::Yes(collator.collator_key()),
None,
true,
false,
None,
None,
false,
polkadot_service::RealOverseerGen,
None,
None,
)
.map_err(|e| e.to_string())?;
let mut overseer_handle = full_node
Expand Down