Skip to content

Commit

Permalink
Revert "Revert "refactor(stages): input target reached & output done …
Browse files Browse the repository at this point in the history
…checks" (#3114)"

This reverts commit 7ec4b0a.
  • Loading branch information
shekhirin committed Jun 13, 2023
1 parent e43455c commit 17a5499
Show file tree
Hide file tree
Showing 26 changed files with 332 additions and 392 deletions.
34 changes: 13 additions & 21 deletions bin/reth/src/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,22 @@ impl Command {

let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&mut provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
account_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = account_hashing_stage.execute(&mut provider_rw, input).await?;
account_hashing_done = output.is_done(input);
}

let mut storage_hashing_done = false;
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&mut provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = storage_hashing_stage.execute(&mut provider_rw, input).await?;
storage_hashing_done = output.is_done(input);
}

let incremental_result = merkle_stage
Expand Down Expand Up @@ -173,7 +165,7 @@ impl Command {
loop {
let clean_result = merkle_stage.execute(&mut provider_rw, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
if clean_result.unwrap().is_done(clean_input) {
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl NodeState {
pipeline_position,
pipeline_total,
stage_id,
result: ExecOutput { checkpoint, done },
result: ExecOutput { checkpoint },
done,
} => {
self.current_checkpoint = checkpoint;

Expand Down
15 changes: 5 additions & 10 deletions bin/reth/src/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,11 @@ async fn dry_run<DB: Database>(

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input);
}

info!(target: "reth::cli", "Success.");
Expand Down
15 changes: 5 additions & 10 deletions bin/reth/src/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,11 @@ async fn dry_run<DB: Database>(

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input);
}

info!(target: "reth::cli", "Success.");
Expand Down
19 changes: 8 additions & 11 deletions bin/reth/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,17 @@ async fn dry_run<DB: Database>(
let mut provider = shareable_db.provider_rw()?;
let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = MerkleStage::Execution {
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating
* from
* scratch */
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
}
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.execute(&mut provider, exec_input)
.await?
.done;
.is_done(exec_input);
}

info!(target: "reth::cli", "Success.");
Expand Down
13 changes: 8 additions & 5 deletions bin/reth/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, ExecOutput, PipelineError, Stage, UnwindInput,
ExecInput, PipelineError, Stage, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
Expand Down Expand Up @@ -238,10 +238,13 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};

while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut provider_rw, input).await?
{
input.checkpoint = Some(stage_progress);
loop {
let result = exec_stage.execute(&mut provider_rw, input).await?;
if result.is_done(input) {
break
}

input.checkpoint = Some(result.checkpoint);

if self.commit {
provider_rw.commit()?;
Expand Down
87 changes: 41 additions & 46 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,7 @@ mod tests {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
max_block: Option<BlockNumber>,
) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
Expand All @@ -1380,10 +1381,13 @@ mod tests {

// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(H256::default());
let pipeline = Pipeline::builder()
let mut pipeline_builder = Pipeline::builder()
.add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx)
.build(db.clone(), chain_spec.clone());
.with_tip_sender(tip_tx);
if let Some(max_block) = max_block {
pipeline_builder = pipeline_builder.with_max_block(max_block);
}
let pipeline = pipeline_builder.build(db.clone(), chain_spec.clone());

// Setup blockchain tree
let externals =
Expand All @@ -1403,7 +1407,7 @@ mod tests {
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
None,
max_block,
false,
payload_builder,
None,
Expand Down Expand Up @@ -1438,6 +1442,7 @@ mod tests {
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let res = spawn_consensus_engine(consensus_engine);

Expand Down Expand Up @@ -1467,6 +1472,7 @@ mod tests {
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let mut rx = spawn_consensus_engine(consensus_engine);

Expand Down Expand Up @@ -1506,10 +1512,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }),
Err(StageError::ChannelClosed),
]),
Vec::default(),
Some(2),
);
let rx = spawn_consensus_engine(consensus_engine);

Expand All @@ -1522,7 +1529,9 @@ mod tests {

assert_matches!(
rx.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(
Err(BeaconConsensusEngineError::Pipeline(n))
) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}

Expand All @@ -1536,15 +1545,12 @@ mod tests {
.paris_activated()
.build(),
);
let (mut consensus_engine, env) = setup_consensus_engine(
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]),
Vec::default(),
Some(max_block),
);
consensus_engine.sync.set_max_block(max_block);
let rx = spawn_consensus_engine(consensus_engine);

let _ = env
Expand Down Expand Up @@ -1584,11 +1590,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let mut engine_rx = spawn_consensus_engine(consensus_engine);
Expand All @@ -1615,11 +1619,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1664,10 +1666,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1712,11 +1715,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1750,10 +1751,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1803,10 +1805,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1849,11 +1852,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let mut engine_rx = spawn_consensus_engine(consensus_engine);
Expand Down Expand Up @@ -1882,11 +1883,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1932,11 +1931,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1989,11 +1986,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::from([exec_result2]),
None,
);

insert_blocks(
Expand Down
Loading

0 comments on commit 17a5499

Please sign in to comment.