Skip to content

Commit

Permalink
[netty#1812] Allow for inline for most common cases when use NioByteU…
Browse files Browse the repository at this point in the history
…nsafe.read()
  • Loading branch information
Norman Maurer committed Oct 25, 2013
1 parent e4358ae commit 544d68b
Showing 1 changed file with 52 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,88 +55,80 @@ protected AbstractNioUnsafe newUnsafe() {
private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;

@Override
public void read() {
assert eventLoop().inEventLoop();
final SelectionKey key = selectionKey();
final ChannelConfig config = config();
if (!config.isAutoRead()) {
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
private void removeReadOp() {
SelectionKey key = selectionKey();
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}

private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}

final ChannelPipeline pipeline = pipeline();
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete();
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}

@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}

final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();

boolean closed = false;
Throwable exception = null;
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
for (;;) {
do {
byteBuf = allocHandle.allocate(allocator);
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount == 0) {
byteBuf.release();
byteBuf = null;
if (localReadAmount <= 0) {
close = localReadAmount < 0;
break;
}
if (localReadAmount < 0) {
closed = true;
byteBuf.release();
byteBuf = null;
break;
}

pipeline.fireChannelRead(byteBuf);
allocHandle.record(localReadAmount);
byteBuf = null;
if (++ messages == maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
} finally {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.record(localReadAmount);
} while (++ messages < maxMessagesPerRead);

pipeline.fireChannelReadComplete();

if (exception != null) {
if (exception instanceof IOException) {
closed = true;
}

pipeline().fireExceptionCaught(exception);
}

if (closed) {
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
}
Expand Down

0 comments on commit 544d68b

Please sign in to comment.