Skip to content

Commit

Permalink
De-duplicate PooledByteBuf implementations (netty#9120)
Browse files Browse the repository at this point in the history
Motivation

There's quite a lot of duplicate/equivalent logic across the various
concrete ByteBuf implementations. We could take this even further but
for now I've focused on the PooledByteBuf sub-hierarchy.

Modifications

- Move common logic/methods into existing PooledByteBuf abstract
superclass
- Shorten PooledByteBuf.capacity(int) method implementation

Result

Less code to maintain
  • Loading branch information
njhill authored and normanmaurer committed Jun 19, 2019
1 parent c32c9b4 commit 6381d07
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 386 deletions.
7 changes: 7 additions & 0 deletions buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,13 @@ protected final void checkDstIndex(int index, int length, int dstIndex, int dstC
}
}

protected final void checkDstIndex(int length, int dstIndex, int dstCapacity) {
checkReadableBytes(length);
if (checkBounds) {
checkRangeBounds("dstIndex", dstIndex, length, dstCapacity);
}
}

/**
* Throws an {@link IndexOutOfBoundsException} if the current
* {@linkplain #readableBytes() readable bytes} of this buffer is less
Expand Down
115 changes: 93 additions & 22 deletions buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

Expand Down Expand Up @@ -89,34 +94,23 @@ public int maxFastWritableBytes() {

@Override
public final ByteBuf capacity(int newCapacity) {
if (newCapacity == length) {
ensureAccessible();
return this;
}
checkNewCapacity(newCapacity);

// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) {
if (newCapacity == length) {
return this;
}
} else {
if (!chunk.unpooled) {
// If the request capacity does not require reallocation, just update the length of the memory.
if (newCapacity > length) {
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity < length) {
if (newCapacity > maxLength >>> 1) {
if (maxLength <= 512) {
if (newCapacity > maxLength - 16) {
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
} else if (newCapacity > maxLength >>> 1 &&
(maxLength > 512 || newCapacity > maxLength - 16)) {
// here newCapacity < length
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
Expand Down Expand Up @@ -187,4 +181,81 @@ private void recycle() {
protected final int idx(int index) {
return offset + index;
}

final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
index = idx(index);
ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
buffer.limit(index + length).position(index);
return buffer;
}

ByteBuffer duplicateInternalNioBuffer(int index, int length) {
checkIndex(index, length);
return _internalNioBuffer(index, length, true);
}

@Override
public final ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return _internalNioBuffer(index, length, false);
}

@Override
public final int nioBufferCount() {
return 1;
}

@Override
public final ByteBuffer nioBuffer(int index, int length) {
return duplicateInternalNioBuffer(index, length).slice();
}

@Override
public final ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[] { nioBuffer(index, length) };
}

@Override
public final int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
return out.write(duplicateInternalNioBuffer(index, length));
}

@Override
public final int readBytes(GatheringByteChannel out, int length) throws IOException {
checkReadableBytes(length);
int readBytes = out.write(_internalNioBuffer(readerIndex, length, false));
readerIndex += readBytes;
return readBytes;
}

@Override
public final int getBytes(int index, FileChannel out, long position, int length) throws IOException {
return out.write(duplicateInternalNioBuffer(index, length), position);
}

@Override
public final int readBytes(FileChannel out, long position, int length) throws IOException {
checkReadableBytes(length);
int readBytes = out.write(_internalNioBuffer(readerIndex, length, false), position);
readerIndex += readBytes;
return readBytes;
}

@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
return in.read(internalNioBuffer(index, length));
} catch (ClosedChannelException ignored) {
return -1;
}
}

@Override
public final int setBytes(int index, FileChannel in, long position, int length) throws IOException {
try {
return in.read(internalNioBuffer(index, length), position);
} catch (ClosedChannelException ignored) {
return -1;
}
}
}
159 changes: 11 additions & 148 deletions buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

Expand Down Expand Up @@ -126,55 +122,30 @@ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {

@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
getBytes(index, dst, dstIndex, length, false);
return this;
}

private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) {
checkDstIndex(index, length, dstIndex, dst.length);
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.get(dst, dstIndex, length);
_internalNioBuffer(index, length, true).get(dst, dstIndex, length);
return this;
}

@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length, true);
checkDstIndex(length, dstIndex, dst.length);
_internalNioBuffer(readerIndex, length, false).get(dst, dstIndex, length);
readerIndex += length;
return this;
}

@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
getBytes(index, dst, false);
dst.put(duplicateInternalNioBuffer(index, dst.remaining()));
return this;
}

private void getBytes(int index, ByteBuffer dst, boolean internal) {
checkIndex(index, dst.remaining());
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + dst.remaining());
dst.put(tmpBuf);
}

@Override
public ByteBuf readBytes(ByteBuffer dst) {
int length = dst.remaining();
checkReadableBytes(length);
getBytes(readerIndex, dst, true);
dst.put(_internalNioBuffer(readerIndex, length, false));
readerIndex += length;
return this;
}
Expand All @@ -201,61 +172,6 @@ public ByteBuf readBytes(OutputStream out, int length) throws IOException {
return this;
}

@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
return getBytes(index, out, length, false);
}

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
checkIndex(index, length);
if (length == 0) {
return 0;
}

ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
return out.write(tmpBuf);
}

@Override
public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
return getBytes(index, out, position, length, false);
}

private int getBytes(int index, FileChannel out, long position, int length, boolean internal) throws IOException {
checkIndex(index, length);
if (length == 0) {
return 0;
}

ByteBuffer tmpBuf = internal ? internalNioBuffer() : memory.duplicate();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
return out.write(tmpBuf, position);
}

@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
return readBytes;
}

@Override
public int readBytes(FileChannel out, long position, int length) throws IOException {
checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, position, length, true);
readerIndex += readBytes;
return readBytes;
}

@Override
protected void _setByte(int index, int value) {
memory.put(idx(index), (byte) value);
Expand Down Expand Up @@ -327,23 +243,21 @@ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.put(src, srcIndex, length);
_internalNioBuffer(index, length, false).put(src, srcIndex, length);
return this;
}

@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
checkIndex(index, src.remaining());
int length = src.remaining();
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
if (src == tmpBuf) {
src = src.duplicate();
}

index = idx(index);
tmpBuf.clear().position(index).limit(index + src.remaining());
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.put(src);
return this;
}
Expand All @@ -362,62 +276,11 @@ public int setBytes(int index, InputStream in, int length) throws IOException {
return readBytes;
}

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
try {
return in.read(tmpBuf);
} catch (ClosedChannelException ignored) {
return -1;
}
}

@Override
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
try {
return in.read(tmpBuf, position);
} catch (ClosedChannelException ignored) {
return -1;
}
}

@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}

@Override
public int nioBufferCount() {
return 1;
}

@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
index = idx(index);
return ((ByteBuffer) memory.duplicate().position(index).limit(index + length)).slice();
}

@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[] { nioBuffer(index, length) };
}

@Override
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
index = idx(index);
return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
return copy.writeBytes(this, index, length);
}

@Override
Expand Down
Loading

0 comments on commit 6381d07

Please sign in to comment.