Skip to content

Commit

Permalink
Fallback to readChunkSize if InputStream.available() not implemen…
Browse files Browse the repository at this point in the history
…ted (#2949)

Motivation:

`FromInputStreamPublisher` attempts to read only 1 byte when
`InputStream.available()` is not implemented. As a result, users of
blocking streaming API with payload body as `InputStream` may write and
flush by 1 byte if their `InputStream` implementation always returns 0
available bytes (default).

Modifications:

- Consider `available()` as a best effort to avoid blocking. If it
returns 0 bytes, attempt to read up to `readChunkSize` bytes;

Result:

Improved efficiency of writing `InputStream` data for outgoing requests.
  • Loading branch information
idelpivnitskiy committed Jun 5, 2024
1 parent 963711b commit 8949741
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -171,7 +171,7 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
int available = stream.available();
if (available == 0) {
// Work around InputStreams that don't strictly honor the 0 == EOF contract.
available = buffer != null ? buffer.length : 1;
available = buffer != null ? buffer.length : readChunkSize;
}
available = fillBufferAvoidingBlocking(available);
emitSingleBuffer(subscriber);
Expand Down Expand Up @@ -212,6 +212,8 @@ private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber) {
b = buffer;
buffer = null;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
b = new byte[writeIdx];
arraycopy(buffer, 0, b, 0, writeIdx);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -200,7 +202,7 @@ void streamClosedAndErrorOnReadIOError() throws Exception {
void streamClosedAndErrorOnDeliveryError() throws Exception {
initChunkedStream(smallBuff, of(10), of(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

doAnswer(inv -> {
Subscription s = inv.getArgument(0);
Expand All @@ -221,7 +223,7 @@ void streamClosedAndErrorOnDeliveryError() throws Exception {
void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception {
initChunkedStream(smallBuff, ofAll(10), ofAll(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

AtomicReference<Subscription> subRef = new AtomicReference<>();
doAnswer(inv -> {
Expand All @@ -246,7 +248,7 @@ void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception {
void streamCanceledShouldCloseOnce() throws Exception {
initChunkedStream(smallBuff, ofAll(10), ofAll(10));

Subscriber sub = mock(Subscriber.class);
Subscriber<byte[]> sub = mock(Subscriber.class);

doAnswer(inv -> {
Subscription s = inv.getArgument(0);
Expand Down Expand Up @@ -351,21 +353,70 @@ void completeStreamIfEOFObservedDuringReadFromOverEstimatedAvailability() throws
}

@Test
void dontFailOnInputStreamWithBrokenAvailableCall() throws Throwable {
initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 5, 5, 0),
of(5, 1, 1, 10, 5, 5, 5, 5, 0));
void readsAllBytesWhenAvailableNotImplemented() throws Throwable {
// constrain publisher to 10 byte chunks with no data availability to enforce inner loops until buffer drained
initChunkedStream(bigBuff, ofAll(0), ofAll(10));

byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
new byte[]{5}, // avail == 0 -> override to 1
new byte[]{6}, // avail == 0 -> override to 1
new byte[]{7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
new byte[]{17, 18, 19, 20, 21},
new byte[]{22, 23, 24, 25, 26},
new byte[]{27, 28, 29, 30, 31},
new byte[]{32, 33, 34, 35, 36},
};
// expect single emitted item
// [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
// 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
// 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
// 30, 31, 32, 33, 34, 35, 36]

byte[][] items = chunked(bigBuff.length, bigBuff.length);
verifySuccess(items);
}

@ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}")
@ValueSource(ints = {7, 1024})
void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable {
initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 1, 0),
of(5, 7, 7, 10, 5, 5, 1, 0));
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

if (readChunkSize > bigBuff.length) {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
28, 29, 30, 31, 32, 33, 34, 35, 36},
};
verifySuccess(items);
} else {
byte[][] items = {
new byte[]{0, 1, 2, 3, 4},
// avail == 0 -> override to readChunkSize
new byte[]{5, 6, 7, 8, 9, 10, 11},
// avail == 0 -> override to readChunkSize
new byte[]{12, 13, 14, 15, 16, 17, 18},
// readChunkSize < available
new byte[]{19, 20, 21, 22, 23, 24, 25},
new byte[]{26, 27, 28, 29, 30},
new byte[]{31, 32, 33, 34, 35},
new byte[]{36},
};
verifySuccess(items);
}
}

@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable {
initChunkedStream(bigBuff, ofAll(0), ofAll(chunkSize));
int readChunkSize = 5;
pub = new FromInputStreamPublisher(inputStream, readChunkSize);

// expect 8 emitted items
// [ 0, 1, 2, 3, 4]
// [ 5, 6, 7, 8, 9]
// [10, 11, 12, 13, 14]
// [15, 16, 17, 18, 19]
// [20, 21, 22, 23, 24]
// [25, 26, 27, 28, 29]
// [30, 31, 32, 33, 34]
// [35, 36]

byte[][] items = chunked(bigBuff.length, readChunkSize);
verifySuccess(items);
}

Expand Down Expand Up @@ -400,11 +451,12 @@ void keepReadingWhenAvailabilityPermits() throws Throwable {
verifySuccess(items);
}

@Test
void repeatedReadingWhenAvailabilityRunsOut() throws Throwable {
// constrain publisher to 10 byte chunks with only 5 byte availability per chunk to enforce multiple outer loops
// simulating multiple calls to IS.available()
initChunkedStream(bigBuff, ofAll(5), ofAll(5)); // 5 byte chunks per available() call
@ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}")
@ValueSource(ints = {3, 5, 7})
void repeatedReadingWhenAvailabilityRunsOut(int chunkSize) throws Throwable {
// constrain publisher to chunkSize byte chunks with only 5 byte availability per chunk to enforce multiple
// outer loops simulating multiple calls to IS.available()
initChunkedStream(bigBuff, ofAll(5), ofAll(chunkSize)); // chunkSize byte chunks per available() call

// expect 8 emitted items
// [ 0, 1, 2, 3, 4]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;

import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;

@Test
public class PublisherFromInputStreamTckTest extends AbstractPublisherTckTest<byte[]> {

@Override
protected Publisher<byte[]> createServiceTalkPublisher(long elements) {
return Publisher.fromInputStream(new ByteArrayInputStream(new byte[(int) elements]), 1);
}

@Override
public long maxElementsFromPublisher() {
return TckUtils.maxElementsFromPublisher();
}
}

0 comments on commit 8949741

Please sign in to comment.