From b547197512196136e10406053e20a2f60be568c9 Mon Sep 17 00:00:00 2001 From: Nana-EC <56320167+Nana-EC@users.noreply.github.com> Date: Mon, 21 Jun 2021 12:29:11 -0500 Subject: [PATCH] Cherry pick Add multi batch support to upsert pgcopy flow (#2155) When batch inserts occur within the same record file, the upsert flow attempts to recreate the temporary table that already exists. - Update the create table query to use `if not exists` - Add a `truncate table` that always runs, to ensure insert and update don't utilize previously inserted information Cherry pick of [PR 2153](https://github.com/hashgraph/hedera-mirror-node/pull/2153) Signed-off-by: Nana-EC --- .../mirror/importer/parser/UpsertPgCopy.java | 8 ++++ .../upsert/AbstractUpsertQueryGenerator.java | 2 +- .../record/entity/sql/UpsertPgCopyTest.java | 37 ++++++++++++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/UpsertPgCopy.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/UpsertPgCopy.java index 9996e878fa6..a5d7b4fc895 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/UpsertPgCopy.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/UpsertPgCopy.java @@ -49,11 +49,14 @@ public class UpsertPgCopy extends PgCopy { private final Timer copyDurationMetric; private final Timer finalInsertDurationMetric; private final Timer updateDurationMetric; + private final String truncateSql; public UpsertPgCopy(Class entityClass, MeterRegistry meterRegistry, ParserProperties properties, UpsertQueryGenerator upsertQueryGenerator) { super(entityClass, meterRegistry, properties, upsertQueryGenerator.getTemporaryTableName()); createTempTableSql = upsertQueryGenerator.getCreateTempTableQuery(); + truncateSql = String + .format("truncate table %s restart identity cascade", upsertQueryGenerator.getTemporaryTableName()); finalTableName = 
upsertQueryGenerator.getFinalTableName(); insertSql = upsertQueryGenerator.getInsertQuery(); updateSql = upsertQueryGenerator.getUpdateQuery(); @@ -118,6 +121,11 @@ private void createTempTable(Connection connection) throws SQLException { try (PreparedStatement preparedStatement = connection.prepareStatement(createTempTableSql)) { preparedStatement.executeUpdate(); } + + // ensure table is empty in case of batching + try (PreparedStatement preparedStatement = connection.prepareStatement(truncateSql)) { + preparedStatement.executeUpdate(); + } log.trace("Created temp table {}", tableName); } diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/repository/upsert/AbstractUpsertQueryGenerator.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/repository/upsert/AbstractUpsertQueryGenerator.java index 217e3b240a5..408ec1ede4f 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/repository/upsert/AbstractUpsertQueryGenerator.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/repository/upsert/AbstractUpsertQueryGenerator.java @@ -79,7 +79,7 @@ public abstract class AbstractUpsertQueryGenerator implements UpsertQueryGene @Override public String getCreateTempTableQuery() { - return String.format("create temporary table %s on commit drop as table %s limit 0", + return String.format("create temporary table if not exists %s on commit drop as table %s limit 0", getTemporaryTableName(), getFinalTableName()); } diff --git a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/sql/UpsertPgCopyTest.java b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/sql/UpsertPgCopyTest.java index 9dd0d29147a..f9c1772acc2 100644 --- a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/sql/UpsertPgCopyTest.java +++ 
b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/sql/UpsertPgCopyTest.java @@ -154,6 +154,37 @@ void entityInsertAndUpdate() throws SQLException { .containsExactlyInAnyOrder("memo-1", "memo-2", "", "updated-memo-4", "memo-5", "memo-6"); } + @Test + void entityInsertAndUpdateBatched() throws SQLException { + var entities = new HashSet(); + long consensusTimestamp = 1; + entities.add(getEntity(1, consensusTimestamp, consensusTimestamp, "memo-1")); + entities.add(getEntity(2, consensusTimestamp, consensusTimestamp, "memo-2")); + entities.add(getEntity(3, consensusTimestamp, consensusTimestamp, "memo-3")); + entities.add(getEntity(4, consensusTimestamp, consensusTimestamp, "memo-4")); + + // update + long updateTimestamp = 5; + + // updated + var updateEntities = new HashSet(); + updateEntities.add(getEntity(3, null, updateTimestamp, "")); + updateEntities.add(getEntity(4, null, updateTimestamp, "updated-memo-4")); + + // new inserts + updateEntities.add(getEntity(5, null, updateTimestamp, "memo-5")); + updateEntities.add(getEntity(6, null, updateTimestamp, "memo-6")); + + copyWithTransactionSupport(entityPgCopy, entities, updateEntities); // copy inserts and updates + + assertThat(entityRepository + .findAll()) + .isNotEmpty() + .hasSize(6) + .extracting(Entity::getMemo) + .containsExactlyInAnyOrder("memo-1", "memo-2", "", "updated-memo-4", "memo-5", "memo-6"); + } + @Test void tokenInsertOnly() throws SQLException, DecoderException { var tokens = new HashSet(); @@ -409,10 +440,12 @@ void scheduleInsertAndUpdate() throws SQLException { .containsExactlyInAnyOrder(null, null, 5L, 6L, null, null); } - private void copyWithTransactionSupport(UpsertPgCopy upsertPgCopy, Collection items) throws SQLException { + private void copyWithTransactionSupport(UpsertPgCopy upsertPgCopy, Collection... 
items) throws SQLException { try (Connection connection = DataSourceUtils.getConnection(dataSource)) { connection.setAutoCommit(false); // for tests have to set auto commit to false or temp table gets lost - upsertPgCopy.copy(items, connection); + for (Collection batch : items) { + upsertPgCopy.copy(batch, connection); + } connection.commit(); } finally {