Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
av-store: Move prune on a separate thread (#7263)
Browse files Browse the repository at this point in the history
* av-store: Move prune on a separate thread

There are situations where pruning of the data could take more than a few
seconds and that might make the whole subsystem unreponsive. To avoid this just
move the prune process on a separate thread.

See: #7237, for more details.

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: Add log that prunning started

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: modify log severity

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
alexggh authored Jun 8, 2023
1 parent 6debcdb commit b1cc6fa
Showing 1 changed file with 62 additions and 9 deletions.
71 changes: 62 additions & 9 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use std::{
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};

use futures::{channel::oneshot, future, select, FutureExt};
use futures::{
channel::{
mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
oneshot,
},
future, select, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
Expand Down Expand Up @@ -540,9 +546,17 @@ impl<Context> AvailabilityStoreSubsystem {
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

// Pruning interval is in the order of minutes so we shouldn't have more than one task running
// at one moment in time, so 10 should be more than enough.
let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
loop {
let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
let res = run_iteration(
&mut ctx,
&mut subsystem,
&mut next_pruning,
(&mut pruning_result_tx, &mut pruning_result_rx),
)
.await;
match res {
Err(e) => {
e.trace();
Expand All @@ -564,6 +578,10 @@ async fn run_iteration<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
mut next_pruning: &mut future::Fuse<Delay>,
(pruning_result_tx, pruning_result_rx): (
&mut MpscSender<Result<(), Error>>,
&mut MpscReceiver<Result<(), Error>>,
),
) -> Result<bool, Error> {
select! {
incoming = ctx.recv().fuse() => {
Expand Down Expand Up @@ -612,15 +630,51 @@ async fn run_iteration<Context>(
// It's important to set the delay before calling `prune_all` because an error in `prune_all`
// could lead to the delay not being set again. Then we would never prune anything anymore.
*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

let _timer = subsystem.metrics.time_pruning();
prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
}
start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
},
// Received the prune result and propagate the errors, so that in case of a fatal error
// the main loop of the subsystem can exit graciously.
result = pruning_result_rx.next() => {
if let Some(result) = result {
result?;
}
},
}

Ok(false)
}

// Start prune-all on a separate thread, so that in the case when the operation takes
// longer than expected we don't keep the whole subsystem blocked.
// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn start_prune_all<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
mut pruning_result_tx: MpscSender<Result<(), Error>>,
) -> Result<(), Error> {
let metrics = subsystem.metrics.clone();
let db = subsystem.db.clone();
let config = subsystem.config;
let time_now = subsystem.clock.now()?;

ctx.spawn_blocking(
"av-store-prunning",
Box::pin(async move {
let _timer = metrics.time_pruning();

gum::debug!(target: LOG_TARGET, "Prunning started");
let result = prune_all(&db, &config, time_now);

if let Err(err) = pruning_result_tx.send(result).await {
// This usually means that the node is closing down, log it just in case
gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
}
}),
)?;
Ok(())
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn process_block_activated<Context>(
ctx: &mut Context,
Expand Down Expand Up @@ -1250,8 +1304,7 @@ fn store_available_data(
Ok(())
}

fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
let now = clock.now()?;
fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
let (range_start, range_end) = pruning_range(now);

let mut tx = DBTransaction::new();
Expand Down

0 comments on commit b1cc6fa

Please sign in to comment.