Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retry logic on sync pipeline for rocksdb issue #6004

Merged
merged 13 commits into from
Oct 24, 2023
Merged
Next Next commit
add retry logic for sync pipeline with rocksdb issue
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
  • Loading branch information
matkt committed Oct 9, 2023
commit 371f5e706ee043533cb5e537981a1d30ac58c2ab
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.hyperledger.besu.ethereum.eth.sync;

import org.hyperledger.besu.plugin.services.exception.StorageException;

matkt marked this conversation as resolved.
Show resolved Hide resolved
public final class StorageExceptionManager {

private static final String rocksdbClassName = "org.rocksdb.RocksDBException";
private static final String ERR_BUSY = "Busy";
private static final String ERR_LOCK_TIMED_OUT = "TimedOut(LockTimeout)";
matkt marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Decides whether a failed storage operation may be retried.
 *
 * <p>The decision is based purely on the exception message: it must mention the RocksDB
 * exception class name and contain either the "Busy" or the "TimedOut(LockTimeout)" status text.
 *
 * @param e the storage exception raised by the failed operation
 * @return {@code true} if the message identifies a RocksDB busy or lock-timeout error;
 *     {@code false} otherwise, including when the exception carries no message
 */
public static boolean canRetryOnError(final StorageException e) {
  // Throwable#getMessage may return null; read it once and treat a missing message as
  // non-retryable instead of throwing a NullPointerException from the caller's catch handler.
  final String message = e.getMessage();
  if (message == null) {
    return false;
  }
  return message.contains(rocksdbClassName)
      && (message.contains(ERR_BUSY) || message.contains(ERR_LOCK_TIMED_OUT));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

less brittle to use enum values

Suggested change
return e.getMessage().contains(rocksdbClassName)
&& (e.getMessage().contains(ERR_BUSY) || e.getMessage().contains(ERR_LOCK_TIMED_OUT));
}
return Optional.of(e.getCause())
.filter(z -> z instanceof RocksDBException)
.map(RocksDBException.class::cast)
.map(RocksDBException::getStatus)
.map(Status::getCode)
.map(RETRYABLE_STATUS_CODES::contains)
.orElse(false);

Copy link
Contributor Author

@matkt matkt Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be good, will update

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;

import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.services.tasks.Task;

import java.util.List;
Expand All @@ -33,24 +36,34 @@ public List<Task<NodeDataRequest>> persist(
final List<Task<NodeDataRequest>> tasks,
final BlockHeader blockHeader,
final WorldDownloadState<NodeDataRequest> downloadState) {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(
task -> {
enqueueChildren(task, downloadState);
return task;
})
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
request -> {
if (isRootState(blockHeader, request)) {
downloadState.setRootNodeData(request.getData());
} else {
request.persist(updater);
}
});
updater.commit();
try {
final Updater updater = worldStateStorage.updater();
tasks.stream()
.map(
task -> {
enqueueChildren(task, downloadState);
return task;
})
.map(Task::getData)
.filter(request -> request.getData() != null)
.forEach(
request -> {
if (isRootState(blockHeader, request)) {
downloadState.setRootNodeData(request.getData());
} else {
request.persist(updater);
}
});
updater.commit();
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the task by setting it to null. This way, it is considered as failed by the
// pipeline, and it will attempt to execute it again later.
tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null));
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not fail the future for an individual task in the foreach rather than failing the whole list? or is that a lot more plumbing to do in the pipeline to handle a single failed task future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not possible, because all modifications are in the same updater and we cannot commit only part of a transaction.

throw storageException;
}
}
return tasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;

import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.services.pipeline.Pipe;
import org.hyperledger.besu.services.tasks.Task;
Expand Down Expand Up @@ -58,19 +61,29 @@ public Stream<Task<SnapDataRequest>> loadLocalDataTrieNode(
final Task<SnapDataRequest> task, final Pipe<Task<SnapDataRequest>> completedTasks) {
final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData();
// check if node is already stored in the worldstate
if (snapSyncState.hasPivotBlockHeader()) {
Optional<Bytes> existingData = request.getExistingData(downloadState, worldStateStorage);
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(
worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration);
updater.commit();
downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage));
completedTasks.put(task);
return Stream.empty();
try {
if (snapSyncState.hasPivotBlockHeader()) {
Optional<Bytes> existingData = request.getExistingData(downloadState, worldStateStorage);
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
request.setRequiresPersisting(false);
final WorldStateStorage.Updater updater = worldStateStorage.updater();
request.persist(
worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration);
updater.commit();
downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage));
completedTasks.put(task);
return Stream.empty();
}
}
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the task by clearing its data. This way, it is considered as failed by the
// pipeline, and it will attempt to execute it again later.
task.getData().clear();
} else {
throw storageException;
}
}
return Stream.of(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError;

import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.services.tasks.Task;

import java.util.List;
Expand All @@ -43,41 +46,51 @@ public PersistDataStep(
}

public List<Task<SnapDataRequest>> persist(final List<Task<SnapDataRequest>> tasks) {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
for (Task<SnapDataRequest> task : tasks) {
if (task.getData().isResponseReceived()) {
// enqueue child requests
final Stream<SnapDataRequest> childRequests =
task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState);
if (!(task.getData() instanceof TrieNodeHealingRequest)) {
enqueueChildren(childRequests);
} else {
if (!task.getData().isExpired(snapSyncState)) {
try {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
for (Task<SnapDataRequest> task : tasks) {
if (task.getData().isResponseReceived()) {
// enqueue child requests
final Stream<SnapDataRequest> childRequests =
task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState);
if (!(task.getData() instanceof TrieNodeHealingRequest)) {
enqueueChildren(childRequests);
} else {
continue;
if (!task.getData().isExpired(snapSyncState)) {
enqueueChildren(childRequests);
} else {
continue;
}
}
}

// persist nodes
final int persistedNodes =
task.getData()
.persist(
worldStateStorage,
updater,
downloadState,
snapSyncState,
snapSyncConfiguration);
if (persistedNodes > 0) {
if (task.getData() instanceof TrieNodeHealingRequest) {
downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes);
} else {
downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes);
// persist nodes
final int persistedNodes =
task.getData()
.persist(
worldStateStorage,
updater,
downloadState,
snapSyncState,
snapSyncConfiguration);
if (persistedNodes > 0) {
if (task.getData() instanceof TrieNodeHealingRequest) {
downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes);
} else {
downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes);
}
}
}
}
updater.commit();
} catch (StorageException storageException) {
if (canRetryOnError(storageException)) {
// We reset the tasks by clearing their data. This way, they are considered as failed by the
// pipeline, and it will attempt to execute them again later.
tasks.forEach(task -> task.getData().clear());
} else {
throw storageException;
}
}
updater.commit();
return tasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ public SnapWorldStateDownloadProcess build() {
"step",
"action");

/*
The logic and intercommunication of different pipelines can be summarized as follows:

1. Account Data Pipeline (fetchAccountDataPipeline): This process starts with downloading the leaves of the account tree in ranges, with multiple ranges being processed simultaneously.
If the downloaded accounts are smart contracts, tasks are created in the storage pipeline to download the storage tree of the smart contract, and in the code download pipeline for the smart contract.

2. Storage Data Pipeline (fetchStorageDataPipeline): Running parallel to the account data pipeline, this pipeline downloads the storage of smart contracts.
If all slots cannot be downloaded at once, tasks are created in the fetchLargeStorageDataPipeline to download the storage by range, allowing parallelization of large account downloads.

3. Code Data Pipeline (fetchCodePipeline): This pipeline, running concurrently with the account and storage data pipelines, is responsible for downloading the code of the smart contracts.

4. Large Storage Data Pipeline (fetchLargeStorageDataPipeline): This pipeline is used when the storage data for a smart contract is too large to be downloaded at once.
It enables the storage data to be downloaded in ranges, similar to the account data.

5. Healing Phase: Initiated after all other pipelines have completed their tasks, this phase ensures the integrity and completeness of the downloaded data.
*/
Comment on lines +234 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏

final Pipeline<Task<SnapDataRequest>> completionPipeline =
PipelineBuilder.<Task<SnapDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class StackTrie {
private final AtomicInteger nbSegments;
private final int maxSegments;
private final Bytes32 startKeyHash;
private final Map<Bytes32, TaskElement> elements;
private final AtomicLong elementsCount;
private Map<Bytes32, TaskElement> elements;
private AtomicLong elementsCount;

public StackTrie(final Hash rootHash, final Bytes32 startKeyHash) {
this(rootHash, 1, 1, startKeyHash);
Expand All @@ -78,6 +78,11 @@ public void addElement(
taskIdentifier, ImmutableTaskElement.builder().proofs(proofs).keys(keys).build());
}

/**
 * Removes the element registered under the given task identifier and subtracts the number of
 * keys it held from the running element count.
 *
 * <p>If no element is registered under the identifier, the call is a no-op.
 *
 * @param taskIdentifier identifier of the element to remove
 */
public void removeElement(final Bytes32 taskIdentifier) {
  // Map#remove returns null when there is no mapping; guard against it so that clearing a
  // request that never stored an element does not fail with a NullPointerException.
  final TaskElement removed = this.elements.remove(taskIdentifier);
  if (removed != null) {
    this.elementsCount.addAndGet(-removed.keys().size());
  }
}

public TaskElement getElement(final Bytes32 taskIdentifier) {
return this.elements.get(taskIdentifier);
}
Expand Down Expand Up @@ -142,6 +147,11 @@ public void maybeStoreNode(final Bytes location, final Node<Bytes> node) {
}
}

/**
 * Discards every stored element by replacing the element map with a new empty
 * {@link LinkedHashMap} and the element counter with a new zero-valued {@link AtomicLong}.
 */
public void clear() {
  elementsCount = new AtomicLong();
  elements = new LinkedHashMap<>();
}

public boolean addSegment() {
if (nbSegments.get() > maxSegments) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ public TreeMap<Bytes32, Bytes> getAccounts() {
return stackTrie.getElement(startKeyHash).keys();
}

/**
 * Resets this request: clears the backing stack trie, then marks the proof as not valid.
 */
@Override
public void clear() {
  this.stackTrie.clear();
  this.isProofValid = Optional.of(Boolean.FALSE);
}

public Bytes serialize() {
return RLP.encode(
out -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public Bytes32 getAccountHash() {
return accountHash;
}

/** Resets this request by replacing its code with {@code Bytes.EMPTY}. */
@Override
public void clear() {
  this.setCode(Bytes.EMPTY);
}

public Bytes32 getCodeHash() {
return codeHash;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ public Bytes32 getEndKeyHash() {
return endKeyHash;
}

/**
 * Resets this request: marks the proof as not valid, then removes the element keyed by this
 * request's start key hash from the stack trie.
 */
@Override
public void clear() {
  // The flag is updated first, before the stack trie is touched.
  isProofValid = Optional.of(Boolean.FALSE);
  stackTrie.removeElement(startKeyHash);
}

@VisibleForTesting
public void setProofValid(final boolean isProofValid) {
this.isProofValid = Optional.of(isProofValid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public boolean isResponseReceived() {
return !data.isEmpty() && Hash.hash(data).equals(getNodeHash());
}

/**
 * Resets this request by replacing its data with {@code Bytes.EMPTY}.
 *
 * <p>NOTE(review): presumably this makes {@code isResponseReceived()} return {@code false},
 * since that check requires non-empty data; confirm that {@code setData} only assigns the field.
 */
@Override
public void clear() {
  this.setData(Bytes.EMPTY);
}

@Override
public boolean isExpired(final SnapSyncProcessState snapSyncState) {
return snapSyncState.isExpired(this);
Expand Down
Loading