Add Upsert support to NFTs (#2218)
* Add NftUpsertQueryGenerator logic and tests

The importer currently uses the old repository-based logic for handling NFTs and NFT transfers rather than the newer upsert logic.
This updates `NftTransfer` to use insert-only logic and `Nft` to use upsert logic (a simplified sketch of the merge-then-upsert pattern follows the changed-file summary below).

- Update `nft.deleted` and `nft.metadata` columns and fields to be nullable to support upsert logic
- Replace the NFT repository calls with `onNft()` in `EntityRecordItemListener`
- Replace the NFT transfer repository calls with `onNftTransfer()` in `EntityRecordItemListener`
- Add custom update method support to `AbstractUpsertQueryGenerator`
- Add `NftUpsertQueryGenerator` to support upsert logic
- Update `SqlEntityListener` to use `PgCopy` for `NftTransfer` and `UpsertPgCopy` for `Nft`
- Add tests for various NFT upsert flows

Signed-off-by: Nana-EC <nana.essilfie-conduah@hedera.com>
Signed-off-by: Ian Jungmann <ian.jungmann@hedera.com>
Nana-EC authored and Ian Jungmann committed Jul 16, 2021
1 parent ef21161 commit dee2ab0
Showing 16 changed files with 697 additions and 79 deletions.
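Before the diffs, a note on the overall flow: with this change the listener buffers partial `Nft` rows in memory, merges rows that share an NFT id, and then writes the merged rows in one batched upsert. The sketch below is a self-contained, simplified illustration of that merge step only; the `Nft` class, field set, and merge rules here are stand-ins chosen for readability, not the importer's actual domain class or `mergeNft` implementation (those appear in the diff further down and differ in details such as the account-id handling).

// Simplified sketch of the in-memory merge that precedes the batched upsert.
// All names and rules here are illustrative stand-ins, not project code.
import java.util.HashMap;
import java.util.Map;

public class NftMergeSketch {

    // Stand-in NFT row: nullable fields let a partial update (transfer, burn)
    // carry only the columns it actually changes.
    static class Nft {
        final long serialNumber;
        final long tokenId;
        Long createdTimestamp;
        Boolean deleted;
        byte[] metadata;
        Long accountId;
        Long modifiedTimestamp;

        Nft(long serialNumber, long tokenId) {
            this.serialNumber = serialNumber;
            this.tokenId = tokenId;
        }
    }

    // Later writes win, but null fields never erase earlier non-null values.
    static Nft merge(Nft cached, Nft update) {
        if (update.createdTimestamp != null) {
            cached.createdTimestamp = update.createdTimestamp;
        }
        if (update.deleted != null) {
            cached.deleted = update.deleted;
        }
        if (update.metadata != null) {
            cached.metadata = update.metadata;
        }
        if (update.accountId != null) {
            cached.accountId = update.accountId;
        }
        cached.modifiedTimestamp = update.modifiedTimestamp;
        return cached;
    }

    public static void main(String[] args) {
        Map<String, Nft> nfts = new HashMap<>();

        // Mint: full row with metadata and created timestamp.
        Nft minted = new Nft(1, 1001);
        minted.createdTimestamp = 100L;
        minted.deleted = false;
        minted.metadata = new byte[] {1, 2, 3};
        minted.modifiedTimestamp = 100L;
        nfts.merge("1001/1", minted, NftMergeSketch::merge);

        // Transfer in the same batch: only the owner and modified timestamp change.
        Nft transferred = new Nft(1, 1001);
        transferred.accountId = 2002L;
        transferred.modifiedTimestamp = 110L;
        nfts.merge("1001/1", transferred, NftMergeSketch::merge);

        Nft merged = nfts.get("1001/1");
        System.out.println(merged.createdTimestamp + " " + merged.accountId); // 100 2002
    }
}

The point of the merge is that a later partial row (a transfer or a burn) only overwrites the fields it actually carries, so a single temp-table copy followed by one insert-or-update statement can cover mint, transfer, and burn for the same serial number within one batch.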
@@ -26,12 +26,14 @@
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import lombok.Data;
import lombok.NoArgsConstructor;

import com.hedera.mirror.importer.converter.AccountIdConverter;
import com.hedera.mirror.importer.converter.EntityIdSerializer;

@Data
@Entity
@NoArgsConstructor
public class Nft {

@JsonUnwrapped
@@ -44,9 +46,13 @@ public class Nft {

private Long createdTimestamp;

private boolean deleted;
private Boolean deleted;

private byte[] metadata;

private long modifiedTimestamp;

public Nft(long serialNumber, EntityId tokenId) {
id = new NftId(serialNumber, tokenId);
}
}
@@ -100,7 +100,7 @@ protected void persistItems(Collection<T> items, Connection connection) {
}
}

public int insertToFinalTable(Connection connection) throws SQLException {
private int insertToFinalTable(Connection connection) throws SQLException {
int insertCount = 0;
try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
insertCount = preparedStatement.executeUpdate();
@@ -109,7 +109,7 @@ public int insertToFinalTable(Connection connection) throws SQLException {
return insertCount;
}

public void updateFinalTable(Connection connection) throws SQLException {
private void updateFinalTable(Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
preparedStatement.execute();
}
@@ -52,7 +52,6 @@
import com.hederahashgraph.api.proto.java.TransactionID;
import com.hederahashgraph.api.proto.java.TransactionRecord;
import com.hederahashgraph.api.proto.java.TransferList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
@@ -69,7 +68,6 @@
import com.hedera.mirror.importer.domain.FileData;
import com.hedera.mirror.importer.domain.LiveHash;
import com.hedera.mirror.importer.domain.Nft;
import com.hedera.mirror.importer.domain.NftId;
import com.hedera.mirror.importer.domain.NftTransfer;
import com.hedera.mirror.importer.domain.NftTransferId;
import com.hedera.mirror.importer.domain.NonFeeTransfer;
@@ -96,7 +94,6 @@
import com.hedera.mirror.importer.parser.record.RecordItemListener;
import com.hedera.mirror.importer.parser.record.transactionhandler.TransactionHandler;
import com.hedera.mirror.importer.parser.record.transactionhandler.TransactionHandlerFactory;
import com.hedera.mirror.importer.repository.NftRepository;
import com.hedera.mirror.importer.util.Utility;

@Log4j2
@@ -109,20 +106,17 @@ public class EntityRecordItemListener implements RecordItemListener {
private final EntityListener entityListener;
private final TransactionHandlerFactory transactionHandlerFactory;
private final Predicate<TransactionFilterFields> transactionFilter;
private final NftRepository nftRepository;

public EntityRecordItemListener(CommonParserProperties commonParserProperties, EntityProperties entityProperties,
AddressBookService addressBookService,
NonFeeTransferExtractionStrategy nonFeeTransfersExtractor,
EntityListener entityListener,
TransactionHandlerFactory transactionHandlerFactory,
NftRepository nftRepository) {
TransactionHandlerFactory transactionHandlerFactory) {
this.entityProperties = entityProperties;
this.addressBookService = addressBookService;
this.nonFeeTransfersExtractor = nonFeeTransfersExtractor;
this.entityListener = entityListener;
this.transactionHandlerFactory = transactionHandlerFactory;
this.nftRepository = nftRepository;
transactionFilter = commonParserProperties.getFilter();
}

@@ -490,7 +484,7 @@ private void insertTokenBurn(RecordItem recordItem) {
consensusTimestamp);

tokenBurnTransactionBody.getSerialNumbersList().forEach(serialNumber ->
nftRepository.burnOrWipeNft(new NftId(serialNumber, tokenId), consensusTimestamp)
updateNftDeleteStatus(consensusTimestamp, serialNumber, tokenId)
);
}
}
@@ -609,16 +603,14 @@ private void insertTokenMint(RecordItem recordItem) {
consensusTimestamp);

List<Long> serialNumbers = recordItem.getRecord().getReceipt().getSerialNumbersList();
List<Nft> nfts = new ArrayList<>();
for (int i = 0; i < serialNumbers.size(); i++) {
Nft nft = new Nft();
Nft nft = new Nft(serialNumbers.get(i), tokenId);
nft.setCreatedTimestamp(consensusTimestamp);
nft.setId(new NftId(serialNumbers.get(i), tokenId));
nft.setDeleted(false);
nft.setMetadata(tokenMintTransactionBody.getMetadata(i).toByteArray());
nft.setModifiedTimestamp(consensusTimestamp);
nfts.add(nft);
entityListener.onNft(nft);
}
nftRepository.saveAll(nfts);
}
}

@@ -668,14 +660,21 @@ private void insertTokenTransfers(RecordItem recordItem) {

entityListener.onNftTransfer(nftTransferDomain);
if (!EntityId.isEmpty(receiverId)) {
nftRepository.transferNftOwnership(new NftId(serialNumber, tokenId), receiverId,
consensusTimestamp);
transferNftOwnership(consensusTimestamp, serialNumber, tokenId, receiverId);
}
});
});
}
}

private void transferNftOwnership(long modifiedTimeStamp, long serialNumber, EntityId tokenId,
EntityId receiverId) {
Nft nft = new Nft(serialNumber, tokenId);
nft.setAccountId(receiverId);
nft.setModifiedTimestamp(modifiedTimeStamp);
entityListener.onNft(nft);
}

private void insertTokenUpdate(RecordItem recordItem) {
if (entityProperties.getPersist().isTokens()) {
long consensusTimestamp = recordItem.getConsensusTimestamp();
@@ -750,7 +749,7 @@ private void insertTokenAccountWipe(RecordItem recordItem) {
consensusTimestamp);

tokenWipeAccountTransactionBody.getSerialNumbersList().forEach(serialNumber ->
nftRepository.burnOrWipeNft(new NftId(serialNumber, tokenId), consensusTimestamp));
updateNftDeleteStatus(consensusTimestamp, serialNumber, tokenId));
}
}

@@ -770,6 +769,13 @@ private void updateToken(Token token, long modifiedTimestamp) {
entityListener.onToken(token);
}

private void updateNftDeleteStatus(long modifiedTimeStamp, long serialNumber, EntityId tokenId) {
Nft nft = new Nft(serialNumber, tokenId);
nft.setDeleted(true);
nft.setModifiedTimestamp(modifiedTimeStamp);
entityListener.onNft(nft);
}

private void updateTokenSupply(EntityId tokenId, long newTotalSupply, long modifiedTimestamp) {
Token token = Token.of(tokenId);
token.setTotalSupply(newTotalSupply);
@@ -44,6 +44,7 @@
import com.hedera.mirror.importer.domain.FileData;
import com.hedera.mirror.importer.domain.LiveHash;
import com.hedera.mirror.importer.domain.Nft;
import com.hedera.mirror.importer.domain.NftId;
import com.hedera.mirror.importer.domain.NftTransfer;
import com.hedera.mirror.importer.domain.NonFeeTransfer;
import com.hedera.mirror.importer.domain.RecordFile;
@@ -65,9 +66,9 @@
import com.hedera.mirror.importer.parser.record.entity.EntityBatchCleanupEvent;
import com.hedera.mirror.importer.parser.record.entity.EntityBatchSaveEvent;
import com.hedera.mirror.importer.parser.record.entity.EntityListener;
import com.hedera.mirror.importer.repository.NftRepository;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import com.hedera.mirror.importer.repository.upsert.EntityUpsertQueryGenerator;
import com.hedera.mirror.importer.repository.upsert.NftUpsertQueryGenerator;
import com.hedera.mirror.importer.repository.upsert.ScheduleUpsertQueryGenerator;
import com.hedera.mirror.importer.repository.upsert.TokenAccountUpsertQueryGenerator;
import com.hedera.mirror.importer.repository.upsert.TokenUpsertQueryGenerator;
@@ -82,7 +83,6 @@ public class SqlEntityListener implements EntityListener, RecordStreamFileListen
private final RecordFileRepository recordFileRepository;
private final SqlProperties sqlProperties;
private final ApplicationEventPublisher eventPublisher;
private final NftRepository nftRepository;

// init schemas, writers, etc once per process
private final PgCopy<AssessedCustomFee> assessedCustomFeePgCopy;
@@ -99,6 +99,7 @@ public class SqlEntityListener implements EntityListener, RecordStreamFileListen
private final PgCopy<TransactionSignature> transactionSignaturePgCopy;

private final UpsertPgCopy<Entity> entityPgCopy;
private final UpsertPgCopy<Nft> nftPgCopy;
private final UpsertPgCopy<Schedule> schedulePgCopy;
private final UpsertPgCopy<TokenAccount> tokenAccountPgCopy;
private final UpsertPgCopy<Token> tokenPgCopy;
@@ -122,6 +123,7 @@ public class SqlEntityListener implements EntityListener, RecordStreamFileListen
private final Map<Long, Schedule> schedules;
private final Map<Long, Token> tokens;
private final Map<TokenAccountId, TokenAccount> tokenAccounts;
private final Map<NftId, Nft> nfts;

public SqlEntityListener(RecordParserProperties recordParserProperties, SqlProperties sqlProperties,
DataSource dataSource,
@@ -130,12 +132,12 @@ public SqlEntityListener(RecordParserProperties recordParserProperties, SqlPrope
EntityUpsertQueryGenerator entityUpsertQueryGenerator,
ScheduleUpsertQueryGenerator scheduleUpsertQueryGenerator,
TokenUpsertQueryGenerator tokenUpsertQueryGenerator,
TokenAccountUpsertQueryGenerator tokenAccountUpsertQueryGenerator, NftRepository nftRepository) {
TokenAccountUpsertQueryGenerator tokenAccountUpsertQueryGenerator,
NftUpsertQueryGenerator nftUpsertQueryGenerator) {
this.dataSource = dataSource;
this.recordFileRepository = recordFileRepository;
this.sqlProperties = sqlProperties;
this.eventPublisher = eventPublisher;
this.nftRepository = nftRepository;

// insert only tables
assessedCustomFeePgCopy = new PgCopy<>(AssessedCustomFee.class, meterRegistry, recordParserProperties);
@@ -154,6 +156,8 @@ public SqlEntityListener(RecordParserProperties recordParserProperties, SqlPrope
// updatable tables
entityPgCopy = new UpsertPgCopy<>(Entity.class, meterRegistry, recordParserProperties,
entityUpsertQueryGenerator);
nftPgCopy = new UpsertPgCopy<>(Nft.class, meterRegistry, recordParserProperties,
nftUpsertQueryGenerator);
schedulePgCopy = new UpsertPgCopy<>(Schedule.class, meterRegistry, recordParserProperties,
scheduleUpsertQueryGenerator);
tokenAccountPgCopy = new UpsertPgCopy<>(TokenAccount.class, meterRegistry, recordParserProperties,
@@ -175,6 +179,7 @@ public SqlEntityListener(RecordParserProperties recordParserProperties, SqlPrope
transactionSignatures = new ArrayList<>();

entities = new HashMap<>();
nfts = new HashMap<>();
schedules = new HashMap<>();
tokens = new HashMap<>();
tokenAccounts = new HashMap<>();
@@ -211,6 +216,7 @@ private void cleanup() {
fileData.clear();
liveHashes.clear();
nonFeeTransfers.clear();
nfts.clear();
nftTransfers.clear();
schedules.clear();
topicMessages.clear();
@@ -242,18 +248,22 @@ private void executeBatches() {
customFeePgCopy.copy(customFees, connection);
fileDataPgCopy.copy(fileData, connection);
liveHashPgCopy.copy(liveHashes, connection);
nonFeeTransferPgCopy.copy(nonFeeTransfers, connection);
nftTransferPgCopy.copy(nftTransfers, connection);
tokenTransferPgCopy.copy(tokenTransfers, connection);
topicMessagePgCopy.copy(topicMessages, connection);
transactionPgCopy.copy(transactions, connection);
transactionSignaturePgCopy.copy(transactionSignatures, connection);

// insert operations with conflict management for updates
// insert operations with conflict management
entityPgCopy.copy(entities.values(), connection);
schedulePgCopy.copy(schedules.values(), connection);
tokenPgCopy.copy(tokens.values(), connection);
tokenAccountPgCopy.copy(tokenAccounts.values(), connection);
nftPgCopy.copy(nfts.values(), connection); // persist nft after token entity
schedulePgCopy.copy(schedules.values(), connection);

// transfers operations should be last to ensure insert logic completeness, entities should already exist
nonFeeTransferPgCopy.copy(nonFeeTransfers, connection);
nftTransferPgCopy.copy(nftTransfers, connection);
tokenTransferPgCopy.copy(tokenTransfers, connection);

log.info("Completed batch inserts in {}", stopwatch);
} catch (ParserException e) {
throw e;
@@ -308,7 +318,7 @@ public void onLiveHash(LiveHash liveHash) throws ImporterException {

@Override
public void onNft(Nft nft) throws ImporterException {
nftRepository.save(nft);
nfts.merge(nft.getId(), nft, this::mergeNft);
}

@Override
@@ -445,6 +455,27 @@ private Token mergeToken(Token cachedToken, Token newToken) {
return cachedToken;
}

private Nft mergeNft(Nft cachedNft, Nft newNft) {
if (cachedNft.getCreatedTimestamp() == null && newNft.getCreatedTimestamp() != null) {
cachedNft.setCreatedTimestamp(newNft.getCreatedTimestamp());
}

if (newNft.getMetadata() == null) { // only domains generated by NftTransfers should set account
cachedNft.setAccountId(newNft.getAccountId() == null ? EntityId.EMPTY : newNft.getAccountId());
}

if (newNft.getDeleted() != null) {
cachedNft.setDeleted(newNft.getDeleted());
}

if (newNft.getMetadata() != null) {
cachedNft.setMetadata(newNft.getMetadata());
}

cachedNft.setModifiedTimestamp(newNft.getModifiedTimestamp());
return cachedNft;
}

private TokenAccount mergeTokenAccount(TokenAccount cachedTokenAccount, TokenAccount newTokenAccount) {
if (newTokenAccount.getAssociated() != null) {
cachedTokenAccount.setAssociated(newTokenAccount.getAssociated());
@@ -85,6 +85,10 @@ protected String getAttributeSelectQuery(String attributeName) {
return null;
}

protected String getAttributeUpdateQuery(String attributeName) {
return null;
}

protected List<String> getV1ConflictIdColumns() {
return Collections.emptyList();
}
@@ -179,7 +183,7 @@ protected String getFormattedColumnName(String camelCaseName) {
camelCaseName);
}

private String getSelectCoalesceQuery(String column, String defaultValue) {
protected String getSelectCoalesceQuery(String column, String defaultValue) {
// e.g. "coalesce(delete, 'false')"
String formattedColumnName = getFullTempTableColumnName(column);
return String.format("coalesce(%s, %s)", formattedColumnName, defaultValue);
@@ -205,7 +209,7 @@ private String getUpdateCoalesceAssign(String column) {
}

private String getUpdateNullableStringCaseCoalesceAssign(String column) {
// e.g. "case when entity_temp.memo = ' ' then '' else coalesce(entity_temp.memo, entity.memo) end"
// e.g. "case when entity_temp.memo = <uuid> then '' else coalesce(entity_temp.memo, entity.memo) end"
String finalFormattedColumnName = getFullFinalTableColumnName(column);
String tempFormattedColumnName = getFullTempTableColumnName(column);
return String.format(
@@ -313,7 +317,11 @@ private String getUpdateClause() {
Collections.sort(updatableAttributes, DOMAIN_FIELD_COMPARATOR); // sort fields alphabetically
updatableAttributes.forEach(d -> {
String attributeUpdateQuery = "";
if (d.getType() == String.class) {
// get column custom update implementations
String columnSelectQuery = getAttributeUpdateQuery(d.getName());
if (!StringUtils.isEmpty(columnSelectQuery)) {
attributeUpdateQuery = columnSelectQuery;
} else if (d.getType() == String.class) {
attributeUpdateQuery = getUpdateNullableStringCaseCoalesceAssign(d.getName());
} else {
attributeUpdateQuery = getUpdateCoalesceAssign(d.getName());
@@ -30,8 +30,8 @@
@Named
@Value
public class EntityUpsertQueryGenerator extends AbstractUpsertQueryGenerator<Entity_> {
public final String finalTableName = "entity";
public final String temporaryTableName = getFinalTableName() + "_temp";
private final String finalTableName = "entity";
private final String temporaryTableName = getFinalTableName() + "_temp";
private final List<String> v1ConflictIdColumns = List.of(Entity_.ID);
private final List<String> v2ConflictIdColumns = List.of(Entity_.ID);
private final Set<String> nullableColumns = Set.of(Entity_.AUTO_RENEW_ACCOUNT_ID,