From b1cc6fa14330261a305d56be36c04e9c99518993 Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Thu, 8 Jun 2023 14:21:41 +0300
Subject: [PATCH] av-store: Move prune on a separate thread (#7263)

* av-store: Move prune on a separate thread

There are situations where pruning of the data could take more than a few
seconds and that might make the whole subsystem unreponsive. To avoid this
just move the prune process on a separate thread.

See: https://github.com/paritytech/polkadot/issues/7237, for more details.

Signed-off-by: Alexandru Gheorghe

* av-store: Add log that prunning started

Signed-off-by: Alexandru Gheorghe

* av-store: modify log severity

Signed-off-by: Alexandru Gheorghe

---------

Signed-off-by: Alexandru Gheorghe
---
 node/core/av-store/src/lib.rs | 71 ++++++++++++++++++++++++++++++-----
 1 file changed, 62 insertions(+), 9 deletions(-)

diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs
index 1c0c8c5e7feb..17c9f9a19833 100644
--- a/node/core/av-store/src/lib.rs
+++ b/node/core/av-store/src/lib.rs
@@ -26,7 +26,13 @@ use std::{
 	time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
 };
 
-use futures::{channel::oneshot, future, select, FutureExt};
+use futures::{
+	channel::{
+		mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
+		oneshot,
+	},
+	future, select, FutureExt, SinkExt, StreamExt,
+};
 use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
 use polkadot_node_subsystem_util::database::{DBTransaction, Database};
@@ -540,9 +546,17 @@ impl AvailabilityStoreSubsystem {
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
 	let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
-
+	// Pruning interval is in the order of minutes so we shouldn't have more than one task running
+	// at one moment in time, so 10 should be more than enough.
+	let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
 	loop {
-		let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
+		let res = run_iteration(
+			&mut ctx,
+			&mut subsystem,
+			&mut next_pruning,
+			(&mut pruning_result_tx, &mut pruning_result_rx),
+		)
+		.await;
 		match res {
 			Err(e) => {
 				e.trace();
@@ -564,6 +578,10 @@ async fn run_iteration<Context>(
 	ctx: &mut Context,
 	subsystem: &mut AvailabilityStoreSubsystem,
 	mut next_pruning: &mut future::Fuse<Delay>,
+	(pruning_result_tx, pruning_result_rx): (
+		&mut MpscSender<Result<(), Error>>,
+		&mut MpscReceiver<Result<(), Error>>,
+	),
 ) -> Result<bool, Error> {
 	select! {
 		incoming = ctx.recv().fuse() => {
@@ -612,15 +630,51 @@ async fn run_iteration<Context>(
 			// It's important to set the delay before calling `prune_all` because an error in `prune_all`
 			// could lead to the delay not being set again. Then we would never prune anything anymore.
 			*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
-
-			let _timer = subsystem.metrics.time_pruning();
-			prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
-		}
+			start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
+		},
+		// Received the prune result and propagate the errors, so that in case of a fatal error
+		// the main loop of the subsystem can exit graciously.
+		result = pruning_result_rx.next() => {
+			if let Some(result) = result {
+				result?;
+			}
+		},
 	}
 
 	Ok(false)
 }
 
+// Start prune-all on a separate thread, so that in the case when the operation takes
+// longer than expected we don't keep the whole subsystem blocked.
+// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
+#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
+async fn start_prune_all<Context>(
+	ctx: &mut Context,
+	subsystem: &mut AvailabilityStoreSubsystem,
+	mut pruning_result_tx: MpscSender<Result<(), Error>>,
+) -> Result<(), Error> {
+	let metrics = subsystem.metrics.clone();
+	let db = subsystem.db.clone();
+	let config = subsystem.config;
+	let time_now = subsystem.clock.now()?;
+
+	ctx.spawn_blocking(
+		"av-store-prunning",
+		Box::pin(async move {
+			let _timer = metrics.time_pruning();
+
+			gum::debug!(target: LOG_TARGET, "Prunning started");
+			let result = prune_all(&db, &config, time_now);
+
+			if let Err(err) = pruning_result_tx.send(result).await {
+				// This usually means that the node is closing down, log it just in case
+				gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
+			}
+		}),
+	)?;
+	Ok(())
+}
+
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn process_block_activated<Context>(
 	ctx: &mut Context,
@@ -1250,8 +1304,7 @@ fn store_available_data(
 	Ok(())
 }
 
-fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
-	let now = clock.now()?;
+fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
 	let (range_start, range_end) = pruning_range(now);
 
 	let mut tx = DBTransaction::new();
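
The pattern this patch introduces can be illustrated outside the subsystem with a
minimal, self-contained sketch: the slow, blocking pruning work is moved off the
main task, and its Result is reported back through a small bounded mpsc channel
that the main loop also polls. The sketch below is an assumption-laden stand-in,
not the av-store code: it uses only the futures crate plus std::thread::spawn in
place of the overseer's spawn_blocking, and names such as prune_all_stub and
start_prune_all_stub are hypothetical placeholders.

use std::{thread, time::Duration};

use futures::{
	channel::mpsc::{channel, Sender},
	executor::block_on,
	SinkExt, StreamExt,
};

// Stand-in for the real `prune_all`: pretend the database walk takes a while.
fn prune_all_stub() -> Result<(), String> {
	thread::sleep(Duration::from_millis(200));
	Ok(())
}

// Offload the pruning to its own OS thread and report the outcome over `result_tx`,
// mirroring how the patch pairs `ctx.spawn_blocking` with `pruning_result_tx`.
fn start_prune_all_stub(mut result_tx: Sender<Result<(), String>>) {
	thread::spawn(move || {
		let result = prune_all_stub();
		// If the receiver is gone the main loop is shutting down; ignore the send error.
		let _ = block_on(result_tx.send(result));
	});
}

fn main() {
	// Small bound: pruning runs once per interval, so more than one in-flight result
	// would be unusual (the same reasoning as the `channel(10)` in the patch).
	let (result_tx, mut result_rx) = channel::<Result<(), String>>(10);

	start_prune_all_stub(result_tx);

	// In the real subsystem this receive lives in the `select!` of `run_iteration`;
	// here we simply await the single result and surface any error.
	block_on(async {
		if let Some(result) = result_rx.next().await {
			result.expect("pruning failed");
		}
	});
}

The key design choice, as in the patch, is that the main loop never blocks on the
pruning itself: it only awaits a channel message, so fatal pruning errors can still
propagate and stop the loop while slow pruning runs elsewhere.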