Skip to content

Commit

Permalink
[FLINK-19547][Runtime] Add the partialRecordLength when creating a Bu…
Browse files Browse the repository at this point in the history
…fferConsumer

Partial records happen if a record can not fit into one buffer, then the remaining part of the same record
is put into the next buffer. Hence partial records only exist at the beginning of a buffer.
Partial record clean-up is needed in the mode of approximate local recovery.
If a record is spanning over multiple buffers, and the first (several) buffers have got lost due to the failure
of the receiver task, the remaining data belonging to the same record in transition should be cleaned up.

`partialRecordLength` is the length of bytes to skip in order to start with a complete record,
from position index 0 of the underlying MemorySegment. `partialRecordLength` is used in approximate
local recovery to find the start position of a complete record on a BufferConsumer, so-called
`partial record clean-up`.

This commit includes
1). API change to add `partialRecordLength` when creating a BufferConsumer
2). Add `partialRecordLength` in `BufferWritingResultPartition` when emitRecord and `broadcastRecord`
  • Loading branch information
curcur authored and pnowojski committed Oct 27, 2020
1 parent 4e4b655 commit 11f9837
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> getBuffer(Result
public void recover(ResultSubpartitionInfo subpartitionInfo, Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
bufferBuilderAndConsumer.f0.finish();
if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
boolean added = getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1);
boolean added = getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, Integer.MIN_VALUE);
if (!added) {
throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,26 @@ public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
* @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}.
*/
public BufferConsumer createBufferConsumer() {
return createBufferConsumer(positionMarker.cachedPosition);
}

/**
* This method always creates a {@link BufferConsumer} starting from position 0 of {@link MemorySegment}.
*
* @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}.
*/
public BufferConsumer createBufferConsumerFromBeginning() {
return createBufferConsumer(0);
}

private BufferConsumer createBufferConsumer(int currentReaderPosition) {
checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder");
bufferConsumerCreated = true;
return new BufferConsumer(
memorySegment,
recycler,
positionMarker,
positionMarker.cachedPosition);
currentReaderPosition);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public boolean isReleased() {
}

@Override
public boolean add(BufferConsumer bufferConsumer) throws IOException {
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) throws IOException {
if (isFinished()) {
bufferConsumer.close();
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,26 +128,38 @@ protected void flushAllSubpartitions(boolean finishProducers) {

@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
do {
final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
bufferBuilder.appendAndCommit(record);
BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

if (bufferBuilder.isFull()) {
finishSubpartitionBufferBuilder(targetSubpartition);
}
} while (record.hasRemaining());
while (record.hasRemaining()) {
// full buffer, partial record
finishSubpartitionBufferBuilder(targetSubpartition);
buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
}

if (buffer.isFull()) {
// full buffer, full record
finishSubpartitionBufferBuilder(targetSubpartition);
}

// partial buffer, full record
}

@Override
public void broadcastRecord(ByteBuffer record) throws IOException {
do {
final BufferBuilder bufferBuilder = getBroadcastBufferBuilder();
bufferBuilder.appendAndCommit(record);
BufferBuilder buffer = appendBroadcastDataForNewRecord(record);

if (bufferBuilder.isFull()) {
finishBroadcastBufferBuilder();
}
} while (record.hasRemaining());
while (record.hasRemaining()) {
// full buffer, partial record
finishBroadcastBufferBuilder();
buffer = appendBroadcastDataForRecordContinuation(record);
}

if (buffer.isFull()) {
// full buffer, full record
finishBroadcastBufferBuilder();
}

// partial buffer, full record
}

@Override
Expand All @@ -159,7 +171,7 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws
try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) {
for (ResultSubpartition subpartition : subpartitions) {
// Retain the buffer so that it can be recycled by each channel of targetPartition
subpartition.add(eventBufferConsumer.copy());
subpartition.add(eventBufferConsumer.copy(), 0);
}
}
}
Expand Down Expand Up @@ -211,46 +223,84 @@ protected void releaseInternal() {
}
}

private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
if (bufferBuilder != null) {
return bufferBuilder;
private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record,
final int targetSubpartition) throws IOException {
BufferBuilder buffer = subpartitionBufferBuilders[targetSubpartition];

if (buffer == null) {
buffer = requestNewUnicastBufferBuilder(targetSubpartition);
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
}

return getNewSubpartitionBufferBuilder(targetSubpartition);
buffer.appendAndCommit(record);

return buffer;
}

private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
checkInProduceState();
ensureUnicastMode();
private BufferBuilder appendUnicastDataForRecordContinuation(
final ByteBuffer remainingRecordBytes,
final int targetSubpartition) throws IOException {
final BufferBuilder buffer = requestNewUnicastBufferBuilder(targetSubpartition);
// !! Be aware, in case of partialRecordBytes != 0, partial length and data has to `appendAndCommit` first
// before consumer is created. Otherwise it would be confused with the case the buffer starting
// with a complete record.
// !! The next two lines can not change order.
final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), partialRecordBytes);

return buffer;
}

final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
return bufferBuilder;
private BufferBuilder appendBroadcastDataForNewRecord(final ByteBuffer record) throws IOException {
BufferBuilder buffer = broadcastBufferBuilder;

if (buffer == null) {
buffer = requestNewBroadcastBufferBuilder();
createBroadcastBufferConsumers(buffer, 0);
}

buffer.appendAndCommit(record);

return buffer;
}

private BufferBuilder getBroadcastBufferBuilder() throws IOException {
if (broadcastBufferBuilder != null) {
return broadcastBufferBuilder;
private BufferBuilder appendBroadcastDataForRecordContinuation(
final ByteBuffer remainingRecordBytes) throws IOException {
final BufferBuilder buffer = requestNewBroadcastBufferBuilder();
// !! Be aware, in case of partialRecordBytes != 0, partial length and data has to `appendAndCommit` first
// before consumer is created. Otherwise it would be confused with the case the buffer starting
// with a complete record.
// !! The next two lines can not change order.
final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
createBroadcastBufferConsumers(buffer, partialRecordBytes);

return buffer;
}

private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes) throws IOException {
try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.add(consumer.copy(), partialRecordBytes);
}
}
}

return getNewBroadcastBufferBuilder();
private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition) throws IOException {
checkInProduceState();
ensureUnicastMode();
final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;

return bufferBuilder;
}

private BufferBuilder getNewBroadcastBufferBuilder() throws IOException {
private BufferBuilder requestNewBroadcastBufferBuilder() throws IOException {
checkInProduceState();
ensureBroadcastMode();

final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(0);
broadcastBufferBuilder = bufferBuilder;

try (final BufferConsumer consumer = bufferBuilder.createBufferConsumer()) {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.add(consumer.copy());
}
}

return bufferBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public interface CheckpointedResultSubpartition {

BufferBuilder requestBufferBuilderBlocking() throws IOException, RuntimeException, InterruptedException;

boolean add(BufferConsumer bufferConsumer);
boolean add(BufferConsumer bufferConsumer, int partialRecordLength) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
}

@Override
public boolean add(BufferConsumer bufferConsumer) {
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) {
return add(bufferConsumer, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ protected void onConsumedSubpartition() {
parent.onConsumedSubpartition(getSubPartitionIndex());
}

@VisibleForTesting
public final boolean add(BufferConsumer bufferConsumer) throws IOException {
return add(bufferConsumer, 0);
}

/**
* Adds the given buffer.
*
Expand All @@ -89,11 +94,14 @@ protected void onConsumedSubpartition() {
*
* @param bufferConsumer
* the buffer to add (transferring ownership to this writer)
* @param partialRecordLength
* the length of bytes to skip in order to start with a complete record, from position index 0
* of the underlying {@cite MemorySegment}.
* @return true if operation succeeded and bufferConsumer was enqueued for consumption.
* @throws IOException
* thrown in case of errors while adding the buffer
*/
public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
public abstract boolean add(BufferConsumer bufferConsumer, int partialRecordLength) throws IOException;

public abstract void flush();

Expand Down

0 comments on commit 11f9837

Please sign in to comment.