Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signature consensus ratio config #1950

Merged
merged 9 commits into from
May 14, 2021
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.balance.threads` | 15 | The number of threads to search for new files to download |
| `hedera.mirror.importer.downloader.bucketName` | | The cloud storage bucket name to download streamed files. This value takes priority over network hardcoded bucket names regardless of `hedera.mirror.importer.network` value.|
| `hedera.mirror.importer.downloader.cloudProvider` | S3 | The cloud provider to download files from. Either `S3` or `GCP` |
| `hedera.mirror.importer.downloader.consensusRatio` | 0.333 | The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to total number of nodes available |
| `hedera.mirror.importer.downloader.endpointOverride` | | Can be specified to download streams from a source other than S3 and GCP. Should be S3 compatible |
| `hedera.mirror.importer.downloader.event.batchSize` | 100 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.event.enabled` | false | Whether to enable event file downloads |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class FileStreamSignature implements Comparable<FileStreamSignature> {
private SignatureStatus status = SignatureStatus.DOWNLOADED;
private byte[] metadataHash;
private byte[] metadataHashSignature;
private StreamType streamType;

@Override
public int compareTo(FileStreamSignature other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* ‍
*/

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import lombok.Data;
Expand All @@ -46,6 +47,10 @@ public String getBucketName() {
return StringUtils.isNotBlank(bucketName) ? bucketName : mirrorProperties.getNetwork().getBucketName();
}

/**
 * The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to the total number of
 * nodes available. The validation annotations bound it to the range 0 to 1; the default is 0.333.
 */
@Max(1)
@Min(0)
private float consensusRatio = 0.333f;

@NotNull
private CloudProvider cloudProvider = CloudProvider.S3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.TreeMultimap;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.nio.file.Path;
Expand Down Expand Up @@ -99,7 +98,6 @@ public abstract class Downloader<T extends StreamFile> {
private final MeterRegistry meterRegistry;
private final Timer cloudStorageLatencyMetric;
private final Timer downloadLatencyMetric;
private final Counter.Builder signatureVerificationMetric;
private final Timer streamCloseMetric;
private final Timer.Builder streamVerificationMetric;

Expand Down Expand Up @@ -138,10 +136,6 @@ protected Downloader(S3AsyncClient s3Client,
.tag("type", streamType.toString())
.register(meterRegistry);

signatureVerificationMetric = Counter.builder("hedera.mirror.download.signature.verification")
.description("The number of signatures verified from a particular node")
.tag("type", streamType.toString());

streamCloseMetric = Timer.builder("hedera.mirror.stream.close.latency")
.description("The difference between the consensus time of the last and first transaction in the " +
"stream file")
Expand Down Expand Up @@ -313,6 +307,7 @@ private Optional<FileStreamSignature> parseSignatureFile(PendingDownload pending
StreamFileData streamFileData = new StreamFileData(streamFilename, pendingDownload.getBytes());
FileStreamSignature fileStreamSignature = signatureFileReader.read(streamFileData);
fileStreamSignature.setNodeAccountId(nodeAccountId);
fileStreamSignature.setStreamType(streamType);
return Optional.of(fileStreamSignature);
}

Expand Down Expand Up @@ -395,16 +390,6 @@ private void verifySigsAndDownloadDataFiles(Multimap<String, FileStreamSignature
continue;
}
throw ex;
} finally {
for (FileStreamSignature signature : signatures) {
EntityId nodeAccountId = signature.getNodeAccountId();
signatureVerificationMetric.tag("nodeAccount", nodeAccountId.getEntityNum().toString())
.tag("realm", nodeAccountId.getRealmNum().toString())
.tag("shard", nodeAccountId.getShardNum().toString())
.tag("status", signature.getStatus().toString())
.register(meterRegistry)
.increment();
}
}

for (FileStreamSignature signature : signatures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,56 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Named;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import com.hedera.mirror.importer.addressbook.AddressBookService;
import com.hedera.mirror.importer.domain.AddressBook;
import com.hedera.mirror.importer.domain.EntityId;
import com.hedera.mirror.importer.domain.EntityTypeEnum;
import com.hedera.mirror.importer.domain.FileStreamSignature;
import com.hedera.mirror.importer.domain.FileStreamSignature.SignatureStatus;
import com.hedera.mirror.importer.exception.SignatureVerificationException;

@Named
@Log4j2
@RequiredArgsConstructor
public class NodeSignatureVerifier {

private final AddressBookService addressBookService;
private final CommonDownloaderProperties commonDownloaderProperties;

// Metrics
private final MeterRegistry meterRegistry;
private final Map<EntityId, Counter> metricMap = new ConcurrentHashMap<>();
private final Counter.Builder missingNodeSignatureFileMetric;
private final Counter.Builder signatureVerificationMetric;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to store these as fields since their construction happens so rarely and it's better to encapsulate the metric creation in a single method. See PublishMetrics for an example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adopted similar logic


public NodeSignatureVerifier(AddressBookService addressBookService,
CommonDownloaderProperties commonDownloaderProperties,
MeterRegistry meterRegistry) {
this.addressBookService = addressBookService;
this.commonDownloaderProperties = commonDownloaderProperties;
this.meterRegistry = meterRegistry;

missingNodeSignatureFileMetric = Counter.builder("hedera.mirror.download.signature.missing")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might make sense to just have a single metric for both. We can add a SignatureStatus.NOT_FOUND to capture that unique state. The other reason it might be needed is the alert that we have:

sum(rate(hedera_mirror_download_signature_verification_total{application="hedera-mirror-importer", status="CONSENSUS_REACHED"}[2m])) by (namespace, pod, type)
/ sum(rate(hedera_mirror_download_signature_verification_total{application="hedera-mirror-importer"}[2m])) by (namespace, pod, type) < 0.33

Right now there's a problem that if a node is missing the divisor of that equation is decreasing, skewing the accuracy of that calculation. We need to ensure we always have the total be equal to the number of nodes in the address book.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combined into one counter

.description("The number of nodes whose signatures are missing from the consensus process");
signatureVerificationMetric = Counter.builder("hedera.mirror.download.signature.verification")
.description("The number of signatures verified from a particular node");
}

private static boolean canReachConsensus(long actualNodes, long expectedNodes) {
return actualNodes >= Math.ceil(expectedNodes / 3.0);
private boolean canReachConsensus(long actualNodes, long expectedNodes) {
return actualNodes >= Math.ceil(expectedNodes * commonDownloaderProperties.getConsensusRatio());
}

/**
Expand Down Expand Up @@ -76,9 +100,13 @@ public void verify(Collection<FileStreamSignature> signatures) throws SignatureV
long sigFileCount = signatures.size();
long nodeCount = nodeAccountIDPubKeyMap.size();
if (!canReachConsensus(sigFileCount, nodeCount)) {
throw new SignatureVerificationException("Require at least 1/3 signature files to reach consensus, got " +
sigFileCount + " out of " + nodeCount + " for file " + filename + ": " + statusMap(signatures,
nodeAccountIDPubKeyMap));
throw new SignatureVerificationException(String.format(
"Requires at least %s of signature files to reach consensus, got %d out of %d for file %s: %s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update troubleshooting doc

Copy link
Contributor Author

@Nana-EC Nana-EC May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like that file and the alert based on it has been out of date for ages. It's still looking for logs in the form “File could not be verified by at least 2/3 of nodes”

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gonna modify this to start with "Insufficient downloaded signature file count, requires at least..." to make it easier to alert on and understand.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about "Unable to reach consensus"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't see this before my last push.
However, I do want to call out the lack of downloaded files because we do the canReachConsensus before verification and again after. So it would be nice to distinguish.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you string formatting twice?

Suggested change
"Requires at least %s of signature files to reach consensus, got %d out of %d for file %s: %s",
"Requires at least %.03f of signature files to reach consensus, got %d out of %d for file %s: %s",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

String.format("%.03f", commonDownloaderProperties.getConsensusRatio()),
sigFileCount,
nodeCount,
filename,
statusMap(signatures, nodeAccountIDPubKeyMap)));
}

for (FileStreamSignature fileStreamSignature : signatures) {
Expand All @@ -104,6 +132,9 @@ public void verify(Collection<FileStreamSignature> signatures) throws SignatureV
log.warn("Verified signature file {} reached consensus but with some errors: {}", filename,
statusMap(signatures, nodeAccountIDPubKeyMap));
return;
} else if (commonDownloaderProperties.getConsensusRatio() == 0) {
log.debug("Signature file {} does not require consensus, skipping verification", filename);
return;
Copy link
Collaborator

@xin-hedera xin-hedera May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will pass when no signature file is verified, i.e., signatureHashMap is empty

the possible effect is an illegitimate stream file can pass verification and get ingested

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. You're highlighting the fact that even if consensusRatio is set to 0 we should still only proceed with no error if there are some valid signature stream files.
Makes sense, I'll update it as I don't see a scenario where we'd want to pass with no verified signature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably should not log it at all unless it's important.

the updated if block is dead code, since when signatureHashMap.size() > 0, consensusCount will be > 0, and the block above it with condition if (consensusCount > 0) will run instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I thought this was the scenario you highlighted in your original comment, no.
Initially the plan was to not fail when number of verified signature files was = 0 and commonDownloaderProperties.getConsensusRatio() == 0.
However, per your comment we probably only want to allow this with verified files.

Probably better to move this check up to after verification but before consensus calculation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}

throw new SignatureVerificationException("Signature verification failed for file " + filename + ": " + statusMap(signatures, nodeAccountIDPubKeyMap));
Expand Down Expand Up @@ -153,17 +184,45 @@ private boolean verifySignature(FileStreamSignature fileStreamSignature,
return false;
}

private Map<String, Collection<String>> statusMap(Collection<FileStreamSignature> signatures, Map<String,
private Map<String, Collection<EntityId>> statusMap(Collection<FileStreamSignature> signatures, Map<String,
PublicKey> nodeAccountIDPubKeyMap) {
Map<String, Collection<String>> statusMap = signatures.stream()
Map<String, Collection<EntityId>> statusMap = signatures.stream()
.collect(Collectors.groupingBy(fss -> fss.getStatus().toString(),
Collectors.mapping(FileStreamSignature::getNodeAccountIdString, Collectors
Collectors.mapping(FileStreamSignature::getNodeAccountId, Collectors
.toCollection(TreeSet::new))));
Set<String> seenNodes = signatures.stream().map(FileStreamSignature::getNodeAccountIdString)
.collect(Collectors.toSet());
Set<String> missingNodes = new TreeSet<>(Sets.difference(nodeAccountIDPubKeyMap.keySet(), seenNodes));

Set<EntityId> seenNodes = new HashSet<>();
signatures.forEach(signature -> {
seenNodes.add(signature.getNodeAccountId());
EntityId nodeAccountId = signature.getNodeAccountId();

metricMap.computeIfAbsent(nodeAccountId, n -> signatureVerificationMetric
.tag("nodeAccount", nodeAccountId.getEntityNum().toString())
.tag("realm", nodeAccountId.getRealmNum().toString())
.tag("shard", nodeAccountId.getShardNum().toString())
.tag("status", signature.getStatus().toString())
.tag("type", signature.getStreamType().toString())
.register(meterRegistry))
.increment();
});

Set<EntityId> missingNodes = new TreeSet<>(Sets.difference(
nodeAccountIDPubKeyMap.keySet().stream().map(x -> EntityId.of(x, EntityTypeEnum.ACCOUNT))
.collect(Collectors.toSet()),
seenNodes));
statusMap.put("MISSING", missingNodes);
statusMap.remove(SignatureStatus.CONSENSUS_REACHED.toString());

String streamType = signatures.stream().map(FileStreamSignature::getStreamType).findFirst().toString();
missingNodes.forEach(nodeAccountId -> metricMap
Copy link
Member

@steven-sheehy steven-sheehy May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two different counters can't be stored in the same map. As it is now, only the first counter is being created and then incremented for both. You will need two separate maps. Or if we combine metrics can ignore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combined into one counter needing only one map

.computeIfAbsent(nodeAccountId, n -> missingNodeSignatureFileMetric
.tag("nodeAccount", n.getEntityNum().toString())
.tag("realm", nodeAccountId.getRealmNum().toString())
.tag("shard", nodeAccountId.getShardNum().toString())
.tag("type", streamType)
.register(meterRegistry))
.increment());

return statusMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ mirrorProperties, commonDownloaderProperties, new MetricsExecutionInterceptor(me

signatureFileReader = new CompositeSignatureFileReader(new SignatureFileReaderV2(),
new SignatureFileReaderV5());
nodeSignatureVerifier = new NodeSignatureVerifier(addressBookService);
nodeSignatureVerifier = new NodeSignatureVerifier(
addressBookService,
downloaderProperties.getCommon(),
meterRegistry);
downloader = getDownloader();
streamType = downloaderProperties.getStreamType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.when;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
Expand All @@ -48,6 +50,7 @@
import com.hedera.mirror.importer.domain.EntityTypeEnum;
import com.hedera.mirror.importer.domain.FileStreamSignature;
import com.hedera.mirror.importer.domain.FileStreamSignature.SignatureType;
import com.hedera.mirror.importer.domain.StreamType;
import com.hedera.mirror.importer.exception.SignatureVerificationException;

@ExtendWith(MockitoExtension.class)
Expand All @@ -57,11 +60,15 @@ class NodeSignatureVerifierTest {
private static PublicKey publicKey;

private static final EntityId nodeId = new EntityId(0L, 0L, 3L, EntityTypeEnum.ACCOUNT.getId());
private static final MeterRegistry meterRegistry = new LoggingMeterRegistry();
private Signature signer;

@Mock
private AddressBookService addressBookService;

@Mock
private CommonDownloaderProperties commonDownloaderProperties;

@Mock
private AddressBook currentAddressBook;

Expand All @@ -76,13 +83,17 @@ static void generateKeys() throws NoSuchAlgorithmException {

@BeforeEach
void setup() throws GeneralSecurityException {
nodeSignatureVerifier = new NodeSignatureVerifier(addressBookService);
nodeSignatureVerifier = new NodeSignatureVerifier(
addressBookService,
commonDownloaderProperties,
meterRegistry);
signer = Signature.getInstance("SHA384withRSA", "SunRsaSign");
signer.initSign(privateKey);
Map<String, PublicKey> nodeAccountIDPubKeyMap = new HashMap();
nodeAccountIDPubKeyMap.put("0.0.3", publicKey);
when(addressBookService.getCurrent()).thenReturn(currentAddressBook);
when(currentAddressBook.getNodeAccountIDPubKeyMap()).thenReturn(nodeAccountIDPubKeyMap);
when(commonDownloaderProperties.getConsensusRatio()).thenReturn(0.333f);
}

@Test
Expand Down Expand Up @@ -149,7 +160,29 @@ void testCannotReachConsensus() {
List<FileStreamSignature> fileStreamSignatures = Arrays.asList(buildBareBonesFileStreamSignature());
Exception e = assertThrows(SignatureVerificationException.class, () -> nodeSignatureVerifier
.verify(fileStreamSignatures));
assertTrue(e.getMessage().contains("Require at least 1/3 signature files to reach consensus"));
assertTrue(e.getMessage().contains("Requires at least 0.333 of signature files to reach consensus"));
}

/**
 * Checks the "no consensus required" configuration: the address book is stubbed with eight nodes (0.0.3 to
 * 0.0.10) and the consensus ratio with 0, then an empty signature collection is verified. The test passes only
 * if {@code verify} returns without throwing.
 */
@Test
void testNoConsensusRequired() throws GeneralSecurityException {
    Map<String, PublicKey> nodeAccountIDPubKeyMap = new HashMap<>();
    for (int accountNum = 3; accountNum <= 10; accountNum++) {
        nodeAccountIDPubKeyMap.put("0.0." + accountNum, publicKey);
    }

    when(currentAddressBook.getNodeAccountIDPubKeyMap()).thenReturn(nodeAccountIDPubKeyMap);
    when(commonDownloaderProperties.getConsensusRatio()).thenReturn(0f);

    // no signature files are supplied at all; with a ratio of 0 the call must still not throw
    nodeSignatureVerifier.verify(List.of());
}

@Test
Expand All @@ -174,7 +207,36 @@ void testVerifiedWithOneThirdConsensus() throws GeneralSecurityException {
null, null);
fileStreamSignatureNode4.setNodeAccountId(new EntityId(0L, 0L, 5L, EntityTypeEnum.ACCOUNT.getId()));

nodeSignatureVerifier.verify(Arrays.asList(fileStreamSignatureNode3, fileStreamSignatureNode5));
nodeSignatureVerifier
.verify(Arrays.asList(fileStreamSignatureNode3, fileStreamSignatureNode5));
}

/**
 * Checks the "full consensus required" configuration: the address book is stubbed with three nodes (0.0.3,
 * 0.0.4, 0.0.5) and the consensus ratio with 1, then signatures over the same file hash from all three nodes
 * are verified. The test passes only if {@code verify} returns without throwing.
 */
@Test
void testVerifiedWithFullConsensusRequired() throws GeneralSecurityException {
    Map<String, PublicKey> nodeAccountIDPubKeyMap = new HashMap<>();
    nodeAccountIDPubKeyMap.put("0.0.3", publicKey);
    nodeAccountIDPubKeyMap.put("0.0.4", publicKey);
    nodeAccountIDPubKeyMap.put("0.0.5", publicKey);
    when(currentAddressBook.getNodeAccountIDPubKeyMap()).thenReturn(nodeAccountIDPubKeyMap);
    when(commonDownloaderProperties.getConsensusRatio()).thenReturn(1f);

    // every node signs the same hash with the shared test key
    byte[] fileHash = TestUtils.generateRandomByteArray(48);
    byte[] fileHashSignature = signHash(fileHash);

    FileStreamSignature fileStreamSignatureNode3 = buildFileStreamSignature(fileHash, fileHashSignature,
            null, null);
    fileStreamSignatureNode3.setNodeAccountId(new EntityId(0L, 0L, 3L, EntityTypeEnum.ACCOUNT.getId()));

    FileStreamSignature fileStreamSignatureNode4 = buildFileStreamSignature(fileHash, fileHashSignature,
            null, null);
    fileStreamSignatureNode4.setNodeAccountId(new EntityId(0L, 0L, 4L, EntityTypeEnum.ACCOUNT.getId()));

    FileStreamSignature fileStreamSignatureNode5 = buildFileStreamSignature(fileHash, fileHashSignature,
            null, null);
    fileStreamSignatureNode5.setNodeAccountId(new EntityId(0L, 0L, 5L, EntityTypeEnum.ACCOUNT.getId()));

    nodeSignatureVerifier
            .verify(Arrays.asList(fileStreamSignatureNode3, fileStreamSignatureNode4, fileStreamSignatureNode5));
}

@Test
Expand Down Expand Up @@ -259,6 +321,7 @@ private FileStreamSignature buildBareBonesFileStreamSignature() {
fileStreamSignature.setFilename("");
fileStreamSignature.setNodeAccountId(nodeId);
fileStreamSignature.setSignatureType(SignatureType.SHA_384_WITH_RSA);
fileStreamSignature.setStreamType(StreamType.RECORD);
return fileStreamSignature;
}

Expand Down