Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongqishang committed Jun 24, 2024
1 parent 979d7c9 commit b2f398b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,28 +258,28 @@ private void commitUpToCheckpoint(
long checkpointId)
throws IOException {
NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
List<ManifestFile> manifests = Lists.newArrayList();
NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
List<ManifestFile> manifests = Lists.newArrayList();
NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
// Skip the empty flink manifest.
} else {
DeltaManifests deltaManifests =
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE, e.getValue());
pendingResults.put(
e.getKey(),
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
manifests.addAll(deltaManifests.manifests());
continue;
}

CommitSummary summary = new CommitSummary(pendingResults);
commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, e.getKey());
committerMetrics.updateCommitSummary(summary);
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
DeltaManifests deltaManifests =
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE, e.getValue());
pendingResults.put(
e.getKey(),
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
manifests.addAll(deltaManifests.manifests());
}

CommitSummary summary = new CommitSummary(pendingResults);
commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
committerMetrics.updateCommitSummary(summary);
pendingMap.clear();
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
}

private void commitPendingResult(
Expand Down Expand Up @@ -442,14 +442,14 @@ public void endInput() throws IOException {
}

private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
if (writeResultsSinceLastSnapshot.isEmpty()) {
if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
} else {
for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
writeResultsSinceLastSnapshot.entrySet()) {
dataFilesPerCheckpoint.put(
writeResultsOfCkpt.getKey(), writeToManifest(writeResultsOfCkpt.getKey()));
}
}

for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
writeResultsSinceLastSnapshot.entrySet()) {
dataFilesPerCheckpoint.put(
writeResultsOfCkpt.getKey(), writeToManifest(writeResultsOfCkpt.getKey()));
}

// Clear the local buffer for current checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except

@TestTemplate
public void testStartAnotherJobToWriteSameTable() throws Exception {

long checkpointId = 0;
long timestamp = 0;
List<RowData> rows = Lists.newArrayList();
Expand Down Expand Up @@ -722,13 +721,13 @@ public void testBoundedStream() throws Exception {
List<RowData> tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));

DataFile dataFile = writeDataFile("data-1", tableRows);
harness.processElement(of(Long.MAX_VALUE, dataFile), 1);
harness.processElement(of(IcebergStreamWriter.END_INPUT_CHECKPOINT_ID, dataFile), 1);
((BoundedOneInput) harness.getOneInputOperator()).endInput();

assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
assertMaxCommittedCheckpointId(jobId, operatorId, IcebergStreamWriter.END_INPUT_CHECKPOINT_ID);
assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary())
.containsEntry("flink.test", TestIcebergFilesCommitter.class.getName());
}
Expand Down

0 comments on commit b2f398b

Please sign in to comment.