Skip to content

Commit

Permalink
[FLINK-25407][network] Fix the issues caused by FLINK-24035
Browse files Browse the repository at this point in the history
This PR fixes two issues introduced by FLINK-24035. The first is a deadlock caused by acquiring the 'factoryLock' in NetworkBufferPool; the second is an incorrect decrease of the number of required segments of NetworkBufferPool. Both issues occur during exception handling when requesting segments. When reserving memory segments for a LocalBufferPool, there is actually no need to modify the number of required segments, and consequently no need to acquire the 'factoryLock'. This PR fixes both issues by removing the logic that decreases the required segments, together with the acquisition of the 'factoryLock', from the exception handling of segment requests in NetworkBufferPool.

This closes apache#18173.
  • Loading branch information
wsry authored and pnowojski committed Jan 13, 2022
1 parent 38f7c59 commit 1ea2a7a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,13 @@ public List<MemorySegment> requestUnpooledMemorySegments(int numberOfSegmentsToR
tryRedistributeBuffers(numberOfSegmentsToRequest);
}

return internalRequestMemorySegments(numberOfSegmentsToRequest);
try {
return internalRequestMemorySegments(numberOfSegmentsToRequest);
} catch (IOException exception) {
revertRequiredBuffers(numberOfSegmentsToRequest);
ExceptionUtils.rethrowIOException(exception);
return null;
}
}

private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest)
Expand Down Expand Up @@ -248,7 +254,7 @@ private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsTo
}
}
} catch (Throwable e) {
recycleMemorySegments(segments, numberOfSegmentsToRequest);
internalRecycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}

Expand All @@ -272,12 +278,11 @@ private MemorySegment internalRequestMemorySegment() {
*/
@Override
public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {
recycleMemorySegments(segments, segments.size());
}

private void recycleMemorySegments(Collection<MemorySegment> segments, int size) {
internalRecycleMemorySegments(segments);
revertRequiredBuffers(segments.size());
}

private void revertRequiredBuffers(int size) {
synchronized (factoryLock) {
numTotalRequiredBuffers -= size;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,65 @@ public void testReserveSegments() throws Exception {
}
}

/**
 * Checks that a thread blocked in {@code reserveSegments} can be cancelled while the local
 * pools are being destroyed concurrently, and that the whole sequence terminates.
 *
 * <p>The test passes only if both the reserving thread and the cancelling thread finish
 * before the JUnit timeout expires and the global pool reports zero used segments afterwards.
 *
 * @throws Exception if requesting segments, creating pools, sleeping or joining fails
 */
@Test(timeout = 10000) // timeout can indicate a potential deadlock
public void testReserveSegmentsAndCancel() throws Exception {
int totalSegments = 4;
int segmentsToReserve = 2;

// NOTE(review): the two createBufferPool arguments are presumably (required, max) segment
// counts -- confirm against NetworkBufferPool#createBufferPool.
NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, memorySegmentSize);
BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
List<MemorySegment> segments = new ArrayList<>();

try {
// take totalSegments segments through localPool1 so that none is left for localPool2
for (int i = 0; i < totalSegments; ++i) {
segments.add(localPool1.requestMemorySegmentBlocking());
}

BufferPool localPool2 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
// the reserve thread is expected to block because no buffer is available;
// whatever it eventually throws is deliberately swallowed
Thread reserveThread =
new Thread(
() -> {
try {
localPool2.reserveSegments(segmentsToReserve);
} catch (Throwable ignored) {
}
});
reserveThread.start();
// wait to be blocked; this is a fixed delay, not a guarantee that the thread is blocked
Thread.sleep(100); // wait to be blocked

// the cancel thread can be blocked when redistributing buffers
Thread cancelThread =
new Thread(
() -> {
localPool1.lazyDestroy();
localPool2.lazyDestroy();
});
cancelThread.start();

// it is expected that the segment reserve thread can be cancelled successfully:
// interrupt it at least once, then again every 100 ms until both the reserve thread
// and the cancel thread have terminated
Thread interruptThread =
new Thread(
() -> {
try {
do {
reserveThread.interrupt();
Thread.sleep(100);
} while (reserveThread.isAlive() || cancelThread.isAlive());
} catch (Throwable ignored) {
}
});
interruptThread.start();

// returns only after the interrupt loop has ended; if the other threads never finish,
// the @Test timeout above fails the test
interruptThread.join();
} finally {
// hand the requested segments back before checking the global pool's usage
segments.forEach(localPool1::recycle);
// NOTE(review): the cancel thread may already have called lazyDestroy() on localPool1;
// presumably a repeated call is harmless -- confirm.
localPool1.lazyDestroy();
assertEquals(0, globalPool.getNumberOfUsedMemorySegments());
globalPool.destroy();
}
}

@Test
public void testRequestMoreThanAvailable() {
localBufferPool.setNumBuffers(numBuffers);
Expand Down

0 comments on commit 1ea2a7a

Please sign in to comment.