
Flink: Fix duplicate data in Flink's upsert writer for format V2 #10526

Open
wants to merge 7 commits into base: main
Conversation

zhongqishang
Contributor

resolve #10431

@github-actions github-actions bot added the flink label Jun 18, 2024
@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: Why would this solve the issue?

In my mental model, this only changes when the files are closed. The files are added to the IcebergFilesCommitter operator state in snapshotState and to the table in notifyCheckpointComplete, so I think the issue could be about how the calls to these methods are orchestrated.
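
For illustration, here is a minimal, self-contained Java sketch of the lifecycle described above, using simplified stand-in names (this is not the real IcebergFilesCommitter code; data files are plain strings here):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class CommitterLifecycleSketch {
  // Write results received from the writer since the last checkpoint barrier.
  private final List<String> pendingFiles = new ArrayList<>();
  // checkpointId -> files stored in operator state, waiting to be committed to the table.
  private final NavigableMap<Long, List<String>> stateBackedFiles = new TreeMap<>();

  // Called for each incoming write result emitted by the writer's flush.
  void processWriteResult(String dataFile) {
    pendingFiles.add(dataFile);
  }

  // snapshotState: the pending files are added to the operator state for this checkpoint.
  void snapshotState(long checkpointId) {
    stateBackedFiles.put(checkpointId, new ArrayList<>(pendingFiles));
    pendingFiles.clear();
  }

  // notifyCheckpointComplete: everything up to and including this checkpoint is committed.
  void notifyCheckpointComplete(long checkpointId) {
    NavigableMap<Long, List<String>> toCommit = stateBackedFiles.headMap(checkpointId, true);
    System.out.println("Committing " + toCommit + " for checkpoint " + checkpointId);
    toCommit.clear(); // removes the committed entries from the backing map
  }

  public static void main(String[] args) {
    CommitterLifecycleSketch committer = new CommitterLifecycleSketch();
    committer.processWriteResult("data-file-1");
    committer.snapshotState(1L);
    committer.notifyCheckpointComplete(1L);
  }
}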

@zhongqishang
Contributor Author

zhongqishang commented Jun 18, 2024

@zhongqishang: Why would this solve the issue?

I am trying to solve this problem. This scenario is hard to reproduce.

Let me restate the background of the problem. The checkpoint was triggered twice in succession, and the first checkpoint was canceled. Cancelling the checkpoint did not cause the Flink job to fail. As a result, the output of two prepareSnapshotPreBarrier calls ended up in a single commit.

Moving the flush to snapshotState is just to ensure that a successful flush is tied to a successful checkpoint, so the scenario above cannot occur.
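
A minimal sketch of this idea, using hypothetical class and method names rather than the actual IcebergStreamWriter implementation, just to show where the flush would move:

import java.util.ArrayList;
import java.util.List;

class WriterFlushSketch {
  private final List<String> bufferedRows = new ArrayList<>();

  void write(String row) {
    bufferedRows.add(row);
  }

  // Current behaviour: the flush runs here, before the barrier, even if the
  // checkpoint is later aborted; the emitted files may then be grouped with the
  // next checkpoint's files in a single commit.
  void prepareSnapshotPreBarrier(long checkpointId) {
    // flush(checkpointId);
  }

  // Proposed behaviour in the first version of this PR: flush only when the state
  // is actually snapshotted, so a flush is always tied to a successful checkpoint.
  void snapshotState(long checkpointId) {
    flush(checkpointId);
  }

  private void flush(long checkpointId) {
    if (!bufferedRows.isEmpty()) {
      System.out.println("Emitting a data file for checkpoint " + checkpointId
          + " with " + bufferedRows.size() + " buffered rows");
      bufferedRows.clear();
    }
  }

  public static void main(String[] args) {
    WriterFlushSketch writer = new WriterFlushSketch();
    writer.write("id=1, value=a");
    writer.prepareSnapshotPreBarrier(1L); // no flush in the proposed variant
    writer.snapshotState(1L);             // the flush happens here instead
  }
}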

Of course I'm not sure if this will solve the problem or if it will cause other problems.

@pvary Thank you very much for your guidance.

@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Also, could you please check whether the IcebergFilesCommitter operator was restarted in the meantime?

@zhongqishang
Contributor Author

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Also, could you please check whether the IcebergFilesCommitter operator was restarted in the meantime?

The operator was not restarted. The Flink job is running normally.

@pvary
Contributor

pvary commented Jun 18, 2024

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Sadly, I don't understand your answer fully.
What do we know about the writers/sources? Are they restarted?

Also, could you please check whether the IcebergFilesCommitter operator was restarted in the meantime?

The operator was not restarted. The Flink job is running normally.

Do we have logs like these?
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240
https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

@zhongqishang
Contributor Author

@zhongqishang: We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint. Could you confirm whether the writers or the source were restarted? Is the source stateful, and is its state handled correctly?

Just like you said.

Sadly, I don't understand your answer fully. What do we know about the writers/sources? Are they restarted?

I'm sorry for not expressing myself clearly. I confirmed that the 1st data file was generated in the 1st checkpoint, and the 2nd data file was generated in the 2nd checkpoint. I also confirmed that the writers and the source were not restarted.

Do we have logs like these? https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63 https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240 https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

I did not find the logs.

I think the checkpoint did not execute snapshotState; it was aborted after prepareSnapshotPreBarrier had run.

@pvary
Contributor

pvary commented Jun 19, 2024

Thanks for the clarification around the restarts.

I did not find the logs.

I think the checkpoint did not execute snapshotState; it was aborted after prepareSnapshotPreBarrier had run.

That's strange. Could you please double check (log levels, TM logs)? These are the logs which should be there if the changes are committed by the Flink job.

If the logs aren't there, you can cross-check via the snapshot summary: please check whether the Flink jobId, checkpointId, and operatorId are present or missing on the commit.

@zhongqishang
Contributor Author

zhongqishang commented Jun 19, 2024

That's strange. Could you please double check (log levels, TM logs)? These are the logs which should be there if the changes are committed by the Flink job.

https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L204C10-L204C63 https://github.com/apache/iceberg/blob/apache-iceberg-

For #10431 (comment)

The log level is INFO; a normal checkpoint has a commit log.

Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86446
// The log of aborted checkpoint 86447 is missing
Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86448

https://github.com/apache/iceberg/blob/apache-iceberg-1.2.1/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240

This log does not exist in any of the TM logs.

@pvary
Contributor

pvary commented Jun 19, 2024

The log level is INFO; a normal checkpoint has a commit log.

Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86446
// The log of aborted checkpoint 86447 is missing
Start to flush snapshot state to state backend, table: iceberg_catalog.ods_iceberg_db.xx, checkpointId: 86448

This means that the IcebergFilesCommitter checkpointing was not started at all.
Since the files were created, we can conclude that IcebergStreamWriter.flush was called. Also, since the next write didn't fail, I would expect that prepareSnapshotPreBarrier finished correctly. That would mean that the created data files are collected in the flush method... So everything seems normal.

Again... are we sure that we don't get duplicated records from the input stream for whatever reason?

@zhongqishang
Contributor Author

zhongqishang commented Jun 19, 2024

Again... are we sure that we don't get duplicated records from the input stream for whatever reason?

The task has upsert enabled.
All data enters multiple times from the input stream. I have hundreds of tasks, and only in the scenario I described do two rows with the same ID appear in the query results. Upsert should keep only one.

We have 2 data files with the same data. I suspect that the 1st data file is generated in the 1st checkpoint, and the 2nd data file is generated in the 2nd checkpoint.

As you described before.

The problem occurs when the results of 2 prepareSnapshotPreBarrier calls appear in one commit.

#10431 (comment)

As I described before, within that single commit the query returns 2 rows with the same id.

@pvary
Contributor

pvary commented Jun 19, 2024

So in summary:

  • The duplicated data is expected as this is coming from the input stream
  • The issue is that the WriteResult does not contain the checkpointId, and the IcebergFilesCommitter commits the changes for 2 checkpoints in a single snapshot.

So the fix should be something along these lines:

  • Add checkpointId to the WriteResult
  • Change the IcebergFilesCommitter to consider the checkpointId in the WriteResults and generate multiple commits if needed, as sketched below.

@rodmeneses: Is this issue solved naturally by the new SinkV2 implementation?
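
A hedged sketch of the second bullet above, assuming a hypothetical wrapper type that carries the checkpoint id next to the write result; the real committer logic and class names will differ:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerCheckpointCommitSketch {
  // Hypothetical wrapper pairing a checkpoint id with a (simplified) write result.
  record ResultWithCheckpoint(long checkpointId, String writeResult) {}

  void commit(List<ResultWithCheckpoint> pending) {
    // Group the incoming results by checkpoint id so that results from different
    // checkpoints never end up in the same Iceberg commit.
    Map<Long, List<String>> byCheckpoint = new TreeMap<>();
    for (ResultWithCheckpoint r : pending) {
      byCheckpoint.computeIfAbsent(r.checkpointId(), k -> new ArrayList<>()).add(r.writeResult());
    }
    // One commit per checkpoint, in checkpoint order.
    byCheckpoint.forEach((checkpointId, results) ->
        System.out.println("Commit for checkpoint " + checkpointId + ": " + results));
  }

  public static void main(String[] args) {
    new PerCheckpointCommitSketch().commit(List.of(
        new ResultWithCheckpoint(1L, "data-file-A + eq-delete-A"),
        new ResultWithCheckpoint(2L, "data-file-B + eq-delete-B")));
  }
}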

@github-actions github-actions bot added the core label Jun 20, 2024
@zhongqishang zhongqishang changed the title from "Flink: move flush operation from prepareSnapshotPreBarrier to snapshotState" to "Flink: Fix duplicate data in Flink's upsert writer for format V2" on Jun 20, 2024
@zhongqishang
Contributor Author

zhongqishang commented Jun 20, 2024

@pvary
Adding checkpointId to WriteResult would change the public API and break serialization compatibility.
So I added a wrapper class FlinkWriteResult to Flink to store checkpointId.
WDYT?

Can you give me some advice?
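
A minimal sketch of what such a wrapper could look like; the class name, fields, and use of java.io.Serializable below are illustrative assumptions, not the actual FlinkWriteResult class added in this PR:

import java.io.Serializable;

public class FlinkWriteResultSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final long checkpointId;
  // Stand-in for org.apache.iceberg.io.WriteResult, which stays unchanged.
  private final Serializable writeResult;

  public FlinkWriteResultSketch(long checkpointId, Serializable writeResult) {
    this.checkpointId = checkpointId;
    this.writeResult = writeResult;
  }

  public long checkpointId() {
    return checkpointId;
  }

  public Serializable writeResult() {
    return writeResult;
  }
}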

@zhongqishang zhongqishang marked this pull request as ready for review June 20, 2024 12:16
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
}
}

@TestTemplate
public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
Contributor

Nit: maybe a better name, like testCommitMultipleCheckpointsForV2Table?
Or anything where we highlight that the goal is to create different commits for different checkpoints, even if the WriteResults arrive at the same time.

Contributor

I'm not satisfied with the name (even the one I suggested), so probably the best option would be to add a javadoc to the test method describing the scenario we are testing.

Contributor Author

Yes, the triggering conditions are a bit complicated, so it is necessary to add a javadoc that describes them clearly.

@zhongqishang zhongqishang force-pushed the issue-10431 branch 2 times, most recently from b24a6a2 to b2f398b Compare June 24, 2024 03:22
@zhongqishang zhongqishang requested a review from pvary June 25, 2024 11:10
Comment on lines 917 to 924
// The test case is designed to cover the following scenario:

// A V2 table with upsert enabled.
// The previous checkpoint does not complete normally, so the next snapshot commits the output of
// multiple checkpoints in a single snapshot. That is, prepareSnapshotPreBarrier is triggered twice,
// but snapshotState() is only triggered once.
// Data with the same primary key is written in both checkpoints, and both a data file and an
// eq-delete file are generated.
Contributor

Could we put this into the method javadoc, using something like this:

  /**
   * The test case simulates upserting into an Iceberg V2 table and facing the following
   * scenario:
   * <ul>
   *     <li>A specific row is updated
   *     <li>prepareSnapshotPreBarrier is triggered
   *     <li>The checkpoint fails for reasons outside of the Iceberg connector
   *     <li>The specific row is updated again in the second checkpoint as well
   *     <li>The second snapshot is triggered, and finishes
   * </ul>
   * <p>Previously the files from the 2 snapshots were committed in a single Iceberg commit, and as
   * a result duplicate rows were created in the table.
   * @throws Exception
   */

Contributor Author

Thanks for your detailed javadoc.👍
Updated.

@pvary
Contributor

pvary commented Jun 25, 2024

Let me check what compatibility issues we would create by changing the WriteResult to FlinkWriteResult.
Otherwise this looks good to me.

@pvary
Contributor

pvary commented Jun 25, 2024

@stevenzwu:
How should we handle this situation?
WriteResult is used for the communication between the writer and the committer. WriteResult is part of the Iceberg API, so we cannot change it.

Without changing the WriteResult we can't fix the issue (at least, I can't see a way right now).

If we change the type used for the communication between the writer and the committer, then users who run their FlinkSink jobs with unaligned checkpoints need to restart the job without state. For users who use aligned checkpoints this is a compatible fix.

Do you have a better idea how to fix this? If not, how do we communicate it? (The job restart will fail, but it would be nicer if we documented this somewhere.)

@pvary
Contributor

pvary commented Jul 1, 2024

I'm still thinking about the best way to move forward. Maybe we should ask for the dev list's opinion about the compatibility issue. Let's see how widely unaligned checkpoints are used in the community.

@zhongqishang
Contributor Author

I'm still thinking about the best way to move forward. Maybe we should ask for the dev list's opinion about the compatibility issue. Let's see how widely unaligned checkpoints are used in the community.

Thanks @pvary for the continued follow-up. Asking the dev list how widely unaligned checkpoints are used in the community is a very good idea.

My written English is not very accurate; could you send the email about this?

Development

Successfully merging this pull request may close these issues.

Flink sink writes duplicate data in upsert mode