diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java index c5fa945edde4d..265435d9603de 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java @@ -23,8 +23,8 @@ /** The provider used for requesting and releasing batch of memory segments. */ public interface MemorySegmentProvider { - Collection requestMemorySegments(int numberOfSegmentsToRequest) + Collection requestUnpooledMemorySegments(int numberOfSegmentsToRequest) throws IOException; - void recycleMemorySegments(Collection segments) throws IOException; + void recycleUnpooledMemorySegments(Collection segments) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 7d2003c7fd0e3..b08bcea76b195 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -244,7 +244,7 @@ public void reserveSegments(int numberOfSegmentsToReserve) throws IOException { if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) { availableMemorySegments.addAll( - networkBufferPool.requestMemorySegmentsBlocking( + networkBufferPool.requestPooledMemorySegmentsBlocking( numberOfSegmentsToReserve - numberOfRequestedMemorySegments)); toNotify = availabilityHelper.getUnavailableToResetAvailable(); } @@ -408,7 +408,7 @@ private boolean requestMemorySegmentFromGlobal() { !isDestroyed, "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); - MemorySegment segment = networkBufferPool.requestMemorySegment(); + MemorySegment segment = networkBufferPool.requestPooledMemorySegment(); if (segment != null) { availableMemorySegments.add(segment); numberOfRequestedMemorySegments++; @@ -652,7 +652,7 @@ private void returnMemorySegment(MemorySegment segment) { assert Thread.holdsLock(availableMemorySegments); numberOfRequestedMemorySegments--; - networkBufferPool.recycle(segment); + networkBufferPool.recyclePooledMemorySegment(segment); } private void returnExcessMemorySegments() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 945e37f03f7e7..509db03b5e7ba 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; @@ -149,27 +150,47 @@ public NetworkBufferPool( segmentSize); } + /** + * Different from {@link #requestUnpooledMemorySegments} for unpooled segments allocation. This + * method and the below {@link #requestPooledMemorySegmentsBlocking} method are designed to be + * used from {@link LocalBufferPool} for pooled memory segments allocation. Note that these + * methods for pooled memory segments requesting and recycling are prohibited from acquiring the + * factoryLock to avoid deadlock. + */ @Nullable - public MemorySegment requestMemorySegment() { + public MemorySegment requestPooledMemorySegment() { synchronized (availableMemorySegments) { return internalRequestMemorySegment(); } } - public List requestMemorySegmentsBlocking(int numberOfSegmentsToRequest) + public List requestPooledMemorySegmentsBlocking(int numberOfSegmentsToRequest) throws IOException { return internalRequestMemorySegments(numberOfSegmentsToRequest); } - public void recycle(MemorySegment segment) { + /** + * Corresponding to {@link #requestPooledMemorySegmentsBlocking} and {@link + * #requestPooledMemorySegment}, this method is for pooled memory segments recycling. + */ + public void recyclePooledMemorySegment(MemorySegment segment) { // Adds the segment back to the queue, which does not immediately free the memory // however, since this happens when references to the global pool are also released, // making the availableMemorySegments queue and its contained object reclaimable internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment))); } + /** + * Unpooled memory segments are requested directly from {@link NetworkBufferPool}, as opposed to + * pooled segments, that are requested through {@link BufferPool} that was created from this + * {@link NetworkBufferPool} (see {@link #createBufferPool}). They are used for example for + * exclusive {@link RemoteInputChannel} credits, that are permanently assigned to that channel, + * and never returned to any {@link BufferPool}. As opposed to pooled segments, when requested, + * unpooled segments needs to be accounted against {@link #numTotalRequiredBuffers}, which might + * require redistribution of the segments. + */ @Override - public List requestMemorySegments(int numberOfSegmentsToRequest) + public List requestUnpooledMemorySegments(int numberOfSegmentsToRequest) throws IOException { checkArgument( numberOfSegmentsToRequest >= 0, @@ -245,8 +266,12 @@ private MemorySegment internalRequestMemorySegment() { return segment; } + /** + * Corresponding to {@link #requestUnpooledMemorySegments}, this method is for unpooled memory + * segments recycling. + */ @Override - public void recycleMemorySegments(Collection segments) { + public void recycleUnpooledMemorySegments(Collection segments) { recycleMemorySegments(segments, segments.size()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java index 41eb12fe4c5e9..db38025def9e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java @@ -139,7 +139,8 @@ void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException { return; } - Collection segments = globalPool.requestMemorySegments(numExclusiveBuffers); + Collection segments = + globalPool.requestUnpooledMemorySegments(numExclusiveBuffers); synchronized (bufferQueue) { // AvailableBufferQueue::addExclusiveBuffer may release the previously allocated // floating buffer, which requires the caller to recycle these released floating @@ -213,7 +214,7 @@ public void recycle(MemorySegment segment) { // Similar to notifyBufferAvailable(), make sure that we never add a buffer // after channel released all buffers via releaseAllResources(). if (inputChannel.isReleased()) { - globalPool.recycleMemorySegments(Collections.singletonList(segment)); + globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); return; } else { releasedFloatingBuffer = @@ -280,7 +281,7 @@ void releaseAllBuffers(ArrayDeque buffers) throws IOException { } try { if (exclusiveRecyclingSegments.size() > 0) { - globalPool.recycleMemorySegments(exclusiveRecyclingSegments); + globalPool.recycleUnpooledMemorySegments(exclusiveRecyclingSegments); } } catch (Exception e) { err = firstOrSuppressed(e, err); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java index a305cfa25e341..19bd2e7eae3e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java @@ -317,7 +317,7 @@ public void testUniformDistributionBounded4() throws IOException { BufferPool first = globalPool.createBufferPool(1, 10); assertEquals(10, first.getNumBuffers()); - List segmentList1 = globalPool.requestMemorySegments(2); + List segmentList1 = globalPool.requestUnpooledMemorySegments(2); assertEquals(2, segmentList1.size()); assertEquals(8, first.getNumBuffers()); @@ -325,12 +325,12 @@ public void testUniformDistributionBounded4() throws IOException { assertEquals(4, first.getNumBuffers()); assertEquals(4, second.getNumBuffers()); - List segmentList2 = globalPool.requestMemorySegments(2); + List segmentList2 = globalPool.requestUnpooledMemorySegments(2); assertEquals(2, segmentList2.size()); assertEquals(3, first.getNumBuffers()); assertEquals(3, second.getNumBuffers()); - List segmentList3 = globalPool.requestMemorySegments(2); + List segmentList3 = globalPool.requestUnpooledMemorySegments(2); assertEquals(2, segmentList3.size()); assertEquals(2, first.getNumBuffers()); assertEquals(2, second.getNumBuffers()); @@ -339,17 +339,17 @@ public void testUniformDistributionBounded4() throws IOException { "Wrong number of available segments after creating buffer pools and requesting segments."; assertEquals(msg, 2, globalPool.getNumberOfAvailableMemorySegments()); - globalPool.recycleMemorySegments(segmentList1); + globalPool.recycleUnpooledMemorySegments(segmentList1); assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); assertEquals(3, first.getNumBuffers()); assertEquals(3, second.getNumBuffers()); - globalPool.recycleMemorySegments(segmentList2); + globalPool.recycleUnpooledMemorySegments(segmentList2); assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); assertEquals(4, first.getNumBuffers()); assertEquals(4, second.getNumBuffers()); - globalPool.recycleMemorySegments(segmentList3); + globalPool.recycleUnpooledMemorySegments(segmentList3); assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments()); assertEquals(5, first.getNumBuffers()); assertEquals(5, second.getNumBuffers()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index ff72fbca29a00..f77275ae53465 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -547,11 +547,11 @@ public TestNetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) { @Nullable @Override - public MemorySegment requestMemorySegment() { + public MemorySegment requestPooledMemorySegment() { if (requestCounter++ == 1) { return null; } - return super.requestMemorySegment(); + return super.requestPooledMemorySegment(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index c0e9b9075c70d..809b5d0356075 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -129,7 +129,7 @@ public void testMemoryUsageInTheContextOfMemorySegmentAllocation() { NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize); - MemorySegment segment = globalPool.requestMemorySegment(); + MemorySegment segment = globalPool.requestPooledMemorySegment(); assertThat(segment, is(notNullValue())); assertThat(globalPool.getTotalNumberOfMemorySegments(), is(numBuffers)); @@ -245,8 +245,8 @@ public void testDestroyAll() throws IOException { } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} - * currently containing the number of required free segments. + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the {@link + * NetworkBufferPool} currently containing the number of required free segments. */ @Test public void testRequestMemorySegmentsLessThanTotalBuffers() throws IOException { @@ -256,21 +256,21 @@ public void testRequestMemorySegmentsLessThanTotalBuffers() throws IOException { List memorySegments = Collections.emptyList(); try { - memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + memorySegments = globalPool.requestUnpooledMemorySegments(numBuffers / 2); assertEquals(memorySegments.size(), numBuffers / 2); - globalPool.recycleMemorySegments(memorySegments); + globalPool.recycleUnpooledMemorySegments(memorySegments); memorySegments.clear(); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } finally { - globalPool.recycleMemorySegments(memorySegments); // just in case + globalPool.recycleUnpooledMemorySegments(memorySegments); // just in case globalPool.destroy(); } } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required - * buffers exceeding the capacity of {@link NetworkBufferPool}. + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the number of + * required buffers exceeding the capacity of {@link NetworkBufferPool}. */ @Test public void testRequestMemorySegmentsMoreThanTotalBuffers() { @@ -279,7 +279,7 @@ public void testRequestMemorySegmentsMoreThanTotalBuffers() { NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); try { - globalPool.requestMemorySegments(numBuffers + 1); + globalPool.requestUnpooledMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); @@ -289,22 +289,22 @@ public void testRequestMemorySegmentsMoreThanTotalBuffers() { } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to cause - * exception. + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument + * to cause exception. */ @Test(expected = IllegalArgumentException.class) public void testRequestMemorySegmentsWithInvalidArgument() throws IOException { NetworkBufferPool globalPool = new NetworkBufferPool(10, 128); // the number of requested buffers should be non-negative - globalPool.requestMemorySegments(-1); + globalPool.requestUnpooledMemorySegments(-1); globalPool.destroy(); fail("Should throw an IllegalArgumentException"); } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} - * currently not containing the number of required free segments (currently occupied by a buffer - * pool). + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the {@link + * NetworkBufferPool} currently not containing the number of required free segments (currently + * occupied by a buffer pool). */ @Test public void testRequestMemorySegmentsWithBuffersTaken() @@ -347,7 +347,7 @@ public void testRequestMemorySegmentsWithBuffersTaken() // take more buffers than are freely available at the moment via requestMemorySegments() isRunning.await(); - memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2); + memorySegments = networkBufferPool.requestUnpooledMemorySegments(numBuffers / 2); assertThat(memorySegments, not(hasItem(nullValue()))); } finally { if (bufferRecycler != null) { @@ -356,21 +356,21 @@ public void testRequestMemorySegmentsWithBuffersTaken() if (lbp1 != null) { lbp1.lazyDestroy(); } - networkBufferPool.recycleMemorySegments(memorySegments); + networkBufferPool.recycleUnpooledMemorySegments(memorySegments); networkBufferPool.destroy(); } } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in - * case of a concurrent {@link NetworkBufferPool#destroy()} call. + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, verifying it may be + * aborted in case of a concurrent {@link NetworkBufferPool#destroy()} call. */ @Test public void testRequestMemorySegmentsInterruptable() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); - MemorySegment segment = globalPool.requestMemorySegment(); + MemorySegment segment = globalPool.requestPooledMemorySegment(); assertNotNull(segment); final OneShotLatch isRunning = new OneShotLatch(); @@ -379,7 +379,7 @@ public void testRequestMemorySegmentsInterruptable() throws Exception { @Override public void go() throws IOException { isRunning.trigger(); - globalPool.requestMemorySegments(10); + globalPool.requestUnpooledMemorySegments(10); } }; asyncRequest.start(); @@ -403,15 +403,15 @@ public void go() throws IOException { } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and - * remains in a defined state even if the waiting is interrupted. + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, verifying it may be + * aborted and remains in a defined state even if the waiting is interrupted. */ @Test public void testRequestMemorySegmentsInterruptable2() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); - MemorySegment segment = globalPool.requestMemorySegment(); + MemorySegment segment = globalPool.requestPooledMemorySegment(); assertNotNull(segment); final OneShotLatch isRunning = new OneShotLatch(); @@ -420,7 +420,7 @@ public void testRequestMemorySegmentsInterruptable2() throws Exception { @Override public void go() throws IOException { isRunning.trigger(); - globalPool.requestMemorySegments(10); + globalPool.requestUnpooledMemorySegments(10); } }; asyncRequest.start(); @@ -432,7 +432,7 @@ public void go() throws IOException { Thread.sleep(10); asyncRequest.interrupt(); - globalPool.recycle(segment); + globalPool.recyclePooledMemorySegment(segment); try { asyncRequest.sync(); @@ -448,7 +448,7 @@ public void go() throws IOException { } /** - * Tests {@link NetworkBufferPool#requestMemorySegments(int)} and verifies it will end + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} and verifies it will end * exceptionally when failing to acquire all the segments in the specific timeout. */ @Test @@ -471,7 +471,7 @@ public void testRequestMemorySegmentsTimeout() throws Exception { new CheckedThread() { @Override public void go() throws Exception { - globalPool.requestMemorySegments(numberOfSegmentsToRequest); + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); } }; @@ -490,8 +490,8 @@ public void go() throws Exception { /** * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is * correctly maintained after memory segments are requested by {@link - * NetworkBufferPool#requestMemorySegment()} and recycled by {@link - * NetworkBufferPool#recycle(MemorySegment)}. + * NetworkBufferPool#requestPooledMemorySegment()} and recycled by {@link + * NetworkBufferPool#recyclePooledMemorySegment(MemorySegment)}. */ @Test public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() { @@ -504,22 +504,22 @@ public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() { assertTrue(globalPool.getAvailableFuture().isDone()); // request the first segment - final MemorySegment segment1 = checkNotNull(globalPool.requestMemorySegment()); + final MemorySegment segment1 = checkNotNull(globalPool.requestPooledMemorySegment()); assertTrue(globalPool.getAvailableFuture().isDone()); // request the second segment - final MemorySegment segment2 = checkNotNull(globalPool.requestMemorySegment()); + final MemorySegment segment2 = checkNotNull(globalPool.requestPooledMemorySegment()); assertFalse(globalPool.getAvailableFuture().isDone()); final CompletableFuture availableFuture = globalPool.getAvailableFuture(); // recycle the first segment - globalPool.recycle(segment1); + globalPool.recyclePooledMemorySegment(segment1); assertTrue(availableFuture.isDone()); assertTrue(globalPool.getAvailableFuture().isDone()); // recycle the second segment - globalPool.recycle(segment2); + globalPool.recyclePooledMemorySegment(segment2); assertTrue(globalPool.getAvailableFuture().isDone()); } finally { @@ -530,8 +530,8 @@ public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() { /** * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is * correctly maintained after memory segments are requested by {@link - * NetworkBufferPool#requestMemorySegments(int)} and recycled by {@link - * NetworkBufferPool#recycleMemorySegments(Collection)}. + * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link + * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}. */ @Test(timeout = 10000L) public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() @@ -547,13 +547,13 @@ public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() // request 5 segments List segments1 = - globalPool.requestMemorySegments(numberOfSegmentsToRequest); + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); assertTrue(globalPool.getAvailableFuture().isDone()); assertEquals(numberOfSegmentsToRequest, segments1.size()); // request another 5 segments List segments2 = - globalPool.requestMemorySegments(numberOfSegmentsToRequest); + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); assertFalse(globalPool.getAvailableFuture().isDone()); assertEquals(numberOfSegmentsToRequest, segments2.size()); @@ -566,7 +566,8 @@ public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() public void go() throws Exception { // this request should be blocked until at least 5 segments are recycled segments3.addAll( - globalPool.requestMemorySegments(numberOfSegmentsToRequest)); + globalPool.requestUnpooledMemorySegments( + numberOfSegmentsToRequest)); latch.countDown(); } }; @@ -574,7 +575,7 @@ public void go() throws Exception { // recycle 5 segments CompletableFuture availableFuture = globalPool.getAvailableFuture(); - globalPool.recycleMemorySegments(segments1); + globalPool.recycleUnpooledMemorySegments(segments1); assertTrue(availableFuture.isDone()); // wait util the third request is fulfilled @@ -583,11 +584,11 @@ public void go() throws Exception { assertEquals(numberOfSegmentsToRequest, segments3.size()); // recycle another 5 segments - globalPool.recycleMemorySegments(segments2); + globalPool.recycleUnpooledMemorySegments(segments2); assertTrue(globalPool.getAvailableFuture().isDone()); // recycle the last 5 segments - globalPool.recycleMemorySegments(segments3); + globalPool.recycleUnpooledMemorySegments(segments3); assertTrue(globalPool.getAvailableFuture().isDone()); } finally { @@ -624,10 +625,10 @@ public void testBlockingRequestFromMultiLocalBufferPool() // request some segments from the global pool in two different ways final List segments = new ArrayList<>(numberOfSegmentsToRequest - 1); for (int i = 0; i < numberOfSegmentsToRequest - 1; ++i) { - segments.add(globalPool.requestMemorySegment()); + segments.add(globalPool.requestPooledMemorySegment()); } final List exclusiveSegments = - globalPool.requestMemorySegments( + globalPool.requestUnpooledMemorySegments( globalPool.getNumberOfAvailableMemorySegments() - 1); assertTrue(globalPool.getAvailableFuture().isDone()); for (final BufferPool localPool : localBufferPools) { @@ -674,9 +675,9 @@ public void testBlockingRequestFromMultiLocalBufferPool() // recycle the previously requested segments for (MemorySegment segment : segments) { - globalPool.recycle(segment); + globalPool.recyclePooledMemorySegment(segment); } - globalPool.recycleMemorySegments(exclusiveSegments); + globalPool.recycleUnpooledMemorySegments(exclusiveSegments); assertTrue(globalPoolAvailableFuture.isDone()); for (CompletableFuture localPoolAvailableFuture : localPoolAvailableFutures) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index 8abcdd167572d..9d336cbaf1515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -254,12 +254,13 @@ public static MemorySegmentProvider getInstance() { private StubMemorySegmentProvider() {} @Override - public Collection requestMemorySegments(int numberOfSegmentsToRequest) { + public Collection requestUnpooledMemorySegments( + int numberOfSegmentsToRequest) { return Collections.emptyList(); } @Override - public void recycleMemorySegments(Collection segments) {} + public void recycleUnpooledMemorySegments(Collection segments) {} } /** {@link MemorySegmentProvider} that provides unpooled {@link MemorySegment}s. */ @@ -271,12 +272,13 @@ public UnpooledMemorySegmentProvider(int pageSize) { } @Override - public Collection requestMemorySegments(int numberOfSegmentsToRequest) { + public Collection requestUnpooledMemorySegments( + int numberOfSegmentsToRequest) { return Collections.singletonList( MemorySegmentFactory.allocateUnpooledSegment(pageSize)); } @Override - public void recycleMemorySegments(Collection segments) {} + public void recycleUnpooledMemorySegments(Collection segments) {} } }