Flink-1.19: Fix the file offset mismatch when Flink reader first seek… #10567
+66
−4
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
…s position.
When
DataIterator
first seeks position (startingFileOffset=0 & startingRecordOffset=0
) on a split,fileOffset
might be assigned incorrectly. This can lead to Flink consuming data repeatedly when recovering from ckpt or to not being able to recover from ckpt because ofIllegalStateException: Invalid starting record offset...
Here is an example:
Suppose there is a
CombinedScanTask
that contains threeFileScanTask
with the following metrics and residual filters:When
DataIterator
initializes for the first time, it calls#seek
withstartingFileOffset=0 and startingRecordOffset=0
. In#updateCurrentIterator
, whencurrentIterator
is moved to file 0 (fileScanTask-0) , the subsequent while condition!currentIterator.hasNext()
will still be true because parquet row-group filters evaluate that fileScanTask-0 can be skipped. Therefore,currentIterator
will be moved to fileScanTask-1. This also increasesfileOffset
, which is the correct offset by now.However, when returning to the
#seek
method,fileOffset
will be assigned the value ofstartingFileOffset
(0, the actual value is 1), which is wrong:This results in the following:
Suppose Flink sets a ckpt at record offset 50 of fileScanTask-2. The ckpt record would be (
fileOffset = 1, recordOffset=50
), while the real value should be (fileOffset = 2, recordOffset=50
). When the job attempts to recover from this ckpt, it will try to start reading records again from (fileOffset = 1, recordOffset=50
).IllegalStateException: Invalid starting record offset...
will be thrown, and the job cannot recovercc @pvary @stevenzwu could you take a look at this? thanks!