Spark3 DSv2: Handle Snapshots of Type OVERWRITE - while reading Stream from Iceberg table #2788
Comments
As we cannot do better for table v1, the user can add the option `streaming-resend-overwrite-snapshots` if they allow Spark to resend all the data for overwrite snapshots. Fixes the table v1 part of apache#2788.
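A minimal sketch of how a consumer might opt in, assuming the option name proposed above ships as-is (the table name and the final option name are illustrative, not a released API):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iceberg-stream").getOrCreate()

// Opt in to resending all data files of OVERWRITE snapshots on a format v1
// table; "streaming-resend-overwrite-snapshots" is the option name proposed
// in the comment above and may differ in a released Iceberg version.
val events = spark.readStream
  .format("iceberg")
  .option("streaming-resend-overwrite-snapshots", "true")
  .load("db.events") // hypothetical table name

events.writeStream.format("console").start().awaitTermination()
```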
I was looking into this recently as I was doing some investigation into existing CDC systems, particularly since Spark has no built-in concept of this. Your proposal seems pretty good. My one concern would be streaming all data in from V1 format tables. I don't have a very strong opinion on much related to V1 tables, other than that restreaming potential duplicates should definitely be opt-in imo. 🙂
Hi all, I would love to contribute to this. May I propose a PR only for v2 tables, and then we can think about how to handle v1?
Hi @SreeramGarlapati, @tmnd1991, are there any plans to implement this? This is a selling point for Hudi over Iceberg.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
Background / context
This is a continuation of the work done in PR #2660: reading from an Iceberg table as an incremental streaming source in Spark.

To read a stream out of an Iceberg table, the above PR iterates through all the Iceberg snapshots (the table history) starting from the very FIRST snapshot, grabs the list of added files in each of these snapshots one by one, and hands off the rows in them to the Spark3 DSv2 reader. However, this, together with #2752, does not handle the case where the streaming reader encounters a `SNAPSHOT` of type `OVERWRITE`. The current issue is a request to extend the implementation to support that.
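For orientation, here is a rough sketch of that snapshot walk using the Iceberg Java API from Scala. This is illustrative only, not the PR's actual code; `addedDataFiles(FileIO)` is the accessor in recent Iceberg releases, while older versions expose `addedFiles()` instead.

```scala
import scala.collection.JavaConverters._
import org.apache.iceberg.{DataFile, Table}

// Walk the table history in commit order and collect the data files added by
// each snapshot; a streaming source would hand the rows in these files to the
// DSv2 reader one micro-batch at a time.
def addedFilesPerSnapshot(table: Table): Seq[(Long, Seq[DataFile])] =
  table.snapshots().asScala.toSeq.map { snap =>
    snap.snapshotId() -> snap.addedDataFiles(table.io()).asScala.toSeq
  }
```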
Implementation/Idea Proposal
What does an Iceberg `SNAPSHOT` of type `OVERWRITE` do / have?

To simplify, imagine an Iceberg table that comprises just one `dataFile` with 100 rows: `[a1, b1] ... [a100, b100]`. If a writer performs a `Merge / upsert` operation on `[a1, b1]` to update it to `[a1, b1_upserted]`, then a snapshot of type `OVERWRITE` is written.
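As a concrete example of the kind of write that produces such a snapshot, a row-level MERGE issued from Spark (table and column names here are hypothetical; MERGE INTO on Iceberg tables requires the Iceberg Spark SQL extensions):

```scala
// Updates t.b for the matched key a1, producing an OVERWRITE snapshot.
spark.sql("""
  MERGE INTO db.events t
  USING updates u
  ON t.a = u.a
  WHEN MATCHED THEN UPDATE SET t.b = u.b
""")
```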
Iceberg Table - with spec version 1
In Iceberg version 1, this `Merge/Upsert` operation results in a net-new Iceberg table `SNAPSHOT` in which this one single row is changed to the new value and the `dataFile` is fully rewritten. Now, the table snapshots look like:

`S1` --> `[a1, b1], [a2, b2] ... [a100, b100]`
`S2` --> `[a1, b1_upserted], [a2, b2] ... [a100, b100]`
So, when the streaming reader encounters a `SNAPSHOT` of type `OVERWRITE` (i.e., `S2`), streaming all of its added files would restream `[a2, b2] ... [a100, b100]` - DUPLICATE data!

This is a limitation of Iceberg table spec version 1 and can be solved with version 2 (where the `data` and `delete` files are written as part of the snapshot!).

So, the proposal here is to implement a Spark option - with the known limitation that it replays a lot of duplicate data.
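A sketch of what that opt-in gate could look like on the reader side (the option semantics and method name are illustrative, not the actual implementation; `DataOperations` is the real Iceberg constants class):

```scala
import org.apache.iceberg.{DataOperations, Snapshot}

// Sketch of the proposed v1 behavior: fail on OVERWRITE snapshots unless the
// user has explicitly opted in to resending all of their added data files.
def validateOverwrite(snapshot: Snapshot, resendOverwrites: Boolean): Unit =
  if (snapshot.operation() == DataOperations.OVERWRITE && !resendOverwrites) {
    throw new UnsupportedOperationException(
      s"Found OVERWRITE snapshot ${snapshot.snapshotId()}; set " +
        "streaming-resend-overwrite-snapshots=true to stream its added files " +
        "(may produce duplicates on format v1 tables)")
  }
```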
Iceberg Table - with spec version 2

In Iceberg version 2, this `Merge/Upsert` operation results in a net-new Iceberg table `SNAPSHOT` with the one updated row in a NET NEW `dataFile` and one single record in a new `DeleteFile`:

`S1` --> DataFiles { `[a1, b1], [a2, b2] ... [a100, b100]` }
`S2` --> DataFiles { `[a1, b1], [a2, b2] ... [a100, b100]`, `[a1, b1_upserted]` } + DeleteFiles { `a=a1` }
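A small sketch of how the two file sets of a v2 OVERWRITE snapshot can be inspected. The accessor names `addedDataFiles`/`addedDeleteFiles` (taking a `FileIO`) are from recent Iceberg releases and may not exist in the version this issue targets; older versions expose only `addedFiles()`.

```scala
import scala.collection.JavaConverters._
import org.apache.iceberg.Table

// For a v2 OVERWRITE snapshot, the added data files alone carry the new rows,
// while the delete files mark the superseded ones.
def describeOverwrite(table: Table, snapshotId: Long): Unit = {
  val snap = table.snapshot(snapshotId)
  val added = snap.addedDataFiles(table.io()).asScala.map(_.path()).toList
  val deletes = snap.addedDeleteFiles(table.io()).asScala.map(_.path()).toList
  println(s"stream these: $added; rows masked by: $deletes")
}
```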
In this case, when the streaming reader encounters a `SNAPSHOT` of type `OVERWRITE` (i.e., `S2`), the added files correspond exactly to the newly added rows - `[a1, b1_upserted]`!

This, in combination with skipping `SNAPSHOT`s of type `REPLACE`, should avoid the possibility of duplicate data.
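Taken together, the snapshot handling for a v2 table could reduce to a dispatch like the following. This is a sketch under the assumptions above, not the actual implementation; DELETE snapshots and other operations are left out of scope here.

```scala
import org.apache.iceberg.{DataOperations, Snapshot}

// APPEND and OVERWRITE snapshots contribute their added data files to the
// stream; REPLACE (compaction) snapshots only rewrite existing data, so they
// are skipped to avoid restreaming rows that were already delivered.
def shouldStreamAddedFiles(snapshot: Snapshot): Boolean =
  snapshot.operation() match {
    case DataOperations.APPEND | DataOperations.OVERWRITE => true
    case DataOperations.REPLACE                           => false
    case other =>
      throw new UnsupportedOperationException(s"Unsupported operation: $other")
  }
```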