Implement Progress Bar Persistence Read/Write #20787

Merged · 21 commits · Dec 28, 2022

Changes from 1 commit

Commits (21)
0be298c
Checkpoint: Add interface methods.
davinchia Dec 20, 2022
d6e12a7
Correct the attempt id column to be a bigint.
davinchia Dec 24, 2022
326c555
Correct the wrong constraints.
davinchia Dec 24, 2022
402913c
Start adding tests. Alphabetise the SyncStats.yaml. Add get StreamSta…
davinchia Dec 24, 2022
513c068
Implement the right migration.
davinchia Dec 24, 2022
5bf2b49
Add estimated fields to the SyncStats message.
davinchia Dec 24, 2022
6d3661f
Change interface to a combined stats POJO object. Modify query to sor…
davinchia Dec 24, 2022
6699e03
Merge remote-tracking branch 'origin/master' into davinchia/implement…
davinchia Dec 27, 2022
ed35076
Get the basic test passing.
davinchia Dec 27, 2022
055a599
Add upsert handling for sync stats table.
davinchia Dec 27, 2022
3d3a5bc
Edit migration to allow for namespace to be null. Implement writing t…
davinchia Dec 27, 2022
4c6701a
Add upsert handling for stream stats table. Buff up test suite.
davinchia Dec 27, 2022
1059b86
Checkpoint: Everything works except upsert for a stream with null nam…
davinchia Dec 27, 2022
8d304f4
Get everything passing.
davinchia Dec 27, 2022
efca05e
Clean up.
davinchia Dec 27, 2022
5500bc2
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
6b7eddd
Remove bad annotation.
davinchia Dec 27, 2022
56436ff
Add field for namespace.
davinchia Dec 27, 2022
44f6bb6
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
c25d472
Respond to PR feedback.
davinchia Dec 28, 2022
b3a1027
Respond to PR feedback.
davinchia Dec 28, 2022
Change interface to a combined stats POJO object. Modify query to sort in order to always return the last entry.
davinchia committed Dec 24, 2022
commit 6d3661fc6b953f103eded5ac9fb9b9c1079ed87f
@@ -5,7 +5,6 @@
package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.constraint;
import static org.jooq.impl.DSL.unique;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
@@ -30,9 +29,10 @@ public void migrate(final Context context) throws Exception {
// This actually needs to be bigint.
ctx.alterTable("stream_stats").alter("attempt_id").set(SQLDataType.BIGINT.nullable(false)).execute();

// The constraint should also take into account the stream namespace. Drop the constraint and recreate it.
// The constraint should also take into account the stream namespace. Drop the constraint and
// recreate it.
ctx.alterTable("stream_stats").dropUnique("stream_stats_attempt_id_stream_name_key").execute();
ctx.alterTable("stream_stats").add(constraint("uniq_stream_attempt").unique("attempt_id","stream_name","stream_namespace")).execute();
ctx.alterTable("stream_stats").add(constraint("uniq_stream_attempt").unique("attempt_id", "stream_name", "stream_namespace")).execute();
}
}
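A note on why the recreated constraint adds stream_namespace: within one attempt, two streams may share a name but live in different namespaces, and the old (attempt_id, stream_name) key could not hold both rows. Below is a minimal illustration only, not part of the PR, assuming a plain jOOQ DSLContext and ignoring the other stream_stats columns for brevity.

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import org.jooq.DSLContext;

class StreamStatsConstraintSketch {

  // Under the old unique key on (attempt_id, stream_name) the second insert
  // would collide; the new key on (attempt_id, stream_name, stream_namespace)
  // accepts both rows.
  static void insertSameStreamNameInTwoNamespaces(final DSLContext ctx, final long attemptId) {
    ctx.insertInto(table("stream_stats"))
        .set(field("attempt_id"), attemptId)
        .set(field("stream_name"), "users")
        .set(field("stream_namespace"), "public")
        .execute();

    ctx.insertInto(table("stream_stats"))
        .set(field("attempt_id"), attemptId)
        .set(field("stream_name"), "users")
        .set(field("stream_namespace"), "staging")
        .execute();
  }

}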

@@ -421,18 +421,18 @@ private static void saveToStreamStatsTable(final long estimatedRecords,
final DSLContext ctx,
final Long attemptId) {
final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId));
// if (isExisting) {
// what else do we need to update?
// ctx.update(STREAM_STATS)
// .set(STREAM_STATS.BYTES_EMITTED, bytesEmitted)
// .set(STREAM_STATS.RECORDS_EMITTED, recordsEmitted)
// .set(STREAM_STATS.ESTIMATED_BYTES, estimatedBytes)
// .set(STREAM_STATS.ESTIMATED_RECORDS, estimatedRecords)
// .set(STREAM_STATS.UPDATED_AT, now)
// .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
// .execute();
// return;
// }
// if (isExisting) {
// what else do we need to update?
// ctx.update(STREAM_STATS)
// .set(STREAM_STATS.BYTES_EMITTED, bytesEmitted)
// .set(STREAM_STATS.RECORDS_EMITTED, recordsEmitted)
// .set(STREAM_STATS.ESTIMATED_BYTES, estimatedBytes)
// .set(STREAM_STATS.ESTIMATED_RECORDS, estimatedRecords)
// .set(STREAM_STATS.UPDATED_AT, now)
// .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
// .execute();
// return;
// }

// insert or update into stream stats table
// insert stream name and namespace
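The commented-out update above and the "insert or update" note are what the later "Add upsert handling for stream stats table" commit resolves. One way to express that upsert against the new (attempt_id, stream_name, stream_namespace) key is jOOQ's onConflict; the sketch below is hedged and not necessarily the code this PR ultimately ships, with stats column names assumed to mirror the SyncStats fields used above.

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import org.jooq.DSLContext;

class StreamStatsUpsertSketch {

  // Sketch of an insert-or-update keyed on the unique constraint recreated by
  // the migration above. Column names (bytes_emitted, records_emitted,
  // estimated_bytes, estimated_records) are assumptions for illustration.
  static void upsertStreamStats(final DSLContext ctx,
                                final long attemptId,
                                final String streamName,
                                final String streamNamespace,
                                final long bytesEmitted,
                                final long recordsEmitted,
                                final long estimatedBytes,
                                final long estimatedRecords) {
    ctx.insertInto(table("stream_stats"))
        .set(field("attempt_id"), attemptId)
        .set(field("stream_name"), streamName)
        .set(field("stream_namespace"), streamNamespace)
        .set(field("bytes_emitted"), bytesEmitted)
        .set(field("records_emitted"), recordsEmitted)
        .set(field("estimated_bytes"), estimatedBytes)
        .set(field("estimated_records"), estimatedRecords)
        .onConflict(field("attempt_id"), field("stream_name"), field("stream_namespace"))
        .doUpdate()
        .set(field("bytes_emitted"), bytesEmitted)
        .set(field("records_emitted"), recordsEmitted)
        .set(field("estimated_bytes"), estimatedBytes)
        .set(field("estimated_records"), estimatedRecords)
        .execute();
  }

}

Note that a plain unique constraint in Postgres treats NULLs as distinct, so ON CONFLICT never fires for rows with a null namespace; that is the gap the "upsert for a stream with null namespace" commit in this PR calls out.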
@@ -462,20 +462,19 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber
}

@Override
public List<SyncStats> getSyncStats(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) throws IOException {
// need to confirm this is correctly sorted
final var syncStats = jobDatabase
.query(ctx -> {
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.fetch(getSyncStatsRecordMapper())
.stream()
.toList();
return
ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.orderBy(SYNC_STATS.UPDATED_AT.desc())
.limit(1)
.fetchOne(getSyncStatsRecordMapper());
});
}

@Override
public List<SyncStats> getStreamStats(long jobId, int attemptNumber) throws IOException {
return null;
// hydrate perStreamStats
return new AttemptStats(syncStats, null);
}
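The perStreamStats slot is still null at this checkpoint (see the "hydrate perStreamStats" note above). A hedged sketch of how that hydration might read inside getAttemptStats, mirroring the sync_stats query; getStreamStatsRecordMapper() is hypothetical and does not exist in this commit, the surrounding variables (jobId, attemptNumber, syncStats) are assumed in scope, and the record currently holds a single StreamSyncStats rather than one entry per stream.

// Hypothetical hydration sketch only — not part of this commit.
final var streamStats = jobDatabase.query(ctx -> {
  final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
  return ctx.select(DSL.asterisk()).from(DSL.table("stream_stats"))
      .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
      .orderBy(STREAM_STATS.UPDATED_AT.desc())
      .limit(1)
      .fetchOne(getStreamStatsRecordMapper()); // hypothetical mapper
});
return new AttemptStats(syncStats, streamStats);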

@Override
@@ -37,9 +37,25 @@
*/
public interface JobPersistence {

List<SyncStats> getSyncStats(long jobId, int attemptNumber) throws IOException;
//
// SIMPLE GETTERS
//

/**
* Convenience POJO for various stats data structures.
*
* @param combinedStats stats aggregated across all streams in the attempt
* @param perStreamStats per-stream stats for the attempt
*/
record AttemptStats(SyncStats combinedStats, StreamSyncStats perStreamStats) {}

List<SyncStats> getStreamStats(long jobId, int attemptNumber) throws IOException;
/**
* Retrieve the combined and per stream stats for a single attempt.
*
* @return {@link AttemptStats}
* @throws IOException
*/
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;
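For reference, callers consume the new AttemptStats record through its accessors, as the updated DefaultJobPersistenceTest does further down. A short usage sketch, with jobPersistence, jobId and attemptNumber assumed in scope:

final JobPersistence.AttemptStats attemptStats = jobPersistence.getAttemptStats(jobId, attemptNumber);
final SyncStats combined = attemptStats.combinedStats();
final StreamSyncStats perStream = attemptStats.perStreamStats(); // still null at this checkpoint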

@@ -288,7 +288,7 @@ void testWriteOutput() throws IOException {
assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());

final SyncStats storedSyncStats = jobPersistence.getSyncStats(jobId, attemptNumber).stream().findFirst().get();
final SyncStats storedSyncStats = jobPersistence.getAttemptStats(jobId, attemptNumber).combinedStats();
assertEquals(100L, storedSyncStats.getBytesEmitted());
assertEquals(9L, storedSyncStats.getRecordsEmitted());
assertEquals(10L, storedSyncStats.getRecordsCommitted());
@@ -326,54 +326,56 @@ void testWriteAttemptFailureSummary() throws IOException {
@DisplayName("Test writing in progress stats")
class WriteStats {

@Test
@DisplayName("Writing stats the first time should only write record and bytes information correctly")
void testWriteStatsFirst() throws IOException, SQLException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);

final var stats = jobPersistence.getSyncStats(jobId, attemptNumber).stream().findFirst().get();
assertEquals(1000, stats.getBytesEmitted());
assertEquals(1000, stats.getRecordsEmitted());
assertEquals(1000, stats.getEstimatedBytes());
assertEquals(1000, stats.getEstimatedRecords());

assertEquals(null, stats.getRecordsCommitted());
assertEquals(null, stats.getDestinationStateMessagesEmitted());
}

// @Test
// @DisplayName("Writing stats multiple times should write record and bytes information correctly without exceptions")
// void testWriteSyncStatsRepeated() throws IOException, SQLException {
// @DisplayName("Writing stats the first time should only write record and bytes information correctly")
// void testWriteStatsFirst() throws IOException, SQLException {
// final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
// final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
// jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);
//
// final Optional<Record> record =
// jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
// attemptNumber).stream().findFirst());
// final Long attemptId = record.get().get("id", Long.class);
//
// var stat = jobPersistence.getSyncStats(attemptId).stream().findFirst().get();
// assertEquals(1000, stat.getBytesEmitted());
// assertEquals(1000, stat.getRecordsEmitted());
// assertEquals(1000, stat.getEstimatedBytes());
// assertEquals(1000, stat.getEstimatedRecords());
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, null);
// var stats = jobPersistence.getSyncStats(attemptId);
// assertEquals(1, stats.size());
//
// stat = stats.stream().findFirst().get();
// assertEquals(2000, stat.getBytesEmitted());
// assertEquals(2000, stat.getRecordsEmitted());
// assertEquals(2000, stat.getEstimatedBytes());
// assertEquals(2000, stat.getEstimatedRecords());
// final var stats = jobPersistence.getAttemptStats(jobId, attemptNumber).combinedStats();
// assertEquals(1000, stats.getBytesEmitted());
// assertEquals(1000, stats.getRecordsEmitted());
// assertEquals(1000, stats.getEstimatedBytes());
// assertEquals(1000, stats.getEstimatedRecords());
//
// assertEquals(null, stats.getRecordsCommitted());
// assertEquals(null, stats.getDestinationStateMessagesEmitted());
// }

// @Test
// @DisplayName("Writing stats multiple times should write record and bytes information correctly
// without exceptions")
// void testWriteSyncStatsRepeated() throws IOException, SQLException {
// final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
// final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);
//
// final Optional<Record> record =
// jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number =
// ?", jobId,
// attemptNumber).stream().findFirst());
// final Long attemptId = record.get().get("id", Long.class);
//
// var stat = jobPersistence.getSyncStats(attemptId).stream().findFirst().get();
// assertEquals(1000, stat.getBytesEmitted());
// assertEquals(1000, stat.getRecordsEmitted());
// assertEquals(1000, stat.getEstimatedBytes());
// assertEquals(1000, stat.getEstimatedRecords());
//
// jobPersistence.writeSyncStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, null);
// var stats = jobPersistence.getSyncStats(attemptId);
// assertEquals(1, stats.size());
//
// stat = stats.stream().findFirst().get();
// assertEquals(2000, stat.getBytesEmitted());
// assertEquals(2000, stat.getRecordsEmitted());
// assertEquals(2000, stat.getEstimatedBytes());
// assertEquals(2000, stat.getEstimatedRecords());
//
// }

}
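For completeness, here is how the commented-out first-write test above would read once re-enabled against the new getAttemptStats API; this is a hedged reconstruction of the commented block, not code present in this commit.

@Test
@DisplayName("Writing stats the first time should only write record and bytes information correctly")
void testWriteStatsFirst() throws IOException, SQLException {
  final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
  final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
  jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null);

  final var stats = jobPersistence.getAttemptStats(jobId, attemptNumber).combinedStats();
  assertEquals(1000, stats.getBytesEmitted());
  assertEquals(1000, stats.getRecordsEmitted());
  assertEquals(1000, stats.getEstimatedBytes());
  assertEquals(1000, stats.getEstimatedRecords());

  assertEquals(null, stats.getRecordsCommitted());
  assertEquals(null, stats.getDestinationStateMessagesEmitted());
}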

@Test