
Spark3 DSv2: Handle Snapshots of Type OVERWRITE - while reading Stream from Iceberg table #2788

Open
SreeramGarlapati opened this issue Jul 6, 2021 · 4 comments


@SreeramGarlapati (Collaborator)

Background / context

This is a continuation of the work done in PR #2660: reading from an Iceberg table as an incremental streaming source in Spark.

To read a stream out of an Iceberg table, the above PR iterates through all the Iceberg snapshots (the table history) starting from the very first snapshot, collects the list of added files in each snapshot one by one, and hands off the rows in them to the Spark3 DSv2 reader. However, this (along with #2752) doesn't handle the case where the streaming reader encounters a snapshot of type OVERWRITE.

This issue is a request to extend the implementation to support that case.
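
For context, here is a minimal sketch of what consuming that streaming source looks like (the table name and checkpoint path are placeholders, and `spark` is assumed to be an active SparkSession):

```scala
// Read an Iceberg table as a Structured Streaming source (Spark 3 DSv2),
// as enabled by PR #2660. Table name and paths are illustrative.
val stream = spark.readStream
  .format("iceberg")
  .load("db.events")

stream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/events")
  .start()
```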

Implementation/Idea Proposal

What does an Iceberg snapshot of type OVERWRITE contain?

To simplify, imagine an Iceberg table that comprises just one data file with 100 rows: [a1, b1] ... [a100, b100]. If a writer performs a MERGE/upsert operation on [a1, b1] to update it to [a1, b1_upserted], then a snapshot of type OVERWRITE is written.
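
For example, an upsert like the following (a sketch using Iceberg's Spark SQL extensions; the table name `db.t` and column names are illustrative) would commit an OVERWRITE snapshot:

```scala
// Hypothetical MERGE/upsert that commits a snapshot of type OVERWRITE.
spark.sql("""
  MERGE INTO db.t AS target
  USING (SELECT 'a1' AS a, 'b1_upserted' AS b) AS source
  ON target.a = source.a
  WHEN MATCHED THEN UPDATE SET target.b = source.b
""")
```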

Iceberg Table - with spec version 1

In Iceberg format version 1, this MERGE/upsert operation results in a net-new table snapshot in which that one row is changed to the new value and the data file is fully rewritten.
Now the table snapshots look like:
S1 --> [a1, b1], [a2, b2] ... [a100, b100]
S2 --> [a1, b1_upserted], [a2, b2] ... [a100, b100]

So, when the streaming reader encounters a snapshot of type OVERWRITE (i.e., S2), streaming all of the added files will restream [a2, b2] ... [a100, b100]: DUPLICATE data!

This is a limitation of Iceberg table spec version 1 and is solved by version 2 (where both data and delete files are written as part of the snapshot).

So, the proposal here is to implement an opt-in Spark option, with the known limitation that it may replay a lot of duplicate data.
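
Assuming the option name proposed in the ubisoft/iceberg commits referenced below (not a merged upstream API at the time of this issue), opting in might look like this:

```scala
// Sketch: opt in to replaying OVERWRITE snapshots on a v1 table.
// The option name comes from the ubisoft/iceberg commits referenced below
// and is an assumption here, not a settled upstream API.
val stream = spark.readStream
  .format("iceberg")
  .option("streaming-resend-overwrite-snapshots", "true")
  .load("db.t")
```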

Iceberg Table - with spec version 2

In Iceberg format version 2, the same MERGE/upsert operation results in a net-new table snapshot with the single updated row in a net-new data file, plus a single record in a new delete file:
S1 --> DataFiles { [a1, b1], [a2, b2] ... [a100, b100] }
S2 --> DataFiles { [a1, b1], [a2, b2] ... [a100, b100], [a1, b1_upserted] } + DeleteFiles { a = a1 }

In this case, when the streaming reader encounters a snapshot of type OVERWRITE (i.e., S2), the added files correspond exactly to the newly added rows: [a1, b1_upserted]!

This, in combination with skipping snapshots of type REPLACE, should avoid the possibility of duplicate data.
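
Putting the two cases together, here is a rough sketch of the per-snapshot dispatch this proposal implies. It is not the actual implementation (which would live in the Spark source's micro-batch planning), it elides checking the table's format version, and `addedFiles()` reflects the Java API of that era:

```scala
import org.apache.iceberg.{DataFile, DataOperations, Table}
import scala.collection.JavaConverters._

// Rough sketch only: which files each snapshot type contributes to the stream.
// On v2 tables the added files of an OVERWRITE are always safe to stream;
// on v1 tables they can duplicate old rows, hence the opt-in flag.
def filesToStream(table: Table, resendOverwrites: Boolean): Seq[DataFile] =
  table.snapshots().asScala.toSeq.flatMap { snapshot =>
    snapshot.operation() match {
      case DataOperations.REPLACE =>
        Nil // compaction rewrites existing rows; nothing new to stream
      case DataOperations.OVERWRITE if !resendOverwrites =>
        Nil // skip unless the user opted in (or the table is v2)
      case _ =>
        // APPEND, and opted-in OVERWRITE: added files carry the rows to stream
        snapshot.addedFiles().asScala.toSeq
    }
  }
```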

@SreeramGarlapati SreeramGarlapati changed the title Handle Snapshots of Type OVERWRITE - while reading Stream from Iceberg table Spark3 DSv2: Handle Snapshots of Type OVERWRITE - while reading Stream from Iceberg table Jul 6, 2021
tprelle added a commit to ubisoft/iceberg that referenced this issue Aug 5, 2021
As we cannot do better for table v1, users can add the option streaming-resend-overwrite-snapshots to allow Spark to resend all the data for overwrite snapshots.

Fix table v1 part of apache#2788
tprelle added a commit to ubisoft/iceberg that referenced this issue Aug 9, 2021 (same commit message as above)
@kbendick (Contributor) commented Oct 9, 2021

Hi @SreeramGarlapati,

I was looking into this recently while doing some investigation into existing CDC systems, particularly as Spark has no built-in concept of this.

Your proposal seems pretty good. My one concern would be streaming all data from V1 format tables in OVERWRITE snapshots by default. If that's a flag people can opt into, I could get behind that. But that's potentially quite a lot of duplicated data, and I think we could in theory do some sort of anti-join on files from past snapshots if they haven't been physically deleted yet (expensive, but could give much more correct results for V1 tables).
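
One way to picture that anti-join at the row level (a sketch, not the file-level variant suggested above; the snapshot IDs and join keys are placeholders, and `snapshot-id` is Iceberg's standard time-travel read option):

```scala
// Sketch of the anti-join idea: rows visible after the OVERWRITE (S2) minus
// rows visible before it (S1) are the net-new/updated rows. Expensive, but
// avoids restreaming duplicates for v1 tables.
val s1Id = 1L // placeholder: snapshot ID before the overwrite
val s2Id = 2L // placeholder: snapshot ID of the overwrite

val afterOverwrite = spark.read.format("iceberg")
  .option("snapshot-id", s2Id.toString)
  .load("db.t")
val beforeOverwrite = spark.read.format("iceberg")
  .option("snapshot-id", s1Id.toString)
  .load("db.t")

// Rows present after S2 but not before it.
val netNewRows = afterOverwrite.join(beforeOverwrite, Seq("a", "b"), "left_anti")
```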

I don't have a very strong opinion on much related to V1 tables, other than restreaming potential duplicates should definitely be opt-in imo. 🙂

@tmnd1991 commented Mar 9, 2022

Hi all, I would love to contribute to this. May I propose a PR only for v2 tables, and then we can think about how to proceed for v1?

@imonteroq commented Nov 29, 2023

Hi @SreeramGarlapati, @tmnd1991, are there any plans to implement this? This is a selling point for Hudi over Iceberg.

github-actions bot commented Jul 4, 2024

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Jul 4, 2024