Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
Fix an edge case of getting duplicated records when using TextIO. (ap…
Browse files Browse the repository at this point in the history
…ache#30026)

When processing a CRLF-delimited file and the read buffer has
CR as the last character, startOfNextRecord will be set to the
position after the CR, i.e. the following LF. Let's say the
position of this LF is p.

In the next buffer, even though the actual start of record should be
p+1, startOfRecord is set to startOfNextRecord, which is p.

Then the code processes the next record by skipping the LF and yields
a record starting from p+1. It decides whether the record is valid by
checking if startOfRecord is in the range defined in RangeTracker.

If there is a split right after p, i.e. we have ranges [a, p+1) and [p+1, b),
then the above record would be considered as valid in the split [a, p+1),
because its startOfRecord is p <= p+1. However, the record is also
considered valid when split [p+1, b) is processed, resulting into
duplicated records in the output.
  • Loading branch information
shunping committed Jan 17, 2024
1 parent 87145a8 commit b5dc728
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,13 @@ private boolean readDefaultLine() throws IOException {

// Consume any LF after CR if it is the first character of the next buffer
if (skipLineFeedAtStart && buffer[bufferPosn] == LF) {
++bytesConsumed;
++startPosn;
++bufferPosn;
skipLineFeedAtStart = false;

// Right now, startOfRecord is pointing at the position of LF, but the actual start
// position of the new record should be the position after LF.
++startOfRecord;
}

// Search for the newline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,53 +386,96 @@ public void testReadLinesWithDefaultDelimiter() throws Exception {
runTestReadWithData(line.getBytes(UTF_8), expected);
}

// Placeholder channel that only yields 0- and 1-length buffers.
private static class SlowReadChannel implements ReadableByteChannel {
int readCount = 0;
InputStream stream;
ReadableByteChannel channel;

public SlowReadChannel(FileBasedSource source) throws IOException {
channel =
FileSystems.open(
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
stream = Channels.newInputStream(channel);
}

// Data is read at most one byte at a time from line parameter.
@Override
public int read(ByteBuffer dst) throws IOException {
if (++readCount % 3 == 0) {
if (dst.hasRemaining()) {
int value = stream.read();
if (value == -1) {
return -1;
}
dst.put((byte) value);
return 1;
}
}
return 0;
}

@Override
public boolean isOpen() {
return channel.isOpen();
}

@Override
public void close() throws IOException {
stream.close();
}
}

@Test
public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel()
throws Exception {
public void testReadLinesWithDefaultDelimiterAndSlowReadChannel() throws Exception {
Path path = tempFolder.newFile().toPath();
Files.write(path, line.getBytes(UTF_8));
Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
FileBasedSource source =
getTextSource(path.toString(), null, 0)
.createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());

FileBasedReader<String> reader =
source.createSingleFileReader(PipelineOptionsFactory.create());
ReadableByteChannel channel =
FileSystems.open(
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
InputStream stream = Channels.newInputStream(channel);
reader.startReading(
// Placeholder channel that only yields 0- and 1-length buffers.
// Data is read at most one byte at a time from line parameter.
new ReadableByteChannel() {
int readCount = 0;

@Override
public int read(ByteBuffer dst) throws IOException {
if (++readCount % 3 == 0) {
if (dst.hasRemaining()) {
int value = stream.read();
if (value == -1) {
return -1;
}
dst.put((byte) value);
return 1;
}
}
return 0;
}
reader.startReading(new SlowReadChannel(source));
assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
}

@Override
public boolean isOpen() {
return channel.isOpen();
}
@Test
public void testReadLinesWithDefaultDelimiterOnSplittingSourceAndSlowReadChannel()
throws Exception {
Path path = tempFolder.newFile().toPath();
Files.write(path, line.getBytes(UTF_8));
Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
FileBasedSource<String> source =
getTextSource(path.toString(), null, 0)
.createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());

@Override
public void close() throws IOException {
stream.close();
}
});
assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
PipelineOptions options = PipelineOptionsFactory.create();

// Check every possible split positions.
for (int i = 0; i < line.length(); ++i) {
double fraction = i * 1.0 / line.length();
FileBasedReader<String> reader = source.createSingleFileReader(options);

// Use a slow read channel to read the content byte by byte. This can simulate the scenario
// of a certain character (in our case CR) occurring at the end of the read buffer.
reader.startReading(new SlowReadChannel(source));

// In order to get a successful split, we need to read at least one record before calling
// splitAtFraction().
List<String> totalItems = SourceTestUtils.readNItemsFromStartedReader(reader, 1);
BoundedSource<String> residual = reader.splitAtFraction(fraction);
List<String> primaryItems = SourceTestUtils.readFromStartedReader(reader);
totalItems.addAll(primaryItems);

if (residual != null) {
List<String> residualItems = SourceTestUtils.readFromSource(residual, options);
totalItems.addAll(residualItems);
}
assertEquals(expected, totalItems);
}
}

@Test
Expand Down

0 comments on commit b5dc728

Please sign in to comment.