Skip to content

Commit

Permalink
[hotfix] Rename some methods of NetworkBufferPool and add more commen…
Browse files Browse the repository at this point in the history
…ts for better readability

This closes apache#18173.
  • Loading branch information
wsry authored and pnowojski committed Jan 13, 2022
1 parent f957e3f commit 38f7c59
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

/** The provider used for requesting and releasing batch of memory segments. */
public interface MemorySegmentProvider {
Collection<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest)
Collection<MemorySegment> requestUnpooledMemorySegments(int numberOfSegmentsToRequest)
throws IOException;

void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException;
void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {

if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
availableMemorySegments.addAll(
networkBufferPool.requestMemorySegmentsBlocking(
networkBufferPool.requestPooledMemorySegmentsBlocking(
numberOfSegmentsToReserve - numberOfRequestedMemorySegments));
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
Expand Down Expand Up @@ -408,7 +408,7 @@ private boolean requestMemorySegmentFromGlobal() {
!isDestroyed,
"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");

MemorySegment segment = networkBufferPool.requestMemorySegment();
MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
if (segment != null) {
availableMemorySegments.add(segment);
numberOfRequestedMemorySegments++;
Expand Down Expand Up @@ -652,7 +652,7 @@ private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);

numberOfRequestedMemorySegments--;
networkBufferPool.recycle(segment);
networkBufferPool.recyclePooledMemorySegment(segment);
}

private void returnExcessMemorySegments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -149,27 +150,47 @@ public NetworkBufferPool(
segmentSize);
}

/**
* Different from {@link #requestUnpooledMemorySegments} for unpooled segments allocation. This
* method and the below {@link #requestPooledMemorySegmentsBlocking} method are designed to be
* used from {@link LocalBufferPool} for pooled memory segments allocation. Note that these
* methods for pooled memory segments requesting and recycling are prohibited from acquiring the
* factoryLock to avoid deadlock.
*/
@Nullable
public MemorySegment requestMemorySegment() {
public MemorySegment requestPooledMemorySegment() {
synchronized (availableMemorySegments) {
return internalRequestMemorySegment();
}
}

public List<MemorySegment> requestMemorySegmentsBlocking(int numberOfSegmentsToRequest)
public List<MemorySegment> requestPooledMemorySegmentsBlocking(int numberOfSegmentsToRequest)
throws IOException {
return internalRequestMemorySegments(numberOfSegmentsToRequest);
}

public void recycle(MemorySegment segment) {
/**
* Corresponding to {@link #requestPooledMemorySegmentsBlocking} and {@link
* #requestPooledMemorySegment}, this method is for pooled memory segments recycling.
*/
public void recyclePooledMemorySegment(MemorySegment segment) {
// Adds the segment back to the queue, which does not immediately free the memory
// however, since this happens when references to the global pool are also released,
// making the availableMemorySegments queue and its contained object reclaimable
internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
}

/**
* Unpooled memory segments are requested directly from {@link NetworkBufferPool}, as opposed to
* pooled segments, that are requested through {@link BufferPool} that was created from this
* {@link NetworkBufferPool} (see {@link #createBufferPool}). They are used for example for
* exclusive {@link RemoteInputChannel} credits, that are permanently assigned to that channel,
* and never returned to any {@link BufferPool}. As opposed to pooled segments, when requested,
* unpooled segments needs to be accounted against {@link #numTotalRequiredBuffers}, which might
* require redistribution of the segments.
*/
@Override
public List<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest)
public List<MemorySegment> requestUnpooledMemorySegments(int numberOfSegmentsToRequest)
throws IOException {
checkArgument(
numberOfSegmentsToRequest >= 0,
Expand Down Expand Up @@ -245,8 +266,12 @@ private MemorySegment internalRequestMemorySegment() {
return segment;
}

/**
* Corresponding to {@link #requestUnpooledMemorySegments}, this method is for unpooled memory
* segments recycling.
*/
@Override
public void recycleMemorySegments(Collection<MemorySegment> segments) {
public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {
recycleMemorySegments(segments, segments.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
return;
}

Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
Collection<MemorySegment> segments =
globalPool.requestUnpooledMemorySegments(numExclusiveBuffers);
synchronized (bufferQueue) {
// AvailableBufferQueue::addExclusiveBuffer may release the previously allocated
// floating buffer, which requires the caller to recycle these released floating
Expand Down Expand Up @@ -213,7 +214,7 @@ public void recycle(MemorySegment segment) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after channel released all buffers via releaseAllResources().
if (inputChannel.isReleased()) {
globalPool.recycleMemorySegments(Collections.singletonList(segment));
globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
return;
} else {
releasedFloatingBuffer =
Expand Down Expand Up @@ -280,7 +281,7 @@ void releaseAllBuffers(ArrayDeque<Buffer> buffers) throws IOException {
}
try {
if (exclusiveRecyclingSegments.size() > 0) {
globalPool.recycleMemorySegments(exclusiveRecyclingSegments);
globalPool.recycleUnpooledMemorySegments(exclusiveRecyclingSegments);
}
} catch (Exception e) {
err = firstOrSuppressed(e, err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,20 @@ public void testUniformDistributionBounded4() throws IOException {
BufferPool first = globalPool.createBufferPool(1, 10);
assertEquals(10, first.getNumBuffers());

List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
List<MemorySegment> segmentList1 = globalPool.requestUnpooledMemorySegments(2);
assertEquals(2, segmentList1.size());
assertEquals(8, first.getNumBuffers());

BufferPool second = globalPool.createBufferPool(1, 10);
assertEquals(4, first.getNumBuffers());
assertEquals(4, second.getNumBuffers());

List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
List<MemorySegment> segmentList2 = globalPool.requestUnpooledMemorySegments(2);
assertEquals(2, segmentList2.size());
assertEquals(3, first.getNumBuffers());
assertEquals(3, second.getNumBuffers());

List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
List<MemorySegment> segmentList3 = globalPool.requestUnpooledMemorySegments(2);
assertEquals(2, segmentList3.size());
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
Expand All @@ -339,17 +339,17 @@ public void testUniformDistributionBounded4() throws IOException {
"Wrong number of available segments after creating buffer pools and requesting segments.";
assertEquals(msg, 2, globalPool.getNumberOfAvailableMemorySegments());

globalPool.recycleMemorySegments(segmentList1);
globalPool.recycleUnpooledMemorySegments(segmentList1);
assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
assertEquals(3, first.getNumBuffers());
assertEquals(3, second.getNumBuffers());

globalPool.recycleMemorySegments(segmentList2);
globalPool.recycleUnpooledMemorySegments(segmentList2);
assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
assertEquals(4, first.getNumBuffers());
assertEquals(4, second.getNumBuffers());

globalPool.recycleMemorySegments(segmentList3);
globalPool.recycleUnpooledMemorySegments(segmentList3);
assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
assertEquals(5, first.getNumBuffers());
assertEquals(5, second.getNumBuffers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,11 +547,11 @@ public TestNetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

@Nullable
@Override
public MemorySegment requestMemorySegment() {
public MemorySegment requestPooledMemorySegment() {
if (requestCounter++ == 1) {
return null;
}
return super.requestMemorySegment();
return super.requestPooledMemorySegment();
}
}
}
Loading

0 comments on commit 38f7c59

Please sign in to comment.