Skip to content

Commit

Permalink
[FLINK-23724][network] Fix the network buffer leak when ResultPartiti…
Browse files Browse the repository at this point in the history
…on is released (apache#16844)

* [FLINK-23724][network][refactor] Make TaskCanceler call ResultPartitionWriter#fail instead of ResultPartitionWriter#close

Originally, the TaskCanceler calls the ResultPartitionWriter#close method to release the input and output buffer pools early. However, the ResultPartitionWriter#close method can also be called by the task thread to release other network resources, which may lead to race conditions. This patch makes TaskCanceler call ResultPartitionWriter#fail instead of ResultPartitionWriter#close and closes the output buffer pool in ResultPartitionWriter#fail, which avoids the potential race conditions.

This closes apache#16844.

* [FLINK-23724][network] Fix the network buffer leak when ResultPartition is released

This patch fixes the network buffer leak issue by closing all BufferBuilders in the BufferWritingResultPartition#close method.

This closes apache#16844.
  • Loading branch information
wsry committed Aug 20, 2021
1 parent dfac762 commit 08f98b7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ protected void releaseInternal() {
}
}

/**
 * Closes the broadcast buffer builder and every unicast buffer builder that is still held,
 * clears those references, and then delegates to the parent {@code close()}.
 *
 * <p>The builders cannot be closed in the release method because of a potential race
 * condition; this method is only called from the Task thread itself.
 */
@Override
public void close() {
    if (broadcastBufferBuilder != null) {
        broadcastBufferBuilder.close();
        broadcastBufferBuilder = null;
    }

    for (int subpartition = 0; subpartition < unicastBufferBuilders.length; subpartition++) {
        if (unicastBufferBuilders[subpartition] == null) {
            // Nothing pending for this subpartition.
            continue;
        }
        unicastBufferBuilders[subpartition].close();
        unicastBufferBuilders[subpartition] = null;
    }

    super.close();
}

private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record, final int targetSubpartition) throws IOException {
if (targetSubpartition < 0 || targetSubpartition > unicastBufferBuilders.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,21 @@ public void release(Throwable cause) {
/** Releases all produced data including both those stored in memory and persisted on disk. */
protected abstract void releaseInternal();

@Override
public void close() {
/**
 * Calls {@code lazyDestroy()} on the buffer pool if one has been set; does nothing otherwise.
 * Shared by {@code close()} and {@code fail(Throwable)}.
 */
private void closeBufferPool() {
    if (bufferPool == null) {
        return;
    }
    bufferPool.lazyDestroy();
}

/** Closes this result partition by closing its buffer pool (if any). */
@Override
public void close() {
closeBufferPool();
}

/**
 * Fails this partition: closes the output buffer pool first and then asks the partition
 * manager to release the partition, forwarding the given cause.
 *
 * @param throwable the failure cause handed to the partition manager; may be {@code null}
 */
@Override
public void fail(@Nullable Throwable throwable) {
// The task canceler thread calls this method to release the output buffer pool early.
closeBufferPool();
partitionManager.releasePartition(partitionId, throwable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,24 +972,29 @@ private void releaseResources() {

for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
if (isCanceledOrFailed()) {
partitionWriter.fail(getFailureCause());
}
}

closeNetworkResources();
// close network resources
if (isCanceledOrFailed()) {
failAllResultPartitions();
}
closeAllResultPartitions();
closeAllInputGates();

try {
taskStateManager.close();
} catch (Exception e) {
LOG.error("Failed to close task state manager for task {}.", taskNameWithSubtask, e);
}
}

/**
* There are two scenarios to close the network resources. One is from {@link TaskCanceler} to
* early release partitions and gates. Another is from task thread during task exiting.
*/
private void closeNetworkResources() {
/**
 * Calls {@code fail} on every result partition writer of this task, passing the task's current
 * failure cause (queried anew for each writer).
 */
private void failAllResultPartitions() {
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
partitionWriter.fail(getFailureCause());
}
}

private void closeAllResultPartitions() {
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
try {
partitionWriter.close();
Expand All @@ -999,14 +1004,14 @@ private void closeNetworkResources() {
"Failed to release result partition for task {}.", taskNameWithSubtask, t);
}
}
}

private void closeAllInputGates() {
AbstractInvokable invokable = this.invokable;
if (invokable == null || !invokable.isUsingNonBlockingInput()) {
// Cleanup resources instead of invokable if it is null,
// or prevent it from being blocked on input,
// or interrupt if it is already blocked.
// Not needed for StreamTask (which does NOT use blocking input); for which this could
// cause race conditions
// Cleanup resources instead of invokable if it is null, or prevent it from being
// blocked on input, or interrupt if it is already blocked. Not needed for StreamTask
// (which does NOT use blocking input); for which this could cause race conditions
for (InputGate inputGate : inputGates) {
try {
inputGate.close();
Expand Down Expand Up @@ -1182,7 +1187,6 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
Runnable canceler =
new TaskCanceler(
LOG,
this::closeNetworkResources,
taskCancellationTimeout > 0
? taskCancellationTimeout
: TaskManagerOptions.TASK_CANCELLATION_TIMEOUT
Expand Down Expand Up @@ -1588,10 +1592,9 @@ private static AbstractInvokable loadAndInstantiateInvokable(
* This runner calls cancel() on the invokable, closes input-/output resources, and initially
* interrupts the task thread.
*/
private static class TaskCanceler implements Runnable {
private class TaskCanceler implements Runnable {

private final Logger logger;
private final Runnable networkResourcesCloser;
/** Time to wait after cancellation and interruption before releasing network resources. */
private final long taskCancellationTimeout;

Expand All @@ -1601,13 +1604,11 @@ private static class TaskCanceler implements Runnable {

TaskCanceler(
Logger logger,
Runnable networkResourcesCloser,
long taskCancellationTimeout,
AbstractInvokable invokable,
Thread executer,
String taskName) {
this.logger = logger;
this.networkResourcesCloser = networkResourcesCloser;
this.taskCancellationTimeout = taskCancellationTimeout;
this.invokable = invokable;
this.executer = executer;
Expand Down Expand Up @@ -1640,7 +1641,12 @@ public void run() {
// in order to unblock async Threads, which produce/consume the
// intermediate streams outside of the main Task Thread (like
// the Kafka consumer).
networkResourcesCloser.run();
// Notes: 1) This does not mean to release all network resources,
// the task thread itself will release them; 2) We can not close
// ResultPartitions here because of possible race conditions with
// Task thread so we just call the fail here.
failAllResultPartitions();
closeAllInputGates();

} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,50 +343,37 @@ private void testAddOnPartition(final ResultPartitionType partitionType) throws
}
}

@Test
public void testReleaseMemoryOnPipelinedPartition() throws Exception {
testReleaseMemory(ResultPartitionType.PIPELINED);
}

/**
* Tests {@link ResultPartition#releaseMemory(int)} on a working partition.
*
* @param resultPartitionType the result partition type to set up
* Tests {@link ResultPartition#close()} and {@link ResultPartition#release()} on a working
* pipelined partition.
*/
private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
@Test
public void testReleaseMemoryOnPipelinedPartition() throws Exception {
final int numAllBuffers = 10;
final NettyShuffleEnvironment network =
new NettyShuffleEnvironmentBuilder()
.setNumNetworkBuffers(numAllBuffers)
.setBufferSize(bufferSize)
.build();
final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
final ResultPartition resultPartition =
createPartition(network, ResultPartitionType.PIPELINED, 1);
try {
resultPartition.setup();

// take all buffers (more than the minimum required)
for (int i = 0; i < numAllBuffers; ++i) {
resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
resultPartition.emitRecord(ByteBuffer.allocate(bufferSize - 1), 0);
}
resultPartition.finish();

assertEquals(0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());

// reset the pool size less than the number of requested buffers
final int numLocalBuffers = 4;
resultPartition.getBufferPool().setNumBuffers(numLocalBuffers);
resultPartition.close();
assertTrue(resultPartition.getBufferPool().isDestroyed());
assertEquals(
numAllBuffers, network.getNetworkBufferPool().getNumberOfUsedMemorySegments());

// partition with blocking type should release excess buffers
if (!resultPartitionType.hasBackPressure()) {
assertEquals(
numLocalBuffers,
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
} else {
assertEquals(
0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
}
} finally {
resultPartition.release();
assertEquals(0, network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
} finally {
network.close();
}
}
Expand Down

0 comments on commit 08f98b7

Please sign in to comment.