Skip to content

Commit

Permalink
Cherry pick Add multi batch support to upsert pgcopy flow (#2155)
Browse files Browse the repository at this point in the history
When batch inserts occur within the same record file the Upsert flow attempt to recreate the temporary table that already exists

- Update Create table query to with `if not exists`
- Add a `truncate table` regardless to ensure insert and update don't utilize previously inserted information

Cherry pick of [PR 2153](#2153)

Signed-off-by: Nana-EC <nana.essilfie-conduah@hedera.com>
  • Loading branch information
Nana-EC authored Jun 21, 2021
1 parent 72fad5d commit b547197
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ public class UpsertPgCopy<T> extends PgCopy<T> {
private final Timer copyDurationMetric;
private final Timer finalInsertDurationMetric;
private final Timer updateDurationMetric;
private final String truncateSql;

public UpsertPgCopy(Class<T> entityClass, MeterRegistry meterRegistry, ParserProperties properties,
UpsertQueryGenerator upsertQueryGenerator) {
super(entityClass, meterRegistry, properties, upsertQueryGenerator.getTemporaryTableName());
createTempTableSql = upsertQueryGenerator.getCreateTempTableQuery();
truncateSql = String
.format("truncate table %s restart identity cascade", upsertQueryGenerator.getTemporaryTableName());
finalTableName = upsertQueryGenerator.getFinalTableName();
insertSql = upsertQueryGenerator.getInsertQuery();
updateSql = upsertQueryGenerator.getUpdateQuery();
Expand Down Expand Up @@ -118,6 +121,11 @@ private void createTempTable(Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(createTempTableSql)) {
preparedStatement.executeUpdate();
}

// ensure table is empty in case of batching
try (PreparedStatement preparedStatement = connection.prepareStatement(truncateSql)) {
preparedStatement.executeUpdate();
}
log.trace("Created temp table {}", tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public abstract class AbstractUpsertQueryGenerator<T> implements UpsertQueryGene

@Override
public String getCreateTempTableQuery() {
return String.format("create temporary table %s on commit drop as table %s limit 0",
return String.format("create temporary table if not exists %s on commit drop as table %s limit 0",
getTemporaryTableName(), getFinalTableName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ void entityInsertAndUpdate() throws SQLException {
.containsExactlyInAnyOrder("memo-1", "memo-2", "", "updated-memo-4", "memo-5", "memo-6");
}

@Test
void entityInsertAndUpdateBatched() throws SQLException {
var entities = new HashSet<Entity>();
long consensusTimestamp = 1;
entities.add(getEntity(1, consensusTimestamp, consensusTimestamp, "memo-1"));
entities.add(getEntity(2, consensusTimestamp, consensusTimestamp, "memo-2"));
entities.add(getEntity(3, consensusTimestamp, consensusTimestamp, "memo-3"));
entities.add(getEntity(4, consensusTimestamp, consensusTimestamp, "memo-4"));

// update
long updateTimestamp = 5;

// updated
var updateEntities = new HashSet<Entity>();
updateEntities.add(getEntity(3, null, updateTimestamp, ""));
updateEntities.add(getEntity(4, null, updateTimestamp, "updated-memo-4"));

// new inserts
updateEntities.add(getEntity(5, null, updateTimestamp, "memo-5"));
updateEntities.add(getEntity(6, null, updateTimestamp, "memo-6"));

copyWithTransactionSupport(entityPgCopy, entities, updateEntities); // copy inserts and updates

assertThat(entityRepository
.findAll())
.isNotEmpty()
.hasSize(6)
.extracting(Entity::getMemo)
.containsExactlyInAnyOrder("memo-1", "memo-2", "", "updated-memo-4", "memo-5", "memo-6");
}

@Test
void tokenInsertOnly() throws SQLException, DecoderException {
var tokens = new HashSet<Token>();
Expand Down Expand Up @@ -409,10 +440,12 @@ void scheduleInsertAndUpdate() throws SQLException {
.containsExactlyInAnyOrder(null, null, 5L, 6L, null, null);
}

private void copyWithTransactionSupport(UpsertPgCopy upsertPgCopy, Collection items) throws SQLException {
private void copyWithTransactionSupport(UpsertPgCopy upsertPgCopy, Collection... items) throws SQLException {
try (Connection connection = DataSourceUtils.getConnection(dataSource)) {
connection.setAutoCommit(false); // for tests have to set auto commit to false or temp table gets lost
upsertPgCopy.copy(items, connection);
for (Collection batch : items) {
upsertPgCopy.copy(batch, connection);
}
connection.commit();
} finally {

Expand Down

0 comments on commit b547197

Please sign in to comment.