Skip to content

Commit

Permalink
Allow ChunkedInput to provide the progress of its transfer
Browse files Browse the repository at this point in the history
Related issue: netty#2741 and netty#2151

Motivation:

There is no way for ChunkedWriteHandler to know the progress of the
transfer of a ChannelInput. Therefore, ChannelProgressiveFutureListener
cannot get exact information about the progress of the transfer.

If you add a few methods that optionally provides the transfer progress
to ChannelInput, it becomes possible for ChunkedWriteHandler to notify
ChannelProgressiveFutureListeners.

If the input has no definite length, we can still use the progress so
far, and consider the length of the input as 'undefined'.

Modifications:

- Add ChunkedInput.progress() and ChunkedInput.length()
- Modify ChunkedWriteHandler to use progress() and length() to notify
  the transfer progress

Result:

ChunkedWriteHandler now notifies ChannelProgressiveFutureListener.
  • Loading branch information
plucury authored and trustin committed Aug 14, 2014
1 parent fc1429c commit ca29be5
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,14 @@ public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
return new DefaultHttpContent(buf);
}
}

@Override
public long length() {
return input.length();
}

@Override
public long progress() {
return input.progress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,14 @@ public void cleanFiles() {
* While adding a FileUpload, is the multipart currently in Mixed Mode
*/
private boolean duringMixedMode;

/**
* Global Body size
*/
private long globalBodySize;
/**
* Global Transfer progress
*/
private long globalProgress;

/**
* True if this request is a Multipart request
Expand Down Expand Up @@ -997,7 +1000,9 @@ public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
if (isLastChunkSent) {
return null;
} else {
return nextChunk();
HttpContent nextChunk = nextChunk();
globalProgress += nextChunk.content().readableBytes();
return nextChunk;
}
}

Expand Down Expand Up @@ -1083,6 +1088,16 @@ public boolean isEndOfInput() throws Exception {
return isLastChunkSent;
}

@Override
public long length() {
return isMultipart? globalBodySize : globalBodySize - 1;
}

@Override
public long progress() {
return globalProgress;
}

/**
* Exception when an error occurs while encoding
*/
Expand Down
10 changes: 10 additions & 0 deletions handler/src/main/java/io/netty/handler/stream/ChunkedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
}
}
}

@Override
public long length() {
return endOffset - startOffset;
}

@Override
public long progress() {
return offset - startOffset;
}
}
12 changes: 12 additions & 0 deletions handler/src/main/java/io/netty/handler/stream/ChunkedInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ public interface ChunkedInput<B> {
*/
B readChunk(ChannelHandlerContext ctx) throws Exception;

/**
* Returns the length of the input.
* @return the length of the input if the length of the input is known.
* a negative value if the length of the input is unknown.
*/
long length();

/**
* Returns current transfer progress.
*/
long progress();

}
10 changes: 10 additions & 0 deletions handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
}
}
}

@Override
public long length() {
return endOffset - startOffset;
}

@Override
public long progress() {
return offset - startOffset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
}
}
}

@Override
public long length() {
return -1;
}

@Override
public long progress() {
return offset;
}
}
10 changes: 10 additions & 0 deletions handler/src/main/java/io/netty/handler/stream/ChunkedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,14 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
}
}
}

@Override
public long length() {
return -1;
}

@Override
public long progress() {
return offset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void discard(Throwable cause) {
}
currentWrite.fail(cause);
} else {
currentWrite.success();
currentWrite.success(in.length());
}
closeInput(in);
} catch (Exception e) {
Expand Down Expand Up @@ -253,7 +253,6 @@ private void doFlush(final ChannelHandlerContext ctx) throws Exception {
message = Unpooled.EMPTY_BUFFER;
}

final int amount = amount(message);
ChannelFuture f = ctx.write(message);
if (endOfInput) {
this.currentWrite = null;
Expand All @@ -266,8 +265,8 @@ private void doFlush(final ChannelHandlerContext ctx) throws Exception {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
currentWrite.progress(amount);
currentWrite.success();
currentWrite.progress(chunks.progress(), chunks.length());
currentWrite.success(chunks.length());
closeInput(chunks);
}
});
Expand All @@ -279,7 +278,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(amount);
currentWrite.progress(chunks.progress(), chunks.length());
}
}
});
Expand All @@ -291,7 +290,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(amount);
currentWrite.progress(chunks.progress(), chunks.length());
if (channel.isWritable()) {
resumeTransfer();
}
Expand Down Expand Up @@ -327,7 +326,6 @@ static void closeInput(ChunkedInput<?> chunks) {
private static final class PendingWrite {
final Object msg;
final ChannelPromise promise;
private long progress;

PendingWrite(Object msg, ChannelPromise promise) {
this.msg = msg;
Expand All @@ -339,35 +337,24 @@ void fail(Throwable cause) {
promise.tryFailure(cause);
}

void success() {
void success(long total) {
if (promise.isDone()) {
// No need to notify the progress or fulfill the promise because it's done already.
return;
}

if (promise instanceof ChannelProgressivePromise) {
// Now we know what the total is.
((ChannelProgressivePromise) promise).tryProgress(progress, progress);
((ChannelProgressivePromise) promise).tryProgress(total, total);
}

promise.trySuccess();
}

void progress(int amount) {
progress += amount;
void progress(long progress, long total) {
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
((ChannelProgressivePromise) promise).tryProgress(progress, total);
}
}
}

private static int amount(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
done = true;
return buffer.duplicate().retain();
}

@Override
public long length() {
return -1;
}

@Override
public long progress() {
return 1;
}
};

final AtomicBoolean listenerNotified = new AtomicBoolean(false);
Expand Down Expand Up @@ -171,6 +181,16 @@ public Object readChunk(ChannelHandlerContext ctx) throws Exception {
done = true;
return 0;
}

@Override
public long length() {
return -1;
}

@Override
public long progress() {
return 1;
}
};

EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
Expand Down

0 comments on commit ca29be5

Please sign in to comment.