Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signature consensus ratio config #1950

Merged
merged 9 commits into from
May 14, 2021
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.balance.threads` | 15 | The number of threads to search for new files to download |
| `hedera.mirror.importer.downloader.bucketName` | | The cloud storage bucket name to download streamed files. This value takes priority over network hardcoded bucket names regardless of `hedera.mirror.importer.network` value.|
| `hedera.mirror.importer.downloader.cloudProvider` | S3 | The cloud provider to download files from. Either `S3` or `GCP` |
| `hedera.mirror.importer.downloader.consensusRatio` | 0.333 | The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to total number of nodes available |
| `hedera.mirror.importer.downloader.endpointOverride` | | Can be specified to download streams from a source other than S3 and GCP. Should be S3 compatible |
| `hedera.mirror.importer.downloader.event.batchSize` | 100 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.event.enabled` | false | Whether to enable event file downloads |
Expand Down
4 changes: 2 additions & 2 deletions docs/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Following is list of error messages and how to begin handling issues when they a

- There is no immediate fix. Bring to team's attention immediately (during reasonable hours, otherwise next morning).

- `File could not be verified by at least 1/3 of nodes`
- `Insufficient downloaded signature file count, requires at least`
This can happen if

1. Some mainnet nodes are still in the process of uploading their signatures for the latest file (benign case).
Expand Down Expand Up @@ -249,7 +249,7 @@ Alerts: Low-Priority PagerDuty Alert during business hours only Response: Requir
| `Missing signature for file` | LOW | |
| `Error saving file in database` | NONE | HIGH (if 30 entries in 1 min) |
| `Failed downloading` | NONE | HIGH (if 30 entries in 1 min) |
| `File could not be verified by at least 1/3 of nodes | NONE | HIGH (if 30 entries in 1 min) |
| `Insufficient downloaded signature file count, requires at least` | NONE | HIGH (if 30 entries in 1 min) |
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
| `Signature verification failed` | NONE | HIGH (if 30 entries in 1 min) |
| `Unable to connect to database` | NONE | HIGH (if 30 entries in 1 min) |
| `Unable to fetch entity types` | NONE | HIGH (if 30 entries in 1 min) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class FileStreamSignature implements Comparable<FileStreamSignature> {
private SignatureStatus status = SignatureStatus.DOWNLOADED;
private byte[] metadataHash;
private byte[] metadataHashSignature;
private StreamType streamType;

@Override
public int compareTo(FileStreamSignature other) {
Expand All @@ -64,9 +65,10 @@ public String getNodeAccountIdString() {
}

public enum SignatureStatus {
DOWNLOADED, // Signature has been downloaded and parsed but not verified
VERIFIED, // Signature has been verified against the node's public key
CONSENSUS_REACHED // At least 1/3 of all nodes have been verified
DOWNLOADED, // Signature has been downloaded and parsed but not verified
VERIFIED, // Signature has been verified against the node's public key
CONSENSUS_REACHED, // Signature verification consensus reached by a node count greater than the consensusRatio
NOT_FOUND, // Signature for given node was not found for download
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* ‍
*/

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -46,6 +47,10 @@ public String getBucketName() {
return StringUtils.isNotBlank(bucketName) ? bucketName : mirrorProperties.getNetwork().getBucketName();
}

@Max(1)
@Min(0)
private float consensusRatio = 0.333f;

@NotNull
private CloudProvider cloudProvider = CloudProvider.S3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.TreeMultimap;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.nio.file.Path;
Expand Down Expand Up @@ -99,7 +98,6 @@ public abstract class Downloader<T extends StreamFile> {
private final MeterRegistry meterRegistry;
private final Timer cloudStorageLatencyMetric;
private final Timer downloadLatencyMetric;
private final Counter.Builder signatureVerificationMetric;
private final Timer streamCloseMetric;
private final Timer.Builder streamVerificationMetric;

Expand Down Expand Up @@ -138,10 +136,6 @@ protected Downloader(S3AsyncClient s3Client,
.tag("type", streamType.toString())
.register(meterRegistry);

signatureVerificationMetric = Counter.builder("hedera.mirror.download.signature.verification")
.description("The number of signatures verified from a particular node")
.tag("type", streamType.toString());

streamCloseMetric = Timer.builder("hedera.mirror.stream.close.latency")
.description("The difference between the consensus time of the last and first transaction in the " +
"stream file")
Expand Down Expand Up @@ -313,6 +307,7 @@ private Optional<FileStreamSignature> parseSignatureFile(PendingDownload pending
StreamFileData streamFileData = new StreamFileData(streamFilename, pendingDownload.getBytes());
FileStreamSignature fileStreamSignature = signatureFileReader.read(streamFileData);
fileStreamSignature.setNodeAccountId(nodeAccountId);
fileStreamSignature.setStreamType(streamType);
return Optional.of(fileStreamSignature);
}

Expand Down Expand Up @@ -395,16 +390,6 @@ private void verifySigsAndDownloadDataFiles(Multimap<String, FileStreamSignature
continue;
}
throw ex;
} finally {
for (FileStreamSignature signature : signatures) {
EntityId nodeAccountId = signature.getNodeAccountId();
signatureVerificationMetric.tag("nodeAccount", nodeAccountId.getEntityNum().toString())
.tag("realm", nodeAccountId.getRealmNum().toString())
.tag("shard", nodeAccountId.getShardNum().toString())
.tag("status", signature.getStatus().toString())
.register(meterRegistry)
.increment();
}
}

for (FileStreamSignature signature : signatures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,50 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Named;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.util.CollectionUtils;

import com.hedera.mirror.importer.addressbook.AddressBookService;
import com.hedera.mirror.importer.domain.AddressBook;
import com.hedera.mirror.importer.domain.EntityId;
import com.hedera.mirror.importer.domain.EntityTypeEnum;
import com.hedera.mirror.importer.domain.FileStreamSignature;
import com.hedera.mirror.importer.domain.FileStreamSignature.SignatureStatus;
import com.hedera.mirror.importer.exception.SignatureVerificationException;

@Named
@Log4j2
@RequiredArgsConstructor
public class NodeSignatureVerifier {

private final AddressBookService addressBookService;
private final CommonDownloaderProperties commonDownloaderProperties;

// Metrics
private final MeterRegistry meterRegistry;
private final Map<EntityId, Counter> nodeSignatureStatusMetricMap = new ConcurrentHashMap<>();

public NodeSignatureVerifier(AddressBookService addressBookService,
CommonDownloaderProperties commonDownloaderProperties,
MeterRegistry meterRegistry) {
this.addressBookService = addressBookService;
this.commonDownloaderProperties = commonDownloaderProperties;
this.meterRegistry = meterRegistry;
}

private static boolean canReachConsensus(long actualNodes, long expectedNodes) {
return actualNodes >= Math.ceil(expectedNodes / 3.0);
private boolean canReachConsensus(long actualNodes, long expectedNodes) {
return actualNodes >= Math.ceil(expectedNodes * commonDownloaderProperties.getConsensusRatio());
}

/**
Expand Down Expand Up @@ -76,9 +94,14 @@ public void verify(Collection<FileStreamSignature> signatures) throws SignatureV
long sigFileCount = signatures.size();
long nodeCount = nodeAccountIDPubKeyMap.size();
if (!canReachConsensus(sigFileCount, nodeCount)) {
throw new SignatureVerificationException("Require at least 1/3 signature files to reach consensus, got " +
sigFileCount + " out of " + nodeCount + " for file " + filename + ": " + statusMap(signatures,
nodeAccountIDPubKeyMap));
throw new SignatureVerificationException(String.format(
"Insufficient downloaded signature file count, requires at least %.03f to reach consensus, got %d" +
" out of %d for file %s: %s",
commonDownloaderProperties.getConsensusRatio(),
sigFileCount,
nodeCount,
filename,
statusMap(signatures, nodeAccountIDPubKeyMap)));
}

for (FileStreamSignature fileStreamSignature : signatures) {
Expand All @@ -88,6 +111,11 @@ public void verify(Collection<FileStreamSignature> signatures) throws SignatureV
}
}

if (commonDownloaderProperties.getConsensusRatio() == 0 && signatureHashMap.size() > 0) {
log.debug("Signature file {} does not require consensus, skipping consensus check", filename);
return;
}

for (String key : signatureHashMap.keySet()) {
Collection<FileStreamSignature> validatedSignatures = signatureHashMap.get(key);

Expand Down Expand Up @@ -153,17 +181,44 @@ private boolean verifySignature(FileStreamSignature fileStreamSignature,
return false;
}

private Map<String, Collection<String>> statusMap(Collection<FileStreamSignature> signatures, Map<String,
private Map<String, Collection<EntityId>> statusMap(Collection<FileStreamSignature> signatures, Map<String,
PublicKey> nodeAccountIDPubKeyMap) {
Map<String, Collection<String>> statusMap = signatures.stream()
Map<String, Collection<EntityId>> statusMap = signatures.stream()
.collect(Collectors.groupingBy(fss -> fss.getStatus().toString(),
Collectors.mapping(FileStreamSignature::getNodeAccountIdString, Collectors
Collectors.mapping(FileStreamSignature::getNodeAccountId, Collectors
.toCollection(TreeSet::new))));
Set<String> seenNodes = signatures.stream().map(FileStreamSignature::getNodeAccountIdString)
.collect(Collectors.toSet());
Set<String> missingNodes = new TreeSet<>(Sets.difference(nodeAccountIDPubKeyMap.keySet(), seenNodes));
statusMap.put("MISSING", missingNodes);
statusMap.remove(SignatureStatus.CONSENSUS_REACHED.toString());

Set<EntityId> seenNodes = new HashSet<>();
signatures.forEach(signature -> seenNodes.add(signature.getNodeAccountId()));

Set<EntityId> missingNodes = new TreeSet<>(Sets.difference(
nodeAccountIDPubKeyMap.keySet().stream().map(x -> EntityId.of(x, EntityTypeEnum.ACCOUNT))
.collect(Collectors.toSet()),
seenNodes));
statusMap.put(SignatureStatus.NOT_FOUND.toString(), missingNodes);

String streamType = CollectionUtils.isEmpty(signatures) ? "unknown" :
signatures.stream().map(FileStreamSignature::getStreamType).findFirst().toString();
for (Map.Entry<String, Collection<EntityId>> entry : statusMap.entrySet()) {
entry.getValue().forEach(nodeAccountId -> {
Counter counter = nodeSignatureStatusMetricMap.computeIfAbsent(
nodeAccountId,
n -> newStatusMetric(nodeAccountId, streamType, entry.getKey()));
counter.increment();
});
}

return statusMap;
}

private Counter newStatusMetric(EntityId entityId, String streamType, String status) {
return Counter.builder("hedera.mirror.download.signature.verification")
.description("The number of signatures verified from a particular node")
.tag("nodeAccount", entityId.getEntityNum().toString())
.tag("realm", entityId.getRealmNum().toString())
.tag("shard", entityId.getShardNum().toString())
.tag("type", streamType)
.tag("status", status)
.register(meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ mirrorProperties, commonDownloaderProperties, new MetricsExecutionInterceptor(me

signatureFileReader = new CompositeSignatureFileReader(new SignatureFileReaderV2(),
new SignatureFileReaderV5());
nodeSignatureVerifier = new NodeSignatureVerifier(addressBookService);
nodeSignatureVerifier = new NodeSignatureVerifier(
addressBookService,
downloaderProperties.getCommon(),
meterRegistry);
downloader = getDownloader();
streamType = downloaderProperties.getStreamType();

Expand Down
Loading