Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Better progress reporting for stage checkpoints #2982

Merged
merged 13 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(stages): index history stages progress (#2949)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
  • Loading branch information
shekhirin and gakonst committed Jun 3, 2023
commit 088f4d7593f8543eaa7c7bed503cb75ff05cf7ce
49 changes: 48 additions & 1 deletion crates/primitives/src/stage/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ pub struct HeadersCheckpoint {
pub progress: EntitiesCheckpoint,
}

/// Saves the progress of Index History stages.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct IndexHistoryCheckpoint {
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in changesets.
pub progress: EntitiesCheckpoint,
}

/// Saves the progress of abstract stage iterating over or downloading entities.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -179,6 +189,12 @@ impl From<RangeInclusive<BlockNumber>> for CheckpointBlockRange {
}
}

impl From<&RangeInclusive<BlockNumber>> for CheckpointBlockRange {
fn from(range: &RangeInclusive<BlockNumber>) -> Self {
Self { from: *range.start(), to: *range.end() }
}
}

/// Saves the progress of a stage.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -235,6 +251,14 @@ impl StageCheckpoint {
}
}

/// Returns the index history stage checkpoint, if any.
pub fn index_history_stage_checkpoint(&self) -> Option<IndexHistoryCheckpoint> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::IndexHistory(checkpoint)) => Some(checkpoint),
_ => None,
}
}

/// Sets the block number.
pub fn with_block_number(mut self, block_number: BlockNumber) -> Self {
self.block_number = block_number;
Expand Down Expand Up @@ -276,6 +300,15 @@ impl StageCheckpoint {
self.stage_checkpoint = Some(StageUnitCheckpoint::Headers(checkpoint));
self
}

/// Sets the stage checkpoint to index history.
pub fn with_index_history_stage_checkpoint(
mut self,
checkpoint: IndexHistoryCheckpoint,
) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::IndexHistory(checkpoint));
self
}
}

impl Display for StageCheckpoint {
Expand All @@ -290,7 +323,11 @@ impl Display for StageCheckpoint {
}) |
StageUnitCheckpoint::Entities(entities) |
StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress: entities, .. }) |
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }),
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }) |
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint {
progress: entities,
..
}),
) => entities.fmt(f),
None => write!(f, "{}", self.block_number),
}
Expand All @@ -313,6 +350,8 @@ pub enum StageUnitCheckpoint {
Execution(ExecutionCheckpoint),
/// Saves the progress of Headers stage.
Headers(HeadersCheckpoint),
/// Saves the progress of Index History stage.
IndexHistory(IndexHistoryCheckpoint),
}

impl Compact for StageUnitCheckpoint {
Expand Down Expand Up @@ -341,6 +380,10 @@ impl Compact for StageUnitCheckpoint {
buf.put_u8(4);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::IndexHistory(data) => {
buf.put_u8(5);
1 + data.to_compact(buf)
}
}
}

Expand Down Expand Up @@ -369,6 +412,10 @@ impl Compact for StageUnitCheckpoint {
let (data, buf) = HeadersCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Headers(data), buf)
}
5 => {
let (data, buf) = IndexHistoryCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::IndexHistory(data), buf)
}
_ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"),
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/primitives/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub use id::StageId;
mod checkpoints;
pub use checkpoints::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint,
HeadersCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint,
StorageHashingCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
};
11 changes: 7 additions & 4 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where
self.metrics.stage_checkpoint(
stage_id,
get_stage_checkpoint(&tx, stage_id)?.unwrap_or_default(),
None,
0,
);
}
Ok(())
Expand Down Expand Up @@ -270,10 +270,11 @@ where
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
self.metrics.stage_checkpoint(
stage_id, checkpoint,
stage_id,
checkpoint,
// We assume it was set in the previous execute iteration, so it
// doesn't change when we unwind.
None,
checkpoint.block_number,
);
tx.save_stage_checkpoint(stage_id, checkpoint)?;

Expand Down Expand Up @@ -346,7 +347,9 @@ where
self.metrics.stage_checkpoint(
stage_id,
checkpoint,
previous_stage.map(|(_, checkpoint)| checkpoint.block_number),
previous_stage
.map(|(_, checkpoint)| checkpoint.block_number)
.unwrap_or_default(),
);
tx.save_stage_checkpoint(stage_id, checkpoint)?;

Expand Down
14 changes: 7 additions & 7 deletions crates/stages/src/pipeline/sync_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use reth_metrics::{
use reth_primitives::{
stage::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecutionCheckpoint, HeadersCheckpoint,
StageCheckpoint, StageId, StageUnitCheckpoint, StorageHashingCheckpoint,
IndexHistoryCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint,
StorageHashingCheckpoint,
},
BlockNumber,
};
Expand All @@ -32,7 +33,7 @@ impl Metrics {
&mut self,
stage_id: StageId,
checkpoint: StageCheckpoint,
max_block_number: Option<BlockNumber>,
max_block_number: BlockNumber,
) {
let stage_metrics = self
.stages
Expand All @@ -47,14 +48,13 @@ impl Metrics {
StageUnitCheckpoint::Storage(StorageHashingCheckpoint { progress, .. }) |
StageUnitCheckpoint::Entities(progress @ EntitiesCheckpoint { .. }) |
StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress, .. }) |
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }),
) => (progress.processed, Some(progress.total)),
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }) |
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { progress, .. }),
) => (progress.processed, progress.total),
None => (checkpoint.block_number, max_block_number),
};

stage_metrics.entities_processed.set(processed as f64);
if let Some(total) = total {
stage_metrics.entities_total.set(total as f64);
}
stage_metrics.entities_total.set(total as f64);
}
}
150 changes: 143 additions & 7 deletions crates/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId,
},
BlockNumber,
};
use reth_provider::Transaction;
use std::fmt::Debug;
use std::{
fmt::Debug,
ops::{Deref, RangeInclusive},
};
use tracing::*;

/// Stage is indexing history the account changesets generated in
Expand Down Expand Up @@ -40,12 +48,22 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}

let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;

let indices = tx.get_account_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();

// Insert changeset to history index
tx.insert_account_history_index(indices)?;

stage_checkpoint.progress.processed += changesets;

info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}

/// Unwind the stage.
Expand All @@ -57,16 +75,70 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);

tx.unwind_account_history_indices(range)?;
let changesets = tx.unwind_account_history_indices(range)?;

let checkpoint =
if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() {
stage_checkpoint.progress.processed -= changesets as u64;
StageCheckpoint::new(unwind_progress)
.with_index_history_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_progress)
};

info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
Ok(UnwindOutput { checkpoint })
}
}

// The function proceeds as follows:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rewrite as doc comment so it shows up in tooltips etc

// 1. It first checks if the checkpoint has an `IndexHistoryCheckpoint` that matches the given
// block range. If it does, the function returns that checkpoint.
// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates
// a new `IndexHistoryCheckpoint` with the given block range and updates the progress with the
// current progress.
// 3. If none of the above conditions are met, it creates a new `IndexHistoryCheckpoint` with the
// given block range and calculates the progress by counting the number of processed entries in the
// `AccountChangeSet` table within the given block range.
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
checkpoint: StageCheckpoint,
range: &RangeInclusive<BlockNumber>,
) -> Result<IndexHistoryCheckpoint, DatabaseError> {
Ok(match checkpoint.index_history_stage_checkpoint() {
Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(range) =>
{
stage_checkpoint
}
Some(IndexHistoryCheckpoint { block_range, progress })
if block_range.to == checkpoint.block_number =>
{
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: progress.processed,
total: tx.deref().entries::<tables::AccountChangeSet>()? as u64,
},
}
}
_ => IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(0..=checkpoint.block_number)?
.count() as u64,
total: tx.deref().entries::<tables::AccountChangeSet>()? as u64,
},
},
})
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::collections::BTreeMap;

use super::*;
Expand Down Expand Up @@ -141,7 +213,21 @@ mod tests {
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number + 1,
to: run_to
},
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
tx.commit().unwrap();
}

Expand Down Expand Up @@ -353,4 +439,54 @@ mod tests {
])
);
}

#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();

tx.commit(|tx| {
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
Ok(())
})
.unwrap();

assert_matches!(
stage_checkpoint(&tx.inner(), StageCheckpoint::new(1), &(1..=2)).unwrap(),
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 2 },
progress: EntitiesCheckpoint { processed: 2, total: 4 }
}
);
}
}
Loading