Flink: Fix duplicate data in Flink's upsert writer for format V2 #10526

Open · wants to merge 8 commits into base: main
Changes from 1 commit
fix comments
zhongqishang committed Jun 21, 2024
commit 60d183d857f758a5547db8c1d2f2c1738ad8d7d4
IcebergFilesCommitter.java
@@ -212,15 +212,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {

// Update the checkpoint state.
long startNano = System.nanoTime();
-    if (writeResultsSinceLastSnapshot.isEmpty()) {
-      dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
-    } else {
-      for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
-          writeResultsSinceLastSnapshot.entrySet()) {
-        dataFilesPerCheckpoint.put(
-            writeResultsOfCkpt.getKey(), writeToManifest(writeResultsOfCkpt.getKey()));
-      }
-    }
+    writeToManifestSinceLastSnapshot(checkpointId);

// Reset the snapshot state to the latest state.
checkpointsState.clear();
@@ -229,8 +221,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
jobIdState.clear();
jobIdState.add(flinkJobId);

-    // Clear the local buffer for current checkpoint.
-    writeResultsSinceLastSnapshot.clear();
committerMetrics.checkpointDuration(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
}
@@ -446,13 +436,26 @@ public void processElement(StreamRecord<FlinkWriteResult> element) {
@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    long currentCheckpointId = Long.MAX_VALUE;
-    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
-    writeResultsSinceLastSnapshot.clear();
+    long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+    writeToManifestSinceLastSnapshot(currentCheckpointId);
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}

+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+    if (writeResultsSinceLastSnapshot.isEmpty()) {
+      dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+    } else {
+      for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
+          writeResultsSinceLastSnapshot.entrySet()) {
+        dataFilesPerCheckpoint.put(
+            writeResultsOfCkpt.getKey(), writeToManifest(writeResultsOfCkpt.getKey()));
+      }
+    }
+
+    // Clear the local buffer for current checkpoint.
+    writeResultsSinceLastSnapshot.clear();
+  }

/**
* Write all the complete data files to a newly created manifest file and return the manifest's
* avro serialized bytes.
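For orientation: writeToManifestSinceLastSnapshot above iterates writeResultsSinceLastSnapshot, which maps each checkpoint id to the write results received for it. A minimal sketch of that bookkeeping, assuming a buffering helper along the lines of what processElement does (the helper and its exact shape are assumptions for illustration, not code from this PR):

import java.util.List;
import java.util.Map;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class BufferingSketch {
  // checkpoint id -> write results received for that checkpoint since the last snapshot
  private final Map<Long, List<WriteResult>> writeResultsSinceLastSnapshot = Maps.newHashMap();

  // Assumed buffering step: results are kept per checkpoint so that snapshotState() and
  // endInput() can later write one manifest per checkpoint instead of merging them.
  void buffer(long checkpointId, WriteResult result) {
    writeResultsSinceLastSnapshot
        .computeIfAbsent(checkpointId, unused -> Lists.newArrayList())
        .add(result);
  }
}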
IcebergStreamWriter.java
@@ -33,6 +33,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<FlinkWriteResult>
implements OneInputStreamOperator<T, FlinkWriteResult>, BoundedOneInput {

private static final long serialVersionUID = 1L;
+  static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
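A quick sketch of why Long.MAX_VALUE is a safe end-of-input sentinel, assuming the committer keeps dataFilesPerCheckpoint in a NavigableMap and that commitUpToCheckpoint selects pending entries with an inclusive head map (both are assumptions about surrounding code not shown in this diff):

import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class EndInputSentinelSketch {
  static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

  public static void main(String[] args) {
    // Pending per-checkpoint manifests buffered by the committer (contents are placeholders).
    NavigableMap<Long, byte[]> dataFilesPerCheckpoint = new TreeMap<>();
    dataFilesPerCheckpoint.put(1L, new byte[0]);
    dataFilesPerCheckpoint.put(2L, new byte[0]);

    // Because the sentinel is larger than any real checkpoint id, the inclusive head map
    // covers every pending entry, so endInput() flushes and commits all buffered results.
    SortedMap<Long, byte[]> pending =
        dataFilesPerCheckpoint.headMap(END_INPUT_CHECKPOINT_ID, true);
    System.out.println(pending.keySet()); // [1, 2]
  }
}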
SimpleDataUtil.java
@@ -66,7 +66,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
@@ -83,13 +82,6 @@ private SimpleDataUtil() {}
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));

-  public static final Schema SCHEMA_WITH_PRIMARY_KEY =
-      new Schema(
-          Lists.newArrayList(
-              Types.NestedField.required(1, "id", Types.IntegerType.get()),
-              Types.NestedField.optional(2, "data", Types.StringType.get())),
-          Sets.newHashSet(1));

public static final TableSchema FLINK_SCHEMA =
TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();

@@ -318,10 +310,6 @@ public static void assertTableRecords(Table table, List<Record> expected, String branch) {
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
-      if (!table.schema().identifierFieldNames().isEmpty()) {
-        Assert.assertFalse("Should not have the identical record", actualSet.contains(record));
-      }

actualSet.add(record);
}

TestIcebergFilesCommitter.java
@@ -63,7 +63,6 @@
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -93,21 +92,15 @@ public class TestIcebergFilesCommitter extends TestBase {
@Parameter(index = 2)
private String branch;

-  @Parameter(index = 3)
-  private boolean hasPrimaryKey;
-
-  @Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}, hasPrimaryKey = {3}")
+  @Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}")
  protected static List<Object> parameters() {
    return Arrays.asList(
-        new Object[] {1, FileFormat.AVRO, "main", false},
-        new Object[] {2, FileFormat.AVRO, "test-branch", false},
-        new Object[] {1, FileFormat.PARQUET, "main", false},
-        new Object[] {2, FileFormat.PARQUET, "test-branch", false},
-        new Object[] {1, FileFormat.ORC, "main", false},
-        new Object[] {2, FileFormat.ORC, "test-branch", false},
-        new Object[] {2, FileFormat.AVRO, "main", true},
-        new Object[] {2, FileFormat.PARQUET, "test-branch", true},
-        new Object[] {2, FileFormat.ORC, "main", true});
+        new Object[] {1, FileFormat.AVRO, "main"},
+        new Object[] {2, FileFormat.AVRO, "test-branch"},
+        new Object[] {1, FileFormat.PARQUET, "main"},
+        new Object[] {2, FileFormat.PARQUET, "test-branch"},
+        new Object[] {1, FileFormat.ORC, "main"},
+        new Object[] {2, FileFormat.ORC, "test-branch"});
  }

@Override
Expand All @@ -119,9 +112,8 @@ public void setupTable() throws IOException {
this.metadataDir = new File(tableDir, "metadata");
assertThat(tableDir.delete()).isTrue();

-    Schema schema = hasPrimaryKey ? SimpleDataUtil.SCHEMA_WITH_PRIMARY_KEY : SimpleDataUtil.SCHEMA;
    // Construct the iceberg table.
-    table = create(schema, PartitionSpec.unpartitioned());
+    table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());

table
.updateProperties()
@@ -506,7 +498,6 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Exception {

@TestTemplate
public void testStartAnotherJobToWriteSameTable() throws Exception {
assumeThat(hasPrimaryKey).as("The test case only for non-primary table.").isEqualTo(false);

long checkpointId = 0;
long timestamp = 0;
@@ -922,19 +913,36 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
}

@TestTemplate
-  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+  public void testCommitMultipleCheckpointsForV2Table() throws Exception {
    // The test case is designed to cover the following scenario: a V2 table with upsert
    // enabled, where the previous checkpoint did not complete normally, so the next snapshot
    // commits multiple checkpoints at once. That is, prepareSnapshotPreBarrier is triggered
    // twice, but snapshotState() is only triggered once. Rows with the same primary key are
    // written in both checkpoints, and each checkpoint produces a data file and an eq-delete
    // file. Because an equality delete only masks rows from earlier commits, folding both
    // checkpoints into a single commit would leave the first checkpoint's rows as duplicates.
Contributor:
Could we put this in the method javadoc, using something like this:

  /**
   * The testcase is to simulate upserting to an Iceberg V2 table, and facing the following
   * scenario:
   * <ul>
   *     <li>A specific row is updated
   *     <li>The prepareSnapshotPreBarrier triggered
   *     <li>Checkpoint failed for reasons outside of the Iceberg connector
   *     <li>The specific row is updated again in the second checkpoint as well
   *     <li>Second snapshot is triggered, and finished
   * </ul>
   * <p>Previously the files from the 2 snapshots were committed in a single Iceberg commit, as a
   * result duplicate rows were created in the table.
   * @throws Exception
   */

Contributor Author:
Thanks for your detailed javadoc.👍
Updated.
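The scenario discussed in this thread comes down to Iceberg's sequence-number rule: an equality delete only masks rows in data files with a strictly smaller sequence number. A hedged sketch of why per-checkpoint commits avoid the duplicates (not code from this PR; the Table handle and the data/delete files are assumed to exist):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

class PerCheckpointCommitSketch {
  // Sketch: commit each checkpoint's files separately so the eq-delete from
  // checkpoint 2 can mask the row written by checkpoint 1.
  static void commitTwoCheckpoints(
      Table table, DataFile dataFileCkpt1, DataFile dataFileCkpt2, DeleteFile eqDeleteCkpt2) {
    table.newRowDelta()
        .addRows(dataFileCkpt1)    // INSERT id=1 -> "aaa1"; gets sequence number N
        .commit();

    table.newRowDelta()
        .addRows(dataFileCkpt2)    // INSERT id=1 -> "aaa2"
        .addDeletes(eqDeleteCkpt2) // eq-delete on id=1; masks rows with sequence number < N+1
        .commit();                 // gets sequence number N+1

    // If both checkpoints were folded into one RowDelta, dataFileCkpt1 and eqDeleteCkpt2
    // would share a sequence number, the delete would not apply to that file, and id=1
    // would surface twice in scans.
  }
}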


+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);

assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);

long timestamp = 0;
long checkpoint = 10;

JobID jobId = new JobID();
OperatorID operatorId;
-      FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();

+      FileAppenderFactory<RowData> appenderFactory =
+          new FlinkAppenderFactory(
+              table,
+              table.schema(),
+              FlinkSchemaUtil.convert(table.schema()),
+              table.properties(),
+              table.spec(),
+              new int[] {table.schema().findField("id").fieldId()},
+              table.schema(),
+              null);

try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
@@ -944,9 +952,11 @@ public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

assertMaxCommittedCheckpointId(jobId, operatorId, -1L);

RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
RowData insert1 = null;
RowData insert2 = null;
for (int i = 1; i <= 3; i++) {
insert1 = SimpleDataUtil.createInsert(1, "aaa" + i);
insert2 = SimpleDataUtil.createInsert(2, "bbb" + i);
DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
DeleteFile deleteFile =
writeEqDeleteFile(
@@ -958,10 +968,7 @@ public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
++timestamp);
}

-      // The 1th snapshotState.
harness.snapshot(checkpoint, ++timestamp);

-      // Notify the 1th snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);