Add --X-trie-log subcommand #6303

Merged
merged 27 commits into from
Jan 8, 2024
Changes from 22 commits
Commits
16c0a49
Add x-trie-log subcommand for one-off backlog prune
siladu Nov 20, 2023
7dd4928
long -> int
siladu Nov 20, 2023
bf2b098
Removed banned method
gfukushima Dec 12, 2023
e67ae51
Preload process stream in parallel
gfukushima Dec 12, 2023
9b4e0c9
Drop unwanted trielogs and keep reatain layers only
gfukushima Dec 14, 2023
0b9fe83
Add output to user and cleanup refactor
gfukushima Dec 15, 2023
426848e
small tweak to display cf that had reference dropped by RocksDbSegmen…
gfukushima Dec 15, 2023
7401b59
spotless
gfukushima Dec 15, 2023
1b7fb72
Fix classes that changed package
gfukushima Dec 15, 2023
11e6b05
spotless
gfukushima Dec 15, 2023
f2d01e2
Code review
gfukushima Dec 15, 2023
04f1aaa
Only clear DB when we have the exact amount of trie logs we want in m…
gfukushima Dec 15, 2023
2f01c5a
Trielogs stream to and from file to avoid possibly OOM
gfukushima Dec 18, 2023
56e4c8e
Process trie logs in chunks to avoid OOM
gfukushima Dec 18, 2023
78561b0
save and read in batches to handle edge cases
gfukushima Dec 19, 2023
42c72cf
save and read files to/from database dir
gfukushima Dec 20, 2023
9961fc2
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 20, 2023
9389540
add unit tests and PR review fixes
gfukushima Dec 21, 2023
e3d4fbc
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 21, 2023
c7144fe
spdx
gfukushima Dec 21, 2023
20b0ba5
Fix unit tests directory creation and deletion
gfukushima Dec 21, 2023
e7d175c
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 3, 2024
b214bf2
PR review
gfukushima Jan 4, 2024
aa75348
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 4, 2024
2bc0732
Merge remote-tracking branch 'origin/x-trie-log-subcommand-2' into x-…
gfukushima Jan 4, 2024
deec021
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
5d7b68e
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
@@ -34,17 +34,17 @@ static void printUsageForColumnFamily(
  final RocksDB rocksdb, final ColumnFamilyHandle cfHandle, final PrintWriter out)
  throws RocksDBException, NumberFormatException {
  final String size = rocksdb.getProperty(cfHandle, "rocksdb.estimate-live-data-size");
+ final String numberOfKeys = rocksdb.getProperty(cfHandle, "rocksdb.estimate-num-keys");
Review comment (Contributor): The changes here seem fine but unrelated. Every time I look at this class name, though, I think it is a CLI usage helper, like the output printed when someone asks for help or gets the command usage wrong. Completely unrelated, but if we are going to make changes in this PR ... :)

  boolean emptyColumnFamily = false;
- if (!size.isEmpty() && !size.isBlank()) {
+ if (!size.isBlank() && !numberOfKeys.isBlank()) {
  try {
  final long sizeLong = Long.parseLong(size);
+ final long numberOfKeysLong = Long.parseLong(numberOfKeys);
  final String totalSstFilesSize =
  rocksdb.getProperty(cfHandle, "rocksdb.total-sst-files-size");
  final long totalSstFilesSizeLong =
- !totalSstFilesSize.isEmpty() && !totalSstFilesSize.isBlank()
- ? Long.parseLong(totalSstFilesSize)
- : 0;
- if (sizeLong == 0) {
+ !totalSstFilesSize.isBlank() ? Long.parseLong(totalSstFilesSize) : 0;
+ if (sizeLong == 0 && numberOfKeysLong == 0) {
  emptyColumnFamily = true;
  }
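
The emptiness check in this hunk can be restated as a small standalone predicate. The sketch below is not the PR's code: `ColumnFamilyEmptinessSketch` and `isEmpty` are hypothetical names, and treating null or unparsable values as "not empty" is an assumption, since the diff does not show the surrounding catch block. It also illustrates why the `isEmpty()` calls can be dropped: `String.isBlank()` already returns true for the empty string.

```java
/** Hypothetical helper; mirrors the "size == 0 and key count == 0" rule from the hunk above. */
final class ColumnFamilyEmptinessSketch {

  /**
   * Returns true only when both RocksDB property strings parse to zero.
   * Blank, null, or non-numeric values are treated as "not known to be empty" (an assumption).
   */
  static boolean isEmpty(final String liveDataSize, final String numberOfKeys) {
    if (liveDataSize == null || numberOfKeys == null) {
      return false;
    }
    // isBlank() is also true for "", so a separate isEmpty() check is redundant.
    if (liveDataSize.isBlank() || numberOfKeys.isBlank()) {
      return false;
    }
    try {
      return Long.parseLong(liveDataSize.trim()) == 0 && Long.parseLong(numberOfKeys.trim()) == 0;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(final String[] args) {
    System.out.println(isEmpty("0", "0"));
    System.out.println(isEmpty("0", "12"));
  }
}
```

Requiring both estimates to be zero avoids flagging a column family as empty when only the live-data-size estimate happens to be zero.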

@@ -45,7 +45,11 @@
description = "This command provides storage related actions.",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class,
- subcommands = {StorageSubCommand.RevertVariablesStorage.class, RocksDbSubCommand.class})
+ subcommands = {
+ StorageSubCommand.RevertVariablesStorage.class,
+ RocksDbSubCommand.class,
+ TrieLogSubCommand.class
+ })
public class StorageSubCommand implements Runnable {

/** The constant COMMAND_NAME. */
@@ -0,0 +1,359 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.cli.subcommands.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Helper class for counting and pruning trie logs */
public class TrieLogHelper {
private static final String TRIE_LOG_FILE = "trieLogsToRetain";
private static final long BATCH_SIZE = 20_000;
private static final int ROCKSDB_MAX_INSERTS_PER_TRANSACTION = 1000;
private static final Logger LOG = LoggerFactory.getLogger(TrieLogHelper.class);

static void prune(
final DataStorageConfiguration config,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain,
final Path dataDirectoryPath) {
final String batchFileNameBase = dataDirectoryPath.resolve(DATABASE_PATH) + "/" + TRIE_LOG_FILE;
TrieLogHelper.validatePruneConfiguration(config);

final long layersToRetain = config.getUnstable().getBonsaiTrieLogRetentionThreshold();

final long chainHeight = blockchain.getChainHeadBlockNumber();

final long lastBlockNumberToRetainTrieLogsFor = chainHeight - layersToRetain + 1;

if (!validPruneRequirements(blockchain, chainHeight, lastBlockNumberToRetainTrieLogsFor)) {
return;
}

final long numberOfBatches = calculateNumberofBatches(layersToRetain);

processTrieLogBatches(
rootWorldStateStorage,
blockchain,
chainHeight,
lastBlockNumberToRetainTrieLogsFor,
numberOfBatches,
batchFileNameBase);

if (rootWorldStateStorage.streamTrieLogKeys(layersToRetain).count() == layersToRetain) {
deleteFiles(batchFileNameBase, numberOfBatches);
LOG.info("Prune ran successfully. Enjoy some disk space back! \uD83D\uDE80");
} else {
LOG.error("Prune failed. Re-run the subcommand to load the trie logs from file.");
}
}

private static void processTrieLogBatches(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain,
final long chainHeight,
final long lastBlockNumberToRetainTrieLogsFor,
final long numberOfBatches,
final String batchFileNameBase) {

for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {

final long firstBlockOfBatch = chainHeight - ((batchNumber - 1) * BATCH_SIZE);

final long lastBlockOfBatch =
Math.max(chainHeight - (batchNumber * BATCH_SIZE), lastBlockNumberToRetainTrieLogsFor);

final List<Hash> trieLogKeys =
getTrieLogKeysForBlocks(blockchain, firstBlockOfBatch, lastBlockOfBatch);

saveTrieLogBatches(batchFileNameBase, rootWorldStateStorage, batchNumber, trieLogKeys);
}

LOG.info("Clear trie logs...");
rootWorldStateStorage.clearTrieLog();

for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {
restoreTrieLogBatches(rootWorldStateStorage, batchNumber, batchFileNameBase);
}
}

private static void saveTrieLogBatches(
final String batchFileNameBase,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final List<Hash> trieLogKeys) {

LOG.info("Saving trie logs to retain in file (batch {})...", batchNumber);

try {
saveTrieLogsInFile(trieLogKeys, rootWorldStateStorage, batchNumber, batchFileNameBase);
} catch (IOException e) {
LOG.error("Error saving trie logs to file: {}", e.getMessage());
throw new RuntimeException(e);
}
}

private static void restoreTrieLogBatches(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase) {

try {
LOG.info("Restoring trie logs retained from batch {}...", batchNumber);
recreateTrieLogs(rootWorldStateStorage, batchNumber, batchFileNameBase);
} catch (IOException e) {
LOG.error("Error recreating trie logs from batch {}: {}", batchNumber, e.getMessage());
Review comment (Contributor): What should we do at this point? Do we have a way to restart from an existing restore file?

Reply (Contributor Author): I'm going to change this to throw a new RuntimeException so we abort the process here. If you re-run the command, it will detect that the files exist and skip the save-to-file step, but it will still reset the DB and try to restore from file.
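
The re-run behaviour described in the reply above can be sketched in isolation. This is a minimal illustration, not the PR's implementation: an in-memory `Map` stands in for the trie log storage, a plain text file of `key=value` lines stands in for the serialized batch files, and `ResumablePruneSketch`, `saveBackup`, `readBackup`, and `prune` are all hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the save / clear / restore flow; not Besu code. */
final class ResumablePruneSketch {

  /** Writes the retained entries as "key=value" lines. */
  static void saveBackup(final Path backupFile, final Map<String, String> retained) {
    final List<String> lines = new ArrayList<>();
    retained.forEach((key, value) -> lines.add(key + "=" + value));
    try {
      Files.write(backupFile, lines);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads back the entries written by saveBackup. */
  static Map<String, String> readBackup(final Path backupFile) {
    final Map<String, String> entries = new LinkedHashMap<>();
    try {
      for (final String line : Files.readAllLines(backupFile)) {
        final int separator = line.indexOf('=');
        entries.put(line.substring(0, separator), line.substring(separator + 1));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return entries;
  }

  /** Returns true when the store ends up holding exactly the entries to retain. */
  static boolean prune(
      final Map<String, String> db, final List<String> keysToRetain, final Path backupFile) {
    // 1. Save, unless a backup from an earlier (possibly aborted) run already exists.
    if (!Files.exists(backupFile)) {
      final Map<String, String> retained = new LinkedHashMap<>();
      for (final String key : keysToRetain) {
        if (db.containsKey(key)) {
          retained.put(key, db.get(key));
        }
      }
      saveBackup(backupFile, retained);
    }
    // 2. Reset the store. This is safe to repeat because the backup is still on disk.
    db.clear();
    // 3. Restore from file.
    db.putAll(readBackup(backupFile));
    // 4. Delete the backup only once the expected number of entries is back.
    final boolean restored = db.size() == keysToRetain.size();
    if (restored) {
      try {
        Files.delete(backupFile);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return restored;
  }
}
```

Because the backup file is only deleted after the count check passes, a run that aborts between the clear and the restore leaves the file in place, and the next run picks up from it instead of reading an already-cleared store.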

throw new RuntimeException(e);
}
}

private static void deleteFiles(final String batchFileNameBase, final long numberOfBatches) {

LOG.info("Deleting files...");

for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {
File file = new File(batchFileNameBase + "-" + batchNumber);
if (file.exists()) {
file.delete();
}
}
}

private static List<Hash> getTrieLogKeysForBlocks(
final MutableBlockchain blockchain,
final long firstBlockOfBatch,
final long lastBlockOfBatch) {
final List<Hash> trieLogKeys = new ArrayList<>();
for (long i = firstBlockOfBatch; i >= lastBlockOfBatch; i--) {
final Optional<BlockHeader> header = blockchain.getBlockHeader(i);
header.ifPresentOrElse(
blockHeader -> trieLogKeys.add(blockHeader.getHash()),
() -> LOG.error("Error retrieving block"));
}
return trieLogKeys;
}

private static long calculateNumberofBatches(final long layersToRetain) {
return layersToRetain / BATCH_SIZE + ((layersToRetain % BATCH_SIZE == 0) ? 0 : 1);
}

private static boolean validPruneRequirements(
final MutableBlockchain blockchain,
final long chainHeight,
final long lastBlockNumberToRetainTrieLogsFor) {
if (lastBlockNumberToRetainTrieLogsFor < 0) {
throw new IllegalArgumentException(
"Trying to retain more trie logs than chain length ("
+ chainHeight
+ "), skipping pruning");
}

final Optional<Hash> finalizedBlockHash = blockchain.getFinalized();

if (finalizedBlockHash.isEmpty()) {
throw new RuntimeException("No finalized block present, can't safely run trie log prune");
} else {
final Hash finalizedHash = finalizedBlockHash.get();
final Optional<BlockHeader> finalizedBlockHeader = blockchain.getBlockHeader(finalizedHash);
if (finalizedBlockHeader.isPresent()
&& finalizedBlockHeader.get().getNumber() < lastBlockNumberToRetainTrieLogsFor) {
throw new IllegalArgumentException(
"Trying to prune more layers than the finalized block height, skipping pruning");
}
}
return true;
}

private static void recreateTrieLogs(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase)
throws IOException {
// process in chunk to avoid OOM

IdentityHashMap<byte[], byte[]> trieLogsToRetain =
readTrieLogsFromFile(batchFileNameBase, batchNumber);
final int chunkSize = ROCKSDB_MAX_INSERTS_PER_TRANSACTION;
List<byte[]> keys = new ArrayList<>(trieLogsToRetain.keySet());

for (int startIndex = 0; startIndex < keys.size(); startIndex += chunkSize) {
processTransactionChunk(startIndex, chunkSize, keys, trieLogsToRetain, rootWorldStateStorage);
}
}

private static void processTransactionChunk(
final int startIndex,
final int chunkSize,
final List<byte[]> keys,
final IdentityHashMap<byte[], byte[]> trieLogsToRetain,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) {

var updater = rootWorldStateStorage.updater();
int endIndex = Math.min(startIndex + chunkSize, keys.size());

for (int i = startIndex; i < endIndex; i++) {
byte[] key = keys.get(i);
byte[] value = trieLogsToRetain.get(key);
updater.getTrieLogStorageTransaction().put(key, value);
LOG.info("Key({}): {}", i, Bytes32.wrap(key).toShortHexString());
}

updater.getTrieLogStorageTransaction().commit();
}

private static void validatePruneConfiguration(final DataStorageConfiguration config) {
checkArgument(
config.getUnstable().getBonsaiTrieLogRetentionThreshold()
>= config.getBonsaiMaxLayersToLoad(),
String.format(
"--Xbonsai-trie-log-retention-threshold minimum value is %d",
config.getBonsaiMaxLayersToLoad()));
checkArgument(
config.getUnstable().getBonsaiTrieLogPruningLimit() > 0,
String.format(
"--Xbonsai-trie-log-pruning-limit=%d must be greater than 0",
config.getUnstable().getBonsaiTrieLogPruningLimit()));
checkArgument(
config.getUnstable().getBonsaiTrieLogPruningLimit()
> config.getUnstable().getBonsaiTrieLogRetentionThreshold(),
String.format(
"--Xbonsai-trie-log-pruning-limit=%d must be greater than --Xbonsai-trie-log-retention-threshold=%d",
config.getUnstable().getBonsaiTrieLogPruningLimit(),
config.getUnstable().getBonsaiTrieLogRetentionThreshold()));
}

private static void saveTrieLogsInFile(
final List<Hash> trieLogsKeys,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase)
throws IOException {

File file = new File(batchFileNameBase + "-" + batchNumber);
if (file.exists()) {
LOG.error("File already exists, skipping file creation");
return;
}

try (FileOutputStream fos = new FileOutputStream(file);
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(getTrieLogs(trieLogsKeys, rootWorldStateStorage));
} catch (IOException e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
private static IdentityHashMap<byte[], byte[]> readTrieLogsFromFile(
final String batchFileNameBase, final long batchNumber) {

IdentityHashMap<byte[], byte[]> trieLogs;
try (FileInputStream fis = new FileInputStream(batchFileNameBase + "-" + batchNumber);
ObjectInputStream ois = new ObjectInputStream(fis)) {

trieLogs = (IdentityHashMap<byte[], byte[]>) ois.readObject();

} catch (IOException | ClassNotFoundException e) {

LOG.error(e.getMessage());
throw new RuntimeException(e);
}

return trieLogs;
}

private static IdentityHashMap<byte[], byte[]> getTrieLogs(
final List<Hash> trieLogKeys, final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) {
IdentityHashMap<byte[], byte[]> trieLogsToRetain = new IdentityHashMap<>();

LOG.info("Obtaining trielogs from db, this may take a few minutes...");
trieLogKeys.forEach(
hash ->
rootWorldStateStorage
.getTrieLog(hash)
.ifPresent(trieLog -> trieLogsToRetain.put(hash.toArrayUnsafe(), trieLog)));
return trieLogsToRetain;
}

static TrieLogCount getCount(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final int limit,
final Blockchain blockchain) {
final AtomicInteger total = new AtomicInteger();
final AtomicInteger canonicalCount = new AtomicInteger();
final AtomicInteger forkCount = new AtomicInteger();
final AtomicInteger orphanCount = new AtomicInteger();
rootWorldStateStorage
.streamTrieLogKeys(limit)
.map(Bytes32::wrap)
.map(Hash::wrap)
.forEach(
hash -> {
total.getAndIncrement();
blockchain
.getBlockHeader(hash)
.ifPresentOrElse(
(header) -> {
long number = header.getNumber();
final Optional<BlockHeader> headerByNumber =
blockchain.getBlockHeader(number);
if (headerByNumber.isPresent()
&& headerByNumber.get().getHash().equals(hash)) {
canonicalCount.getAndIncrement();
} else {
forkCount.getAndIncrement();
}
},
orphanCount::getAndIncrement);
});

return new TrieLogCount(total.get(), canonicalCount.get(), forkCount.get(), orphanCount.get());
}

static void printCount(final PrintWriter out, final TrieLogCount count) {
out.printf(
"trieLog count: %s\n - canonical count: %s\n - fork count: %s\n - orphaned count: %s\n",
count.total, count.canonicalCount, count.forkCount, count.orphanCount);
}

record TrieLogCount(int total, int canonicalCount, int forkCount, int orphanCount) {}
}
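
As a worked example of the batch arithmetic in `prune` and `processTrieLogBatches`, the sketch below reproduces the same formulas outside of Besu. `TrieLogBatchSketch`, `BlockRange`, and `batchRanges` are hypothetical names introduced for illustration, and the batch size is passed in as a parameter rather than read from the `BATCH_SIZE` constant.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the batch arithmetic; not part of the PR. */
final class TrieLogBatchSketch {

  /** Inclusive block range, walked from the higher number down to the lower one. */
  record BlockRange(long first, long last) {}

  /** Ceiling division, written the same way as calculateNumberofBatches. */
  static long numberOfBatches(final long layersToRetain, final long batchSize) {
    return layersToRetain / batchSize + ((layersToRetain % batchSize == 0) ? 0 : 1);
  }

  /** Computes the block range for each batch using the formulas from the diff. */
  static List<BlockRange> batchRanges(
      final long chainHeight, final long layersToRetain, final long batchSize) {
    final long lastBlockToRetain = chainHeight - layersToRetain + 1;
    final long batches = numberOfBatches(layersToRetain, batchSize);
    final List<BlockRange> ranges = new ArrayList<>();
    for (long batch = 1; batch <= batches; batch++) {
      final long first = chainHeight - (batch - 1) * batchSize;
      final long last = Math.max(chainHeight - batch * batchSize, lastBlockToRetain);
      ranges.add(new BlockRange(first, last));
    }
    return ranges;
  }

  public static void main(final String[] args) {
    for (final BlockRange range : batchRanges(100_000, 50_000, 20_000)) {
      System.out.println(range);
    }
  }
}
```

With a chain height of 100,000, 50,000 layers to retain, and a batch size of 20,000, this gives three ranges: 100000 down to 80000, 80000 down to 60000, and 60000 down to 50001. Because both ends of each range are inclusive, the boundary blocks 80000 and 60000 fall into two batches. With these formulas a full batch therefore covers one block more than the batch size, and a boundary trie log is saved and restored twice, which looks harmless since the second write stores the same value.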