Skip to content

Commit

Permalink
[Cronet] Fix BidirectionalStream.flush()
Browse files Browse the repository at this point in the history
If a flush() is delayed because there is a write pending, we will incorrectly
flush data in mPendingQueue as well. This leads to undeterministic behavior
in buffer coalescing. This CL makes delayed flush() to only flush mFlushQueue.

This CL also fixes a bug where end of stream flag is incorrectly set to true
when there is still data in mPendingQueue.

This CL adds a regression test.

BUG=599902

Review-Url: https://codereview.chromium.org/2078353003
Cr-Commit-Position: refs/heads/master@{#402163}
  • Loading branch information
xunjieli authored and Commit bot committed Jun 27, 2016
1 parent a7c0e4a commit 22220be
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,14 @@ private void flushLocked() {
// called before pushing data to the native stack.
return;
}
sendFlushDataLocked();
}

// Helper method to send buffers in mFlushData. Caller needs to acquire
// mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
// mFlushData queue isn't empty.
@SuppressWarnings("GuardedByChecker")
private void sendFlushDataLocked() {
assert mWriteState == State.WAITING_FOR_FLUSH;
int size = mFlushData.size();
ByteBuffer[] buffers = new ByteBuffer[size];
Expand All @@ -357,14 +365,43 @@ private void flushLocked() {
assert mFlushData.isEmpty();
assert buffers.length >= 1;
mWriteState = State.WRITING;
if (!nativeWritevData(mNativeStream, buffers, positions, limits, mEndOfStreamWritten)) {
if (!nativeWritevData(mNativeStream, buffers, positions, limits,
mEndOfStreamWritten && mPendingData.isEmpty())) {
// Still waiting on flush. This is just to have consistent
// behavior with the other error cases.
mWriteState = State.WAITING_FOR_FLUSH;
throw new IllegalArgumentException("Unable to call native writev.");
}
}

/**
* Returns a read-only copy of {@code mPendingData} for testing.
*/
@VisibleForTesting
public List<ByteBuffer> getPendingDataForTesting() {
synchronized (mNativeStreamLock) {
List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
for (ByteBuffer buffer : mPendingData) {
pendingData.add(buffer.asReadOnlyBuffer());
}
return pendingData;
}
}

/**
* Returns a read-only copy of {@code mFlushData} for testing.
*/
@VisibleForTesting
public List<ByteBuffer> getFlushDataForTesting() {
synchronized (mNativeStreamLock) {
List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
for (ByteBuffer buffer : mFlushData) {
flushData.add(buffer.asReadOnlyBuffer());
}
return flushData;
}
}

@Override
public void ping(PingCallback callback, Executor executor) {
// TODO(mef): May be last thing to be implemented on Android.
Expand Down Expand Up @@ -515,7 +552,7 @@ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPosi
mWriteState = State.WAITING_FOR_FLUSH;
// Flush if there is anything in the flush queue mFlushData.
if (!mFlushData.isEmpty()) {
flushLocked();
sendFlushDataLocked();
}
}
for (int i = 0; i < byteBuffers.length; i++) {
Expand All @@ -527,7 +564,9 @@ private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPosi
}
// Current implementation always writes the complete buffer.
buffer.position(buffer.limit());
postTaskToExecutor(new OnWriteCompletedRunnable(buffer, endOfStream));
postTaskToExecutor(new OnWriteCompletedRunnable(buffer,
// Only set endOfStream flag if this buffer is the last in byteBuffers.
endOfStream && i == byteBuffers.length - 1));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,91 @@ public void testSimplePostWithFlush() throws Exception {
"zebra", callback.mResponseInfo.getAllHeaders().get("echo-content-type").get(0));
}

@SmallTest
@Feature({"Cronet"})
@OnlyRunNativeCronet
// Tests that a delayed flush() only sends buffers that have been written
// before it is called, and it doesn't flush buffers in mPendingQueue.
public void testFlushData() throws Exception {
String url = Http2TestServer.getEchoStreamUrl();
TestBidirectionalStreamCallback callback = new TestBidirectionalStreamCallback() {
// Number of onWriteCompleted callbacks that have been invoked.
private int mNumWriteCompleted = 0;
@Override
public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
ByteBuffer buffer, boolean endOfStream) {
super.onWriteCompleted(stream, info, buffer, endOfStream);
mNumWriteCompleted++;
if (mNumWriteCompleted <= 3) {
// "6" is in pending queue.
List<ByteBuffer> pendingData =
((CronetBidirectionalStream) stream).getPendingDataForTesting();
assertEquals(1, pendingData.size());
ByteBuffer pendingBuffer = pendingData.get(0);
byte[] content = new byte[pendingBuffer.remaining()];
pendingBuffer.get(content);
assertTrue(Arrays.equals("6".getBytes(), content));

// "4" and "5" have been flushed.
assertEquals(0,
((CronetBidirectionalStream) stream).getFlushDataForTesting().size());
} else if (mNumWriteCompleted == 5) {
// Now flush "6", which is still in pending queue.
List<ByteBuffer> pendingData =
((CronetBidirectionalStream) stream).getPendingDataForTesting();
assertEquals(1, pendingData.size());
ByteBuffer pendingBuffer = pendingData.get(0);
byte[] content = new byte[pendingBuffer.remaining()];
pendingBuffer.get(content);
assertTrue(Arrays.equals("6".getBytes(), content));

stream.flush();

assertEquals(0,
((CronetBidirectionalStream) stream).getPendingDataForTesting().size());
assertEquals(0,
((CronetBidirectionalStream) stream).getFlushDataForTesting().size());
}
}
};
callback.addWriteData("1".getBytes(), false);
callback.addWriteData("2".getBytes(), false);
callback.addWriteData("3".getBytes(), true);
callback.addWriteData("4".getBytes(), false);
callback.addWriteData("5".getBytes(), true);
callback.addWriteData("6".getBytes(), false);
CronetBidirectionalStream stream = (CronetBidirectionalStream) new BidirectionalStream
.Builder(url, callback, callback.getExecutor(),
mTestFramework.mCronetEngine)
.disableAutoFlush(true)
.addHeader("foo", "bar")
.addHeader("empty", "")
.addHeader("Content-Type", "zebra")
.build();
callback.setAutoAdvance(false);
stream.start();
callback.waitForNextWriteStep(); // onStreamReady

assertEquals(0, stream.getPendingDataForTesting().size());
assertEquals(0, stream.getFlushDataForTesting().size());

// Write 1, 2, 3 and flush().
callback.startNextWrite(stream);
// Write 4, 5 and flush(). 4, 5 will be in flush queue.
callback.startNextWrite(stream);
// Write 6, but do not flush. 6 will be in pending queue.
callback.startNextWrite(stream);

callback.setAutoAdvance(true);
callback.blockForDone();
assertEquals(200, callback.mResponseInfo.getHttpStatusCode());
assertEquals("123456", callback.mResponseAsString);
assertEquals("bar", callback.mResponseInfo.getAllHeaders().get("echo-foo").get(0));
assertEquals("", callback.mResponseInfo.getAllHeaders().get("echo-empty").get(0));
assertEquals(
"zebra", callback.mResponseInfo.getAllHeaders().get("echo-content-type").get(0));
}

@SmallTest
@Feature({"Cronet"})
@OnlyRunNativeCronet
Expand Down

0 comments on commit 22220be

Please sign in to comment.