Flink: Fix duplicate data in Flink's upsert writer for format V2 #10526
Conversation
@zhongqishang: Why would this solve the issue? In my mental model, this only changes when the files are closed. The files are added in the …
I am trying to solve this problem, but the scenario is hard to reproduce, so let me restate the background. The checkpoint was triggered twice in succession, and the first checkpoint was cancelled. Cancelling the checkpoint did not cause the Flink job to fail. As a result, the output of two prepareSnapshotPreBarrier calls ended up in one commit. Moving the flush to snapshotState is just to ensure that a successful flush is bound to a successful checkpoint, so that the above scenario cannot occur. Of course, I'm not sure whether this will solve the problem or cause other problems. @pvary Thank you very much for your guidance.
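The timing argument above can be illustrated with a minimal, self-contained sketch. This is hypothetical code, not the actual patch and not Flink or Iceberg API; the comments only mirror the lifecycle hooks mentioned in the discussion (prepareSnapshotPreBarrier, snapshotState), and the assumed scenario is the one described: checkpoint 1 is cancelled after its barrier without failing the job, and checkpoint 2 completes.

```java
// Hypothetical simulation of the flush-timing issue discussed above.
// NOT Flink/Iceberg code; it only models when the writer emits a WriteResult.
public class FlushTimingDemo {

  // Counts how many separate flushes (WriteResults) land in the next commit
  // when checkpoint 1 is cancelled after its barrier and checkpoint 2 succeeds.
  static int flushesInNextCommit(boolean flushInSnapshotState) {
    int flushes = 0;

    // Checkpoint 1: prepareSnapshotPreBarrier fires, but the checkpoint is
    // later cancelled, so snapshotState() is never called for it (and the
    // job does not fail).
    if (!flushInSnapshotState) {
      flushes++; // old behavior: flush on the pre-barrier hook
    }

    // Checkpoint 2: prepareSnapshotPreBarrier fires, then snapshotState()
    // succeeds.
    flushes++; // either the pre-barrier flush (old) or the snapshotState flush (new)

    return flushes; // everything counted here ends up in ONE Iceberg commit
  }

  public static void main(String[] args) {
    System.out.println(flushesInNextCommit(false)); // 2: two WriteResults squashed into one commit
    System.out.println(flushesInNextCommit(true));  // 1: the buffered data is flushed exactly once
  }
}
```

With the flush bound to snapshotState, a cancelled checkpoint produces no WriteResult at all, so a single commit can never contain the output of two independent flushes.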
@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file was generated in the 1st checkpoint, and the 2nd data file in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is the state of the source handled correctly? Also, could you please check if the …
Just as you said.
The operators were not restarted; the Flink job is healthy.
Sadly, I don't understand your answer fully.
Do we have logs like these?
I'm sorry for not expressing myself clearly. I confirmed that the 1st data file was generated in the 1st checkpoint and the 2nd data file in the 2nd checkpoint, and that neither the writers nor the source were restarted.
I did not find the logs. I think the checkpoint is not executing …
Thanks for the clarification around the restarts.
That's strange. Could you please double-check (log levels, TM logs)? These are the logs that should be there if the changes are committed by the Flink job. If the logs aren't there, you can cross-check via the snapshot summary: please check whether the Flink jobId, checkpointId, and operatorId are present or missing on the commit.
For #10431 (comment), the log level is …
This log does not exist in any of the TM logs.
This means that the … Again: are we sure that we don't get duplicated records from the input stream for whatever reason?
The task has upsert enabled.
As you described before, the problem occurs when the results of 2 … As I described before, within the one commit, querying will return 2 rows with the same id.
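Why a single squashed commit yields duplicate rows can be sketched with a toy model of Iceberg V2 equality-delete semantics: an equality delete applies only to rows in data files with a strictly smaller sequence number than the delete file, and all files committed together share one sequence number. This is a standalone illustration with hypothetical types, not Iceberg code.

```java
// Hypothetical, self-contained model of V2 equality-delete scoping.
// Not Iceberg code; Row/EqDelete are made-up types for illustration.
import java.util.ArrayList;
import java.util.List;

public class UpsertSquashDemo {
  // A row for a primary key, tagged with the sequence number of the commit
  // that added its data file.
  record Row(int key, String value, long dataSeq) {}

  // An equality delete on a primary key, tagged with its sequence number.
  record EqDelete(int key, long deleteSeq) {}

  // A row survives unless an equality delete on its key has a STRICTLY
  // larger sequence number than the row's data file.
  static List<Row> applyDeletes(List<Row> rows, List<EqDelete> deletes) {
    List<Row> live = new ArrayList<>();
    for (Row r : rows) {
      boolean deleted = deletes.stream()
          .anyMatch(d -> d.key() == r.key() && d.deleteSeq() > r.dataSeq());
      if (!deleted) {
        live.add(r);
      }
    }
    return live;
  }

  public static void main(String[] args) {
    // Normal case: each checkpoint becomes its own commit (seq 1, then seq 2);
    // the second upsert's eq-delete (seq 2) removes the seq-1 copy.
    List<Row> rows = List.of(new Row(1, "v1", 1L), new Row(1, "v2", 2L));
    List<EqDelete> dels = List.of(new EqDelete(1, 2L));
    System.out.println(applyDeletes(rows, dels)); // only the seq-2 row survives

    // Buggy case: two checkpoints squashed into ONE commit, so both data
    // files and the eq-delete share sequence number 1; the delete removes
    // nothing, leaving two rows with the same primary key.
    List<Row> squashed = List.of(new Row(1, "v1", 1L), new Row(1, "v2", 1L));
    List<EqDelete> squashedDels = List.of(new EqDelete(1, 1L));
    System.out.println(applyDeletes(squashed, squashedDels)); // both rows survive
  }
}
```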
So in summary:
So the fix should be something along these lines:
@rodmeneses: Is this issue solved naturally by the new SinkV2 implementation?
@pvary Can you give me some advice?
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
  }
}

@TestTemplate
public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
Nit: maybe a better name, like testCommitMultipleCheckpointsForV2Table?
Or anything that highlights that the goal is to create different commits for different checkpoints, even if the WriteResults arrive at the same time.
I'm not satisfied with the name (even the one I suggested), so probably the best option would be to add javadoc to the test method describing the scenario we are testing.
Yes, the triggering conditions are a bit complicated, and it is necessary to add javadoc to describe them clearly.
// The test case is designed to cover the following scenario:
// V2 table with upsert enabled.
// The previous checkpoint did not complete normally, so the next snapshotState() submits a single
// snapshot that includes multiple checkpoints. That is, prepareSnapshotPreBarrier is triggered
// twice, but snapshotState() is only triggered once.
// Data with the same primary key is written in both checkpoints, and both a data file and an
// eq-delete file are generated.
Could we put this in the method javadoc, and use something like this:
/**
 * The testcase simulates upserting into an Iceberg V2 table, facing the following
 * scenario:
 *
 * <ul>
 *   <li>A specific row is updated
 *   <li>The prepareSnapshotPreBarrier is triggered
 *   <li>The checkpoint fails for reasons outside of the Iceberg connector
 *   <li>The specific row is updated again in the second checkpoint as well
 *   <li>The second snapshot is triggered, and finishes
 * </ul>
 *
 * <p>Previously the files from the 2 snapshots were committed in a single Iceberg commit, and as
 * a result duplicate rows were created in the table.
 *
 * @throws Exception
 */
Thanks for your detailed javadoc.👍
Updated.
Let me check what compatibility issues we are creating by changing the …
@stevenzwu: Without changing the … If we change the type used in the communication between the writer and the committer, then users who are running unaligned checkpoints in a job with FlinkSink need to restart the job without state. For users who are using aligned checkpoints this is a compatible fix. Do you have a better idea of how to fix this? If not, how do we communicate it? (The job restart will fail, but it would be nicer if we documented this somewhere.)
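The unaligned-checkpoint concern above can be illustrated generically: with unaligned checkpoints, in-flight records between operators are serialized inside the checkpoint itself, so changing the record type breaks restore. The sketch below is a plain Java serialization analogy with made-up types (OldResult/NewResult), not Flink's actual serializer stack.

```java
// Generic illustration (NOT Flink code) of why changing the record type that
// flows between two operators breaks restore: in-flight records are written
// into the checkpoint as the OLD type and cannot be read back as the NEW one.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InFlightTypeChangeDemo {
  // Old in-flight type: hypothetical, e.g. a bare data-file path.
  record OldResult(String dataFile) implements Serializable {}

  // New in-flight type: hypothetical, e.g. the result now carries a checkpoint id.
  record NewResult(String dataFile, long checkpointId) implements Serializable {}

  // Simulates capturing an in-flight record inside a checkpoint.
  static byte[] checkpoint(Serializable inFlightRecord) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(inFlightRecord);
    }
    return bos.toByteArray();
  }

  public static void main(String[] args) throws Exception {
    byte[] state = checkpoint(new OldResult("s3://bucket/data/file-a.parquet"));
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
      // The restored object is still an OldResult; treating it as NewResult fails.
      NewResult restored = (NewResult) in.readObject(); // throws ClassCastException
      System.out.println(restored);
    } catch (ClassCastException e) {
      System.out.println("restore failed: in-flight record has the old type");
    }
  }
}
```

This is why the change is only incompatible for unaligned checkpoints: with aligned checkpoints, no records are in flight at snapshot time, so nothing of the old type is stored in state.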
I'm still thinking about the best way forward. Maybe we should ask the dev list's opinion about the compatibility issue. Let's see how widely unaligned checkpoints are used in the community.
Thanks @pvary for the continued follow-up. Asking the dev list how widely unaligned checkpoints are used in the community is a very good idea. My English is not accurate, so could you send the email about this?
Resolves #10431