Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --X-trie-log subcommand #6303

Merged
merged 27 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
16c0a49
Add x-trie-log subcommand for one-off backlog prune
siladu Nov 20, 2023
7dd4928
long -> int
siladu Nov 20, 2023
bf2b098
Removed banned method
gfukushima Dec 12, 2023
e67ae51
Preload process stream in parallel
gfukushima Dec 12, 2023
9b4e0c9
Drop unwanted trielogs and keep reatain layers only
gfukushima Dec 14, 2023
0b9fe83
Add output to user and cleanup refactor
gfukushima Dec 15, 2023
426848e
small tweak to display cf that had reference dropped by RocksDbSegmen…
gfukushima Dec 15, 2023
7401b59
spotless
gfukushima Dec 15, 2023
1b7fb72
Fix classes that changed package
gfukushima Dec 15, 2023
11e6b05
spotless
gfukushima Dec 15, 2023
f2d01e2
Code review
gfukushima Dec 15, 2023
04f1aaa
Only clear DB when we have the exact amount of trie logs we want in m…
gfukushima Dec 15, 2023
2f01c5a
Trielogs stream to and from file to avoid possibly OOM
gfukushima Dec 18, 2023
56e4c8e
Process trie logs in chunks to avoid OOM
gfukushima Dec 18, 2023
78561b0
save and read in batches to handle edge cases
gfukushima Dec 19, 2023
42c72cf
save and read files to/from database dir
gfukushima Dec 20, 2023
9961fc2
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 20, 2023
9389540
add unit tests and PR review fixes
gfukushima Dec 21, 2023
e3d4fbc
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 21, 2023
c7144fe
spdx
gfukushima Dec 21, 2023
20b0ba5
Fix unit tests directory creation and deletion
gfukushima Dec 21, 2023
e7d175c
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 3, 2024
b214bf2
PR review
gfukushima Jan 4, 2024
aa75348
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 4, 2024
2bc0732
Merge remote-tracking branch 'origin/x-trie-log-subcommand-2' into x-…
gfukushima Jan 4, 2024
deec021
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
5d7b68e
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Drop unwanted trielogs and keep reatain layers only
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
  • Loading branch information
gfukushima committed Dec 15, 2023
commit 9b4e0c9f3aaf69e2f9f08fec94904bb9428c809f
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
description = "This command provides storage related actions.",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class,
subcommands = {StorageSubCommand.RevertVariablesStorage.class, RocksDbSubCommand.class, TrieLogSubCommand.class})
subcommands = {
StorageSubCommand.RevertVariablesStorage.class,
RocksDbSubCommand.class,
TrieLogSubCommand.class
})
public class StorageSubCommand implements Runnable {

/** The constant COMMAND_NAME. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,99 +18,115 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration.Unstable.MINIMUM_BONSAI_TRIE_LOG_RETENTION_THRESHOLD;

import com.google.common.base.Splitter;
import org.apache.tuweni.bytes.Bytes;
import org.bouncycastle.util.encoders.Base64;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Helper class for counting and pruning trie logs */
public class TrieLogHelper {
private static final String trieLogFile
= "trieLogsToRetain.txt";
private static final Logger LOG = LoggerFactory.getLogger(TrieLogHelper.class);

static void countAndPrune(
final PrintWriter out,
final DataStorageConfiguration config,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain,
final BesuController besuController) {

TrieLogHelper.validatePruneConfiguration(config);
final long layersToRetain = config.getUnstable().getBonsaiTrieLogRetentionThreshold();
final long chainHeight = blockchain.getChainHeadBlockNumber();
final Optional<Hash> finalizedBlockHash = blockchain.getFinalized();
if (finalizedBlockHash.isEmpty()) {
LOG.debug("No finalized block present, skipping pruning");
return ;
}

final TrieLogCount count = getCount(rootWorldStateStorage, Integer.MAX_VALUE, blockchain);
final long numberOfTrieLogsToRetain =
finalizedBlockHash
.flatMap(blockchain::getBlockHeader)
.map(ProcessableBlockHeader::getNumber)
.map(finalizedBlock -> Math.min(chainHeight-finalizedBlock, layersToRetain))
.orElse(layersToRetain);

out.println("Counting trie logs before prune...");
printCount(out, count);
out.println();
// retrieve the layersToRetains hashes from blockchain
final List<Hash> hashesToRetain = new ArrayList<>();

final int layersToRetain = (int) config.getUnstable().getBonsaiTrieLogRetentionThreshold();
final int batchSize = config.getUnstable().getBonsaiTrieLogPruningLimit();
final boolean isProofOfStake =
besuController.getGenesisConfigOptions().getTerminalTotalDifficulty().isPresent();
TrieLogPruner pruner =
new TrieLogPruner(
rootWorldStateStorage, blockchain, layersToRetain, batchSize, isProofOfStake);
final long lastHashToRetain = chainHeight - numberOfTrieLogsToRetain;
for (long i = chainHeight; i >= lastHashToRetain; i--) {
final Optional<BlockHeader> header = blockchain.getBlockHeader(i);
header.ifPresent(blockHeader -> hashesToRetain.add(blockHeader.getHash()));
}

IdentityHashMap<byte[], byte[]> trieLogsToRetain = new IdentityHashMap<>();

if(hashesToRetain.stream().count() == numberOfTrieLogsToRetain){
//save trielogs in a flatfile as a fail-safe
saveTrieLogsInFile(trieLogsToRetain);

final int totalToPrune = count.total() - layersToRetain;
out.printf(
"""
Total to prune = %d (total) - %d (retention threshold) =
=> %d
""",
count.total(), layersToRetain, totalToPrune);
final long numBatches = Math.max(totalToPrune / batchSize, 1);
out.println();
out.printf(
"Estimated number of batches = max(%d (total to prune) / %d (batch size), 1) = %d\n",
totalToPrune, batchSize, numBatches);
out.println();

int noProgressCounter = 0;
int prevTotalNumberPruned = 0;
int totalNumberPruned = 0;
int numberPrunedInBatch;
int batchNumber = 1;
while (totalNumberPruned < totalToPrune) {
out.printf(
"""
Pruning batch %d
-----------------
""", batchNumber++);
// do prune
numberPrunedInBatch = pruner.initialize();

out.printf("Number pruned in batch = %d \n", numberPrunedInBatch);
totalNumberPruned += numberPrunedInBatch;
out.printf(
"""
Running total number pruned =
=> %d of %d
""",
totalNumberPruned, totalToPrune);

if (totalNumberPruned == prevTotalNumberPruned) {
if (noProgressCounter++ == 5) {
out.println("No progress in 5 batches, exiting");
return;
}
}

prevTotalNumberPruned = totalNumberPruned;
out.println();
hashesToRetain.forEach(
hash -> {
rootWorldStateStorage
.getTrieLog(hash)
.ifPresent(trieLog -> trieLogsToRetain.put(hash.toArrayUnsafe(), trieLog));
});
}
else{
//try to read the triLogs from the flatfile
trieLogsToRetain = readTrieLogsFromFile();
}

//clear trielogs storage
rootWorldStateStorage.clearTrieLog();

//get an update and insert the trielogs we retained
var updater = rootWorldStateStorage.updater();
trieLogsToRetain.forEach(
(key, value) -> {
System.out.println("key: " + Bytes32.wrap(key).toHexString() + ", value: " + Base64.toBase64String(value));
updater.getTrieLogStorageTransaction().put(key, value);
});
updater.getTrieLogStorageTransaction().commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails then do have a way of restoring the trielog from the backup?


if(rootWorldStateStorage.streamTrieLogKeys(numberOfTrieLogsToRetain).count() == numberOfTrieLogsToRetain){
deleteTrieLogFile();
}
out.println("Trie log prune complete!");
out.println();

out.println("Counting trie logs after prune...");
TrieLogHelper.printCount(
out, TrieLogHelper.getCount(rootWorldStateStorage, Integer.MAX_VALUE, blockchain));
}

private static void validatePruneConfiguration(final DataStorageConfiguration config) {
Expand Down Expand Up @@ -169,11 +185,114 @@ static TrieLogCount getCount(
return new TrieLogCount(total.get(), canonicalCount.get(), forkCount.get(), orphanCount.get());
}

private static boolean saveTrieLogsInFile(final Map<byte[],byte[]> trieLogs){

File file = new File(trieLogFile);

BufferedWriter bf = null;

try {

bf = new BufferedWriter(new FileWriter(file, StandardCharsets.UTF_8));
for (Map.Entry<byte[], byte[]> entry :
trieLogs.entrySet()) {

bf.write(Bytes.of(entry.getKey()) + ":"
+ Base64.toBase64String(entry.getValue()));

bf.newLine();
}

bf.flush();
}
catch (IOException e) {
LOG.error(e.getMessage());
return false;
}
finally {

try {

bf.close();
}
catch (Exception e) {
LOG.error(e.getMessage());
}
}
return true;
}

private static IdentityHashMap<byte[], byte[]> readTrieLogsFromFile() {


File file = new File(trieLogFile);

IdentityHashMap<byte[], byte[]> trieLogs = new IdentityHashMap<>();


BufferedReader br = null;


try {


br = new BufferedReader(new FileReader(file, StandardCharsets.UTF_8));


String line;

while ((line = br.readLine()) != null) {


List<String> parts = Splitter.on(':').splitToList(line);

byte[] key = Bytes.fromHexString(parts.get(0)).toArrayUnsafe();

byte[] value = Base64.decode(parts.get(1));


trieLogs.put(key, value);

}

} catch (IOException e) {

LOG.error(e.getMessage());

} finally {

try {

if (br != null) {

br.close();

}

} catch (Exception e) {

LOG.error(e.getMessage());

}

}

return trieLogs;

}

private static void deleteTrieLogFile(){
File file = new File(trieLogFile);
file.delete();
}

static void printCount(final PrintWriter out, final TrieLogCount count) {
out.printf(
"trieLog count: %s\n - canonical count: %s\n - fork count: %s\n - orphaned count: %s\n",
count.total, count.canonicalCount, count.forkCount, count.orphanCount);
}

record TrieLogCount(int total, int canonicalCount, int forkCount, int orphanCount) {}


}
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,16 @@ static class PruneTrieLog implements Runnable {
public void run() {
TrieLogContext context = getTrieLogContext();

TrieLogHelper.countAndPrune(
spec.commandLine().getOut(),
context.config(),
context.rootWorldStateStorage(),
context.blockchain(),
context.besuController());
try {
TrieLogHelper.countAndPrune(
spec.commandLine().getOut(),
context.config(),
context.rootWorldStateStorage(),
context.blockchain(),
context.besuController());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

Expand All @@ -126,7 +130,6 @@ record TrieLogContext(
BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
MutableBlockchain blockchain) {}


private static TrieLogContext getTrieLogContext() {
Configurator.setLevel(LoggerFactory.getLogger(TrieLogPruner.class).getName(), Level.DEBUG);
checkNotNull(parentCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public Optional<byte[]> getTrieLog(final Hash blockHash) {
return trieLogStorage.get(blockHash.toArrayUnsafe());
}

public Stream<byte[]> streamTrieLogKeys(final int limit) {
public Stream<byte[]> streamTrieLogKeys(final long limit) {
return trieLogStorage.streamKeys().limit(limit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private int preloadQueue() {
try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {
final AtomicLong count = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.parallel().forEach(
trieLogKeys.forEach(
blockHashAsBytes -> {
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
Expand Down