Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Progress Bar Persistence Read/Write #20787

Merged
merged 21 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0be298c
Checkpoint: Add interface methods.
davinchia Dec 20, 2022
d6e12a7
Correct the attempt id column to be a bigint.
davinchia Dec 24, 2022
326c555
Correct the wrong constraints.
davinchia Dec 24, 2022
402913c
Start adding tests. Alphabetise the SyncStats.yaml. Add get StreamSta…
davinchia Dec 24, 2022
513c068
Implement the right migration.
davinchia Dec 24, 2022
5bf2b49
Add estimated fields to the SyncStats message.
davinchia Dec 24, 2022
6d3661f
Change interface to a combined stats POJO object. Modify query to sor…
davinchia Dec 24, 2022
6699e03
Merge remote-tracking branch 'origin/master' into davinchia/implement…
davinchia Dec 27, 2022
ed35076
Get the basic test passing.
davinchia Dec 27, 2022
055a599
Add upsert handling for sync stats table.
davinchia Dec 27, 2022
3d3a5bc
Edit migration to allow for namespace to be null. Implement writing t…
davinchia Dec 27, 2022
4c6701a
Add upsert handling for stream stats table. Buff up test suite.
davinchia Dec 27, 2022
1059b86
Checkpoint: Everything works except upsert for a stream with null nam…
davinchia Dec 27, 2022
8d304f4
Get everything passing.
davinchia Dec 27, 2022
efca05e
Clean up.
davinchia Dec 27, 2022
5500bc2
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
6b7eddd
Remove bad annotation.
davinchia Dec 27, 2022
56436ff
Add field for namespace.
davinchia Dec 27, 2022
44f6bb6
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
c25d472
Respond to PR feedback.
davinchia Dec 28, 2022
b3a1027
Respond to PR feedback.
davinchia Dec 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add upsert handling for stream stats table. Buff up test suite.
  • Loading branch information
davinchia committed Dec 27, 2022
commit 4c6701adf9e16f58253520e2f3e270479d89a1d2
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ static void saveToSyncStatsTable(final OffsetDateTime now, final SyncStats syncS
}

// Although JOOQ supports upsert using the onConflict statement, we cannot use it as the table
// currently has duplicate records
// and also doesn't contain the unique constraint on the attempt_id column JOOQ requires.
// currently has duplicate records and also doesn't contain the unique constraint on the attempt_id
// column JOOQ requires.
ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.CREATED_AT, now)
Expand All @@ -442,23 +442,9 @@ static void saveToSyncStatsTable(final OffsetDateTime now, final SyncStats syncS
}

private static void saveToStreamStatsTable(final OffsetDateTime now,
List<StreamSyncStats> streamStats,
final List<StreamSyncStats> streamStats,
final Long attemptId,
final DSLContext ctx) {
final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId));
// if (isExisting) {
// what else do we need to update?
// ctx.update(STREAM_STATS)
// .set(STREAM_STATS.BYTES_EMITTED, bytesEmitted)
// .set(STREAM_STATS.RECORDS_EMITTED, recordsEmitted)
// .set(STREAM_STATS.ESTIMATED_BYTES, estimatedBytes)
// .set(STREAM_STATS.ESTIMATED_RECORDS, estimatedRecords)
// .set(STREAM_STATS.UPDATED_AT, now)
// .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
// .execute();
// return;
// }

Optional.ofNullable(streamStats).orElse(Collections.emptyList()).forEach(
stats -> ctx.insertInto(STREAM_STATS)
.set(STREAM_STATS.ID, UUID.randomUUID())
Expand All @@ -471,8 +457,7 @@ private static void saveToStreamStatsTable(final OffsetDateTime now,
.set(STREAM_STATS.RECORDS_EMITTED, stats.getStats().getRecordsEmitted())
.set(STREAM_STATS.ESTIMATED_BYTES, stats.getStats().getEstimatedBytes())
.set(STREAM_STATS.ESTIMATED_RECORDS, stats.getStats().getEstimatedRecords())
.onConflict(
STREAM_STATS.ATTEMPT_ID, STREAM_STATS.STREAM_NAMESPACE, STREAM_STATS.STREAM_NAME)
.onConflict(STREAM_STATS.ATTEMPT_ID, STREAM_STATS.STREAM_NAMESPACE, STREAM_STATS.STREAM_NAME)
.doUpdate()
.set(STREAM_STATS.UPDATED_AT, now)
.set(STREAM_STATS.BYTES_EMITTED, stats.getStats().getBytesEmitted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.AIRBYTE_METADATA;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -340,7 +341,7 @@ void testWriteStatsFirst() throws IOException {
.withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats);

AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber);
final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber);
final var combined = stats.combinedStats();
assertEquals(1000, combined.getBytesEmitted());
assertEquals(1000, combined.getRecordsEmitted());
Expand All @@ -357,73 +358,74 @@ void testWriteStatsFirst() throws IOException {
}

@Test
@DisplayName("Writing stats should update the previous record")
void testWriteStatsUpsert() throws IOException, SQLException, InterruptedException {
@DisplayName("Writing stats multiple times should write record and bytes information correctly without exceptions")
void testWriteSyncStatsRepeated() throws IOException, SQLException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);

// First write.
var streamStats = List.of(
new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns")
.withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats);

// Second write.
when(timeSupplier.get()).thenReturn(Instant.now());
jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, null);
streamStats = List.of(
new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns")
.withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, streamStats);

final var stats = jobPersistence.getAttemptStats(jobId, attemptNumber).combinedStats();
assertEquals(2000, stats.getBytesEmitted());
assertEquals(2000, stats.getRecordsEmitted());
assertEquals(2000, stats.getEstimatedBytes());
assertEquals(2000, stats.getEstimatedRecords());
final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber);
final var combined = stats.combinedStats();
assertEquals(2000, combined.getBytesEmitted());
assertEquals(2000, combined.getRecordsEmitted());
assertEquals(2000, combined.getEstimatedBytes());
assertEquals(2000, combined.getEstimatedRecords());

final var record = jobDatabase.query(ctx -> {
final var attemptId = DefaultJobPersistence.getAttemptId(jobId, attemptNumber, ctx);
return ctx.fetch("SELECT * from sync_stats where attempt_id = ?", attemptId).stream().findFirst().get();
});
final var actStreamStats = stats.perStreamStats();
assertEquals(1, actStreamStats.size());
assertEquals(streamStats, actStreamStats);

// Check time stamps to confirm.
assertNotEquals(record.get(SYNC_STATS.CREATED_AT), record.get(SYNC_STATS.UPDATED_AT));
}

// @Test
// @DisplayName("Writing stats multiple times should write record and bytes information correctly
// without exceptions")
// void testWriteSyncStatsRepeated() throws IOException, SQLException {
// final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
// final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);
//
// final Optional<Record> record =
// jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number =
// ?", jobId,
// attemptNumber).stream().findFirst());
// final Long attemptId = record.get().get("id", Long.class);
//
// var stat = jobPersistence.getSyncStats(attemptId).stream().findFirst().get();
// assertEquals(1000, stat.getBytesEmitted());
// assertEquals(1000, stat.getRecordsEmitted());
// assertEquals(1000, stat.getEstimatedBytes());
// assertEquals(1000, stat.getEstimatedRecords());
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, null);
// var stats = jobPersistence.getSyncStats(attemptId);
// assertEquals(1, stats.size());
//
// stat = stats.stream().findFirst().get();
// assertEquals(2000, stat.getBytesEmitted());
// assertEquals(2000, stat.getRecordsEmitted());
// assertEquals(2000, stat.getEstimatedBytes());
// assertEquals(2000, stat.getEstimatedRecords());
//
// }
}

}
@Test
@DisplayName("Writing multiple stats of the same attempt id, stream name and namespace should update the previous record")
void testWriteStatsUpsert() throws IOException, SQLException, InterruptedException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);

@Nested
class UpsertUpdate {
// First write.
var streamStats = List.of(
new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns")
.withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats);

@Test
void testWriteToSyncStats() throws IOException {
// Second write.
when(timeSupplier.get()).thenReturn(Instant.now());
streamStats = List.of(
new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns")
.withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, streamStats);

final var syncStatsRec = jobDatabase.query(ctx -> {
final var attemptId = DefaultJobPersistence.getAttemptId(jobId, attemptNumber, ctx);
return ctx.fetch("SELECT * from sync_stats where attempt_id = ?", attemptId).stream().findFirst().get();
});

// Check time stamps to confirm upsert.
assertNotEquals(syncStatsRec.get(SYNC_STATS.CREATED_AT), syncStatsRec.get(SYNC_STATS.UPDATED_AT));

final var streamStatsRec = jobDatabase.query(ctx -> {
final var attemptId = DefaultJobPersistence.getAttemptId(jobId, attemptNumber, ctx);
return ctx.fetch("SELECT * from stream_stats where attempt_id = ?", attemptId).stream().findFirst().get();
});
// Check time stamps to confirm upsert.
assertNotEquals(streamStatsRec.get(STREAM_STATS.CREATED_AT), streamStatsRec.get(STREAM_STATS.UPDATED_AT));
}

// check with updates with no namespaces

}

@Test
Expand Down