From ca29be5e77ef7999b2707cf5d4f32d5e306d568b Mon Sep 17 00:00:00 2001 From: plucury Date: Wed, 13 Aug 2014 22:52:24 +0800 Subject: [PATCH] Allow ChunkedInput to provide the progress of its transfer Related issue: #2741 and #2151 Motivation: There is no way for ChunkedWriteHandler to know the progress of the transfer of a ChannelInput. Therefore, ChannelProgressiveFutureListener cannot get exact information about the progress of the transfer. If you add a few methods that optionally provides the transfer progress to ChannelInput, it becomes possible for ChunkedWriteHandler to notify ChannelProgressiveFutureListeners. If the input has no definite length, we can still use the progress so far, and consider the length of the input as 'undefined'. Modifications: - Add ChunkedInput.progress() and ChunkedInput.length() - Modify ChunkedWriteHandler to use progress() and length() to notify the transfer progress Result: ChunkedWriteHandler now notifies ChannelProgressiveFutureListener. --- .../handler/codec/http/HttpChunkedInput.java | 10 ++++++ .../multipart/HttpPostRequestEncoder.java | 19 ++++++++++-- .../io/netty/handler/stream/ChunkedFile.java | 10 ++++++ .../io/netty/handler/stream/ChunkedInput.java | 12 +++++++ .../netty/handler/stream/ChunkedNioFile.java | 10 ++++++ .../handler/stream/ChunkedNioStream.java | 10 ++++++ .../netty/handler/stream/ChunkedStream.java | 10 ++++++ .../handler/stream/ChunkedWriteHandler.java | 31 ++++++------------- .../stream/ChunkedWriteHandlerTest.java | 20 ++++++++++++ 9 files changed, 108 insertions(+), 24 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java index a6d317a1ff7..652ba6ae6fa 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java @@ -96,4 +96,14 @@ public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { return new DefaultHttpContent(buf); } } + + @Override + public long length() { + return input.length(); + } + + @Override + public long progress() { + return input.progress(); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java index e984b5f5979..044acf449d4 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java @@ -244,11 +244,14 @@ public void cleanFiles() { * While adding a FileUpload, is the multipart currently in Mixed Mode */ private boolean duringMixedMode; - /** * Global Body size */ private long globalBodySize; + /** + * Global Transfer progress + */ + private long globalProgress; /** * True if this request is a Multipart request @@ -997,7 +1000,9 @@ public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { if (isLastChunkSent) { return null; } else { - return nextChunk(); + HttpContent nextChunk = nextChunk(); + globalProgress += nextChunk.content().readableBytes(); + return nextChunk; } } @@ -1083,6 +1088,16 @@ public boolean isEndOfInput() throws Exception { return isLastChunkSent; } + @Override + public long length() { + return isMultipart? globalBodySize : globalBodySize - 1; + } + + @Override + public long progress() { + return globalProgress; + } + /** * Exception when an error occurs while encoding */ diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java index aaa7c7bf960..69d1efb1419 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java @@ -161,4 +161,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java index fa6fd85322c..4c44bf9efc5 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java @@ -47,4 +47,16 @@ public interface ChunkedInput { */ B readChunk(ChannelHandlerContext ctx) throws Exception; + /** + * Returns the length of the input. + * @return the length of the input if the length of the input is known. + * a negative value if the length of the input is unknown. + */ + long length(); + + /** + * Returns current transfer progress. + */ + long progress(); + } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java index 6032644c090..dbb0521d4dc 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java @@ -172,4 +172,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java index f6dcc754baf..fd59e74477b 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java @@ -128,4 +128,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return offset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java index e50d4fbc575..bcaa6334537 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java @@ -120,4 +120,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return offset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 9f8ba6b1768..39bd22b109c 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -179,7 +179,7 @@ private void discard(Throwable cause) { } currentWrite.fail(cause); } else { - currentWrite.success(); + currentWrite.success(in.length()); } closeInput(in); } catch (Exception e) { @@ -253,7 +253,6 @@ private void doFlush(final ChannelHandlerContext ctx) throws Exception { message = Unpooled.EMPTY_BUFFER; } - final int amount = amount(message); ChannelFuture f = ctx.write(message); if (endOfInput) { this.currentWrite = null; @@ -266,8 +265,8 @@ private void doFlush(final ChannelHandlerContext ctx) throws Exception { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - currentWrite.progress(amount); - currentWrite.success(); + currentWrite.progress(chunks.progress(), chunks.length()); + currentWrite.success(chunks.length()); closeInput(chunks); } }); @@ -279,7 +278,7 @@ public void operationComplete(ChannelFuture future) throws Exception { closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(amount); + currentWrite.progress(chunks.progress(), chunks.length()); } } }); @@ -291,7 +290,7 @@ public void operationComplete(ChannelFuture future) throws Exception { closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(amount); + currentWrite.progress(chunks.progress(), chunks.length()); if (channel.isWritable()) { resumeTransfer(); } @@ -327,7 +326,6 @@ static void closeInput(ChunkedInput chunks) { private static final class PendingWrite { final Object msg; final ChannelPromise promise; - private long progress; PendingWrite(Object msg, ChannelPromise promise) { this.msg = msg; @@ -339,7 +337,7 @@ void fail(Throwable cause) { promise.tryFailure(cause); } - void success() { + void success(long total) { if (promise.isDone()) { // No need to notify the progress or fulfill the promise because it's done already. return; @@ -347,27 +345,16 @@ void success() { if (promise instanceof ChannelProgressivePromise) { // Now we know what the total is. - ((ChannelProgressivePromise) promise).tryProgress(progress, progress); + ((ChannelProgressivePromise) promise).tryProgress(total, total); } promise.trySuccess(); } - void progress(int amount) { - progress += amount; + void progress(long progress, long total) { if (promise instanceof ChannelProgressivePromise) { - ((ChannelProgressivePromise) promise).tryProgress(progress, -1); + ((ChannelProgressivePromise) promise).tryProgress(progress, total); } } } - - private static int amount(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); - } - if (msg instanceof ByteBufHolder) { - return ((ByteBufHolder) msg).content().readableBytes(); - } - return 1; - } } diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 54879929384..204e140f4b5 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -124,6 +124,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { done = true; return buffer.duplicate().retain(); } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } }; final AtomicBoolean listenerNotified = new AtomicBoolean(false); @@ -171,6 +181,16 @@ public Object readChunk(ChannelHandlerContext ctx) throws Exception { done = true; return 0; } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } }; EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());