Optimize importer memory footprint #2103

Merged
merged 10 commits into from
Jun 15, 2021
16 changes: 8 additions & 8 deletions docs/configuration.md
@@ -37,7 +37,9 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.allowAnonymousAccess` | | Whether the cloud storage bucket allows for anonymous access. |
| `hedera.mirror.importer.downloader.balance.batchSize` | 30 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.balance.enabled` | true | Whether to enable balance file downloads |
| `hedera.mirror.importer.downloader.balance.frequency` | 30s | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. |
| `hedera.mirror.importer.downloader.balance.frequency` | 30s | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. |
| `hedera.mirror.importer.downloader.balance.persistBytes` | false | Whether to keep the balance file bytes so that the parser can later persist them to the database. |
| `hedera.mirror.importer.downloader.balance.keepFiles` | false | Whether to keep verified files. |
| `hedera.mirror.importer.downloader.balance.keepSignatures` | false | Whether to keep balance signature files after successful verification. If false, files are deleted. |
| `hedera.mirror.importer.downloader.balance.prefix` | accountBalances/balance | The prefix to search cloud storage for balance files |
| `hedera.mirror.importer.downloader.balance.threads` | 15 | The number of threads to search for new files to download |
@@ -48,14 +50,18 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.event.batchSize` | 100 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.event.enabled` | false | Whether to enable event file downloads |
| `hedera.mirror.importer.downloader.event.frequency` | 5s | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. |
| `hedera.mirror.importer.downloader.event.persistBytes` | false | Whether to keep the event file bytes so that the parser can later persist them to the database. |
| `hedera.mirror.importer.downloader.event.keepFiles` | false | Whether to keep verified files. |
| `hedera.mirror.importer.downloader.event.keepSignatures` | false | Whether to keep event signature files after successful verification. If false, files are deleted. |
| `hedera.mirror.importer.downloader.event.prefix` | eventsStreams/events\_ | The prefix to search cloud storage for event files |
| `hedera.mirror.importer.downloader.event.threads` | 15 | The number of threads to search for new files to download |
| `hedera.mirror.importer.downloader.gcpProjectId` | | GCP project id to bill for requests to GCS bucket which has Requester Pays enabled. |
| `hedera.mirror.importer.downloader.maxConcurrency` | 1000 | The maximum number of allowed open HTTP connections. Used by AWS SDK directly. |
| `hedera.mirror.importer.downloader.record.batchSize` | 40 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.record.enabled` | true | Whether to enable record file downloads |
| `hedera.mirror.importer.downloader.record.frequency` | 500ms | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. |
| `hedera.mirror.importer.downloader.record.frequency` | 500ms | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. |
| `hedera.mirror.importer.downloader.record.persistBytes` | false | Whether to keep the record file bytes so that the parser can later persist them to the database. |
| `hedera.mirror.importer.downloader.record.keepFiles` | false | Whether to keep verified files. |
| `hedera.mirror.importer.downloader.record.keepSignatures` | false | Whether to keep record signature files after successful verification. If false, files are deleted. |
| `hedera.mirror.importer.downloader.record.prefix` | recordstreams/record | The prefix to search cloud storage for record files |
| `hedera.mirror.importer.downloader.record.threads` | 15 | The number of threads to search for new files to download |
@@ -70,8 +76,6 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.parser.balance.enabled` | true | Whether to enable balance file parsing |
| `hedera.mirror.importer.parser.balance.fileBufferSize` | 200000 | The size of the buffer to use when reading in the balance file |
| `hedera.mirror.importer.parser.balance.frequency` | 100ms | How often to poll for new messages. Can accept duration units like `10s`, `2m` etc. |
| `hedera.mirror.importer.parser.balance.keepFiles` | false | Whether to keep parsed files after successful parsing. |
| `hedera.mirror.importer.parser.balance.persistBytes` | false | Whether to persist the balance file bytes to the database after successful parsing. |
| `hedera.mirror.importer.parser.balance.queueCapacity` | 10 | How many balance files to queue in memory while waiting to be persisted by the parser |
| `hedera.mirror.importer.parser.balance.retry.maxAttempts` | 3 | How many attempts should be made to retry file parsing errors |
| `hedera.mirror.importer.parser.balance.retry.maxBackoff` | 10s | The maximum amount of time to wait between retries |
@@ -81,8 +85,6 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.parser.event.bufferSize` | 32768 | The size of the byte buffer to allocate for each batch |
| `hedera.mirror.importer.parser.event.enabled` | false | Whether to enable event file parsing |
| `hedera.mirror.importer.parser.event.frequency` | 100ms | How often to poll for new messages |
| `hedera.mirror.importer.parser.event.keepFiles` | false | Whether to keep parsed files after successful parsing. |
| `hedera.mirror.importer.parser.event.persistBytes` | false | Whether to persist the event file bytes to the database after successful parsing. |
| `hedera.mirror.importer.parser.event.queueCapacity` | 10 | How many event files to queue in memory while waiting to be persisted by the parser |
| `hedera.mirror.importer.parser.event.retry.maxAttempts` | Integer.MAX_VALUE | How many attempts should be made to retry file parsing errors |
| `hedera.mirror.importer.parser.event.retry.maxBackoff` | 10s | The maximum amount of time to wait between retries |
@@ -115,8 +117,6 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.parser.record.entity.sql.batchSize` | 20_000 | When inserting transactions into db, executeBatches() is called every these many transactions |
| `hedera.mirror.importer.parser.record.entity.sql.enabled` | true | Whether to use PostgreSQL Copy mechanism to insert into the database |
| `hedera.mirror.importer.parser.record.frequency` | 100ms | How often to poll for new messages. Can accept duration units like `10s`, `2m` etc. |
| `hedera.mirror.importer.parser.record.keepFiles` | false | Whether to keep parsed files after successful parsing. |
| `hedera.mirror.importer.parser.record.persistBytes` | false | Whether to persist the record file bytes to the database after successful parsing. |
| `hedera.mirror.importer.parser.record.pubsub.topicName` | | Pubsub topic to publish transactions to |
| `hedera.mirror.importer.parser.record.pubsub.maxSendAttempts` | 5 | Number of attempts when sending messages to PubSub (only for retryable errors) |
| `hedera.mirror.importer.parser.record.queueCapacity` | 10 | How many record files to queue in memory while waiting to be persisted by the parser |

@@ -25,35 +25,63 @@
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.MessageChannel;

import com.hedera.mirror.importer.domain.StreamFile;
import com.hedera.mirror.importer.domain.StreamType;
import com.hedera.mirror.importer.parser.ParserProperties;
import com.hedera.mirror.importer.parser.StreamFileParser;
import com.hedera.mirror.importer.parser.balance.AccountBalanceFileParser;
import com.hedera.mirror.importer.parser.balance.BalanceParserProperties;
import com.hedera.mirror.importer.parser.event.EventFileParser;
import com.hedera.mirror.importer.parser.event.EventParserProperties;
import com.hedera.mirror.importer.parser.record.ConditionalOnRecordParser;
import com.hedera.mirror.importer.parser.record.RecordFileParser;
import com.hedera.mirror.importer.parser.record.RecordParserProperties;

@Configuration
public class MessagingConfiguration {

// Shared channel containing all stream types until they're routed to the individual channels
public static final String CHANNEL_STREAM = "stream";
public static final String CHANNEL_BALANCE = CHANNEL_STREAM + ".balance";
public static final String CHANNEL_EVENT = CHANNEL_STREAM + ".event";
public static final String CHANNEL_RECORD = CHANNEL_STREAM + ".record";

private static final String CHANNEL_BALANCE = CHANNEL_STREAM + ".balance";
private static final String CHANNEL_EVENT = CHANNEL_STREAM + ".event";
private static final String CHANNEL_RECORD = CHANNEL_STREAM + ".record";
private static final String INTEGRATION_FLOW_BALANCE = "flow." + CHANNEL_BALANCE;
private static final String INTEGRATION_FLOW_EVENT = "flow." + CHANNEL_EVENT;
private static final String INTEGRATION_FLOW_RECORD = "flow." + CHANNEL_RECORD;

@Bean(CHANNEL_BALANCE)
MessageChannel channelBalance(BalanceParserProperties properties) {
return MessageChannels.queue(properties.getQueueCapacity()).get();
return channel(properties);
}

@Bean(CHANNEL_EVENT)
MessageChannel channelEvent(EventParserProperties properties) {
return MessageChannels.queue(properties.getQueueCapacity()).get();
return channel(properties);
}

@Bean(CHANNEL_RECORD)
MessageChannel channelRecord(RecordParserProperties properties) {
return MessageChannels.queue(properties.getQueueCapacity()).get();
return channel(properties);
}

@Bean(INTEGRATION_FLOW_BALANCE)
IntegrationFlow integrationFlowBalance(AccountBalanceFileParser parser) {
return integrationFlow(parser);
}

@Bean(INTEGRATION_FLOW_EVENT)
IntegrationFlow integrationFlowEvent(EventFileParser parser) {
return integrationFlow(parser);
}

@Bean(INTEGRATION_FLOW_RECORD)
@ConditionalOnRecordParser
Collaborator Author

Not sure why we only have ConditionalOnRecordParser on RecordFileParser but not on the other two parsers. Anyway, to avoid an autowire error when parser.record.enabled is false, I also added the annotation here.

Contributor

I believe @ConditionalOnRecordParser was added mainly to help out the pubsub flow, which should only run when the record parser is enabled.
The balance and event parsers don't have a sub flow, so it didn't apply.
Although I do believe we wanted to check these annotations at some point for easier coordination.

IntegrationFlow integrationFlowRecord(RecordFileParser parser) {
return integrationFlow(parser);
Collaborator Author

@xin-hedera xin-hedera Jun 11, 2021

We have to repeat these for the different beans and different names.

I didn't want to move the channel bean and integration flow bean into the StreamFileParser classes because I prefer to keep configuration separate from those classes as much as I can.

}

@Bean
@@ -62,4 +90,30 @@ IntegrationFlow streamFileRouter() {
.route(StreamFile.class, s -> s.getType().toString().toLowerCase(), s -> s.prefix(CHANNEL_STREAM + "."))
.get();
}

private MessageChannel channel(ParserProperties properties) {
if (properties.getQueueCapacity() <= 0) {
Contributor

If we have validation to ensure the minimum is 0, do we need to check for <? Could just be ==

Member

Because we should write code defensively in case that other class changes in the future and it's the same number of characters either way.

return MessageChannels.direct().get();
}

return MessageChannels.queue(properties.getQueueCapacity()).get();
}

private IntegrationFlow integrationFlow(StreamFileParser parser) {
ParserProperties properties = parser.getProperties();
return IntegrationFlows.from(channelName(properties.getStreamType()))
.handle(StreamFile.class, (s, h) -> {
parser.parse(s);
return null;
}, e -> {
if (properties.getQueueCapacity() > 0) {
e.poller(Pollers.fixedDelay(properties.getFrequency()));
}
})
.get();
}

private String channelName(StreamType streamType) {
return CHANNEL_STREAM + "." + streamType.toString().toLowerCase();
}
}
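
The `channel(...)` helper and the conditional poller above come down to one decision. With a queue capacity of zero or less, files are handed straight to the parser on the sender's thread. Otherwise they wait in a bounded queue until a poller drains them. The following is a minimal, Spring-free sketch of that decision. `HandoffSketch` and its methods are hypothetical names used only for illustration and are not part of the importer or of Spring Integration. A real queue channel may block or time out on a full queue, whereas this sketch simply returns `false`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the direct-vs-queue channel choice; not the importer's code
class HandoffSketch<T> {
    private final BlockingQueue<T> queue; // null means direct handoff
    private final Consumer<T> parser;

    HandoffSketch(int queueCapacity, Consumer<T> parser) {
        // Defensive <= check, mirroring the diff: zero or negative capacity disables buffering
        this.queue = queueCapacity <= 0 ? null : new ArrayBlockingQueue<T>(queueCapacity);
        this.parser = parser;
    }

    boolean isDirect() {
        return queue == null;
    }

    /** Called by the producer. Returns false only when the bounded queue is full. */
    boolean send(T item) {
        if (queue == null) {
            parser.accept(item); // parsed immediately on the caller's thread, nothing is buffered
            return true;
        }
        return queue.offer(item); // buffered until poll() runs
    }

    /** Called periodically by a poller. Returns the number of items handed to the parser. */
    int poll() {
        if (queue == null) {
            return 0; // direct mode needs no poller
        }
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained);
        drained.forEach(parser);
        return drained.size();
    }
}
```

In direct mode at most one file is in flight per sender, which is the memory-saving case. The queued mode trades memory for the ability to download ahead of the parser.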

@@ -40,7 +40,7 @@
@Entity
@AllArgsConstructor
@NoArgsConstructor
public class AccountBalanceFile implements StreamFile {
public class AccountBalanceFile implements StreamFile<AccountBalance> {

@ToString.Exclude
private byte[] bytes;

@@ -38,7 +38,7 @@
@Data
@Entity
@NoArgsConstructor
public class EventFile implements StreamFile {
public class EventFile implements StreamFile<EventItem> {

@ToString.Exclude
private byte[] bytes;

@@ -43,7 +43,7 @@
@Entity
@AllArgsConstructor
@NoArgsConstructor
public class RecordFile implements StreamFile {
public class RecordFile implements StreamFile<RecordItem> {

@ToString.Exclude
private byte[] bytes;

@@ -25,7 +25,7 @@

import com.hedera.mirror.importer.parser.domain.StreamItem;

public interface StreamFile {
public interface StreamFile<T extends StreamItem> {

byte[] getBytes();

@@ -46,7 +46,7 @@ default Long getIndex() {
return null;
}

<T extends StreamItem> List<T> getItems();
List<T> getItems();

Long getLoadEnd();

@@ -70,6 +70,10 @@ default String getPreviousHash() {
default void setHash(String hash) {
}

void setBytes(byte[] bytes);

void setItems(List<T> items);

void setName(String name);

default void setIndex(Long index) {
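
The change from a method-level `<T extends StreamItem> List<T> getItems()` to a class-level `StreamFile<T extends StreamItem>` matters because a method-level type parameter is chosen by the caller. Any item type would compile at the call site, and implementers would need an unchecked cast. With the parameter on the interface, each implementing class fixes the item type once. The sketch below illustrates this under simplified assumptions. The nested types reuse names from the diff, but their bodies (including the `consensusTimestamp` field and the `lastTimestamp` helper) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class StreamFileGenericsSketch {

    interface StreamItem {
    }

    // Hypothetical item type; the real RecordItem carries far more state
    static final class RecordItem implements StreamItem {
        final long consensusTimestamp;

        RecordItem(long consensusTimestamp) {
            this.consensusTimestamp = consensusTimestamp;
        }
    }

    // Class-level type parameter, as in the diff: the implementing class fixes T once
    interface StreamFile<T extends StreamItem> {
        List<T> getItems();

        void setItems(List<T> items);
    }

    static final class RecordFile implements StreamFile<RecordItem> {
        private List<RecordItem> items = new ArrayList<>();

        @Override
        public List<RecordItem> getItems() {
            return items;
        }

        @Override
        public void setItems(List<RecordItem> items) {
            this.items = items;
        }
    }

    // No cast needed: the compiler knows every element is a RecordItem
    static long lastTimestamp(StreamFile<RecordItem> file) {
        List<RecordItem> items = file.getItems();
        return items.isEmpty() ? -1L : items.get(items.size() - 1).consensusTimestamp;
    }
}
```

With this shape, assigning `recordFile.getItems()` to a list of a different item type is a compile-time error instead of a runtime `ClassCastException`.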

@@ -352,13 +352,6 @@ private PendingDownload pendingDownload(StreamFilename streamFilename, String s3
return new PendingDownload(future, streamFilename, s3Key);
}

private void moveSignatureFile(FileStreamSignature signature) {
if (downloaderProperties.isKeepSignatures()) {
Path destination = downloaderProperties.getSignaturesPath().resolve(signature.getNodeAccountIdString());
Utility.archiveFile(signature.getFilename(), signature.getBytes(), destination);
}
}

/**
* For each group of signature files with the same file name: (1) verify that the signature files are signed by
* corresponding node's PublicKey; (2) For valid signature files, we compare their Hashes to see if at least 1/3 of
@@ -416,13 +409,28 @@ private void verifySigsAndDownloadDataFiles(Multimap<String, FileStreamSignature

verify(streamFile, signature);

if (downloaderProperties.isKeepFiles()) {
Utility.archiveFile(streamFile.getName(), streamFile.getBytes(),
downloaderProperties.getNodeStreamPath(signature.getNodeAccountIdString()));
}

if (downloaderProperties.isKeepSignatures()) {
signatures.forEach(s -> {
Path destination = downloaderProperties.getNodeStreamPath(s.getNodeAccountIdString());
Utility.archiveFile(s.getFilename(), s.getBytes(), destination);
});
}

if (!downloaderProperties.isPersistBytes()) {
streamFile.setBytes(null);
}

if (dataFilename.getInstant().isAfter(endDate)) {
downloaderProperties.setEnabled(false);
log.warn("Disabled polling after downloading all files <= endDate ({})", endDate);
return;
}

signatures.forEach(this::moveSignatureFile);
onVerified(pendingDownload, streamFile);
valid = true;
break;

@@ -33,29 +33,37 @@ public interface DownloaderProperties {

MirrorProperties getMirrorProperties();

String getPrefix();

default Path getNodeStreamPath(String nodeAccountId) {
return getStreamPath().resolve(getStreamType().getNodePrefix() + nodeAccountId);
}

Path getStreamPath();

StreamType getStreamType();

/**
* The number of current mainnet nodes used to download signatures in parallel. Should be adjusted when nodes
* change
*/
int getThreads();

String getPrefix();

Path getStreamPath();
boolean isEnabled();

StreamType getStreamType();
boolean isKeepFiles();

default Path getSignaturesPath() {
return getStreamPath().resolve(getStreamType().getSignatures());
}
boolean isKeepSignatures();

boolean isEnabled();
boolean isPersistBytes();

void setBatchSize(int batchSize);

void setEnabled(boolean enabled);

boolean isKeepSignatures();
void setKeepFiles(boolean keepFiles);

void setKeepSignatures(boolean keepSignatures);

void setPersistBytes(boolean keepBytes);
}
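
The `keepFiles` and `persistBytes` flags now live on the downloader, so the order of operations after verification matters: the bytes must be archived before they are cleared. The sketch below shows that ordering under simplified assumptions. `PostVerifySketch`, `DownloadedFile`, and `afterVerification` are hypothetical names, and `Files.write` stands in for `Utility.archiveFile`, whose implementation is not shown in this diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class PostVerifySketch {

    // Hypothetical, minimal stand-in for a verified stream file
    static final class DownloadedFile {
        final String name;
        byte[] bytes;

        DownloadedFile(String name, byte[] bytes) {
            this.name = name;
            this.bytes = bytes;
        }
    }

    static void afterVerification(DownloadedFile file, boolean keepFiles, boolean persistBytes, Path nodeDir) {
        if (keepFiles) {
            try {
                // Archive first, while the bytes are still available
                Files.createDirectories(nodeDir);
                Files.write(nodeDir.resolve(file.name), file.bytes);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        if (!persistBytes) {
            // Drop the raw bytes early so files waiting for the parser hold less memory
            file.bytes = null;
        }
    }
}
```

Clearing the bytes in the downloader, not after parsing, is presumably what reduces the footprint of files waiting in the parser queue, since each queued file then carries only its parsed items.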