Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stages): sender recovery stage progress #2910

Merged
merged 6 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(stages): sender recovery stage progress
  • Loading branch information
shekhirin committed May 31, 2023
commit b5609a2457dff6d34b786f67ff62a1e4f6d5b8d8
70 changes: 59 additions & 11 deletions crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable, RawValue,
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{
keccak256,
stage::{StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H160,
};
use reth_provider::Transaction;
use std::fmt::Debug;
use std::{fmt::Debug, ops::Deref};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::*;
Expand Down Expand Up @@ -74,7 +75,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
if first_tx_num > last_tx_num {
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached");
return Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block),
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_progress(tx)?),
done: is_final_range,
})
}
Expand Down Expand Up @@ -139,6 +141,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
});
}

// Iterate over channels and append the sender in the order that they are received.
for mut channel in channels {
while let Some(recovered) = channel.recv().await {
Expand All @@ -147,8 +150,18 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}

// Drop cursors, so we can get mutable tx borrow for stage progress calculation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does it need to be mut?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops there's actually no need, fixed in other stages too

drop(tx_cursor);
drop(senders_cursor);

let stage_checkpoint = stage_progress(tx)?;

info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}

/// Unwind the stage.
Expand All @@ -164,11 +177,25 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;

let stage_checkpoint = stage_progress(tx)?;

info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint),
})
}
}

fn stage_progress<DB: Database>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we define this on the trait level once we implement it for all of the stages?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure because only 3 stages use this approach with calculating total entries in different tables. For others, we increment/decrement processed according to changes in the stage execution/unwind.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what definition on trait level do you have in mind? maybe I misunderstood

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just in general, every stage must implement stage_progress/stage_checkpoint method. the implementation details are up to the respective stage

tx: &mut Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::TxSenders>()? as u64,
total: Some(tx.deref().entries::<tables::Transactions>()? as u64),
})
}

// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
enum SenderRecoveryStageError {
Expand Down Expand Up @@ -225,8 +252,13 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: 1,
total: Some(1)
}))
}, done: true }) if block_number == previous_stage
);

// Validate the stage execution
Expand All @@ -248,13 +280,22 @@ mod tests {
// Seed only once with full input range
runner.seed_execution(first_input).expect("failed to seed execution");

let total_transactions = runner.tx.table::<tables::Transactions>().unwrap().len() as u64;

// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false })
if block_number == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total)
}))
}, done: false }) if block_number == expected_progress &&
processed == runner.tx.inner().block_body_indices(expected_progress).unwrap().last_tx_num() &&
total == total_transactions
);

// Execute second time
Expand All @@ -265,8 +306,15 @@ mod tests {
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total)
}))
}, done: true }) if block_number == previous_stage &&
processed == runner.tx.inner().block_body_indices(previous_stage).unwrap().last_tx_num() &&
total == total_transactions
);

assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/db/src/tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ table!(

table!(
/// (Canonical only) Stores the transaction body for canonical transactions.
( Transactions ) TxNumber | TransactionSignedNoHash
( Transactions ) TxNumber | TransactionSignedNoHash
);

table!(
Expand Down Expand Up @@ -293,7 +293,7 @@ dupsort!(
);

table!(
/// Stores the transaction sender for each transaction.
/// Stores the transaction sender for each canonical transaction.
/// It is needed to speed up execution stage and allows fetching signer without doing
/// transaction signed recovery
( TxSenders ) TxNumber | Address
Expand Down