Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream decoder optimize #1590

Merged
merged 35 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
18690e9
Optimize StreamDecoder
mroccyen Mar 7, 2022
149accd
Optimize StreamDecoder
mroccyen Mar 8, 2022
29e0fd5
add license header
mroccyen Mar 8, 2022
ee9a88d
add license header
mroccyen Mar 8, 2022
48aedaa
Optimize StreamDecoder
mroccyen Mar 8, 2022
4922547
Merge branch 'master' into StreamDecoderOptimize
velo Mar 17, 2022
e5f16a8
Merge branch 'master' into StreamDecoderOptimize
velo Mar 19, 2022
71b06f3
Optimize StreamDecoder
mroccyen Mar 21, 2022
ae36e27
Merge remote-tracking branch 'origin/StreamDecoderOptimize' into Stre…
mroccyen Mar 21, 2022
8ddfa4d
Optimize StreamDecoder
mroccyen Mar 7, 2022
4531e44
Optimize StreamDecoder
mroccyen Mar 8, 2022
a8af9b1
add license header
mroccyen Mar 8, 2022
932a527
add license header
mroccyen Mar 8, 2022
3e47367
Optimize StreamDecoder
mroccyen Mar 8, 2022
3378e13
Optimize StreamDecoder
mroccyen Mar 21, 2022
131e67a
Optimize StreamDecoder
mroccyen Mar 25, 2022
d23f154
Merge remote-tracking branch 'origin/StreamDecoderOptimize' into Stre…
mroccyen Mar 25, 2022
3d039a1
Optimize StreamDecoder
mroccyen Mar 7, 2022
55c1250
Optimize StreamDecoder
mroccyen Mar 8, 2022
0570482
add license header
mroccyen Mar 8, 2022
7dd94e2
add license header
mroccyen Mar 8, 2022
5b97437
Optimize StreamDecoder
mroccyen Mar 8, 2022
07a8b2b
Optimize StreamDecoder
mroccyen Mar 21, 2022
995abdc
Optimize StreamDecoder
mroccyen Mar 25, 2022
573338a
Optimize StreamDecoder
mroccyen Mar 7, 2022
b0d550f
add license header
mroccyen Mar 8, 2022
c983eef
Optimize StreamDecoder
mroccyen Mar 8, 2022
9cb9808
Optimize StreamDecoder
mroccyen Mar 21, 2022
022fbc0
Merge remote-tracking branch 'origin/StreamDecoderOptimize' into Stre…
mroccyen Mar 25, 2022
8d8e64e
Optimize StreamDecoder
mroccyen Mar 25, 2022
bfa6d7e
add some example
mroccyen Mar 25, 2022
9d25179
Merge branch 'master' into StreamDecoderOptimize
mroccyen Mar 27, 2022
0a400d3
Optimize StreamDecoder
mroccyen Mar 27, 2022
d0f722e
add a section of README for stream decoder
mroccyen Mar 27, 2022
55e9832
Update StreamDecoder.java
velo Apr 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions core/src/main/java/feign/stream/StreamDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import feign.FeignException;
import feign.Response;
import feign.codec.Decoder;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
Expand All @@ -24,6 +25,7 @@
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static feign.Util.UTF_8;
import static feign.Util.ensureClosed;

/**
Expand All @@ -43,27 +45,33 @@
* Stream<Contributor> contributors(@Param("owner") String owner, @Param("repo") String repo);
* }</code>
* </pre>
*
* @author mroccyen
mroccyen marked this conversation as resolved.
Show resolved Hide resolved
*/
public final class StreamDecoder implements Decoder {

private final Decoder iteratorDecoder;
private final Decoder delegateDecoder;
mroccyen marked this conversation as resolved.
Show resolved Hide resolved

StreamDecoder(Decoder iteratorDecoder) {
StreamDecoder(Decoder iteratorDecoder, Decoder delegateDecoder) {
this.iteratorDecoder = iteratorDecoder;
this.delegateDecoder = delegateDecoder;
}

@Override
public Object decode(Response response, Type type)
throws IOException, FeignException {
if (!(type instanceof ParameterizedType)) {
throw new IllegalArgumentException("StreamDecoder supports only stream: unknown " + type);
if (!isStream(type)) {
if (delegateDecoder == null) {
throw new IllegalArgumentException("StreamDecoder supports types other than stream. " +
"When type is not stream, the delegate decoder needs to be setting.");
} else {
return delegateDecoder.decode(response, type);
}
}
ParameterizedType streamType = (ParameterizedType) type;
if (!Stream.class.equals(streamType.getRawType())) {
throw new IllegalArgumentException("StreamDecoder supports only stream: unknown " + type);
}
Iterator<?> iterator =
(Iterator) iteratorDecoder.decode(response, new IteratorParameterizedType(streamType));
(Iterator<?>) iteratorDecoder.decode(response, new IteratorParameterizedType(streamType));

return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, 0), false)
Expand All @@ -76,8 +84,20 @@ public Object decode(Response response, Type type)
});
}

public static boolean isStream(Type type) {
if (!(type instanceof ParameterizedType)) {
return false;
}
ParameterizedType parameterizedType = (ParameterizedType) type;
return parameterizedType.getRawType().equals(Stream.class);
}

public static StreamDecoder create(Decoder iteratorDecoder) {
mroccyen marked this conversation as resolved.
Show resolved Hide resolved
return new StreamDecoder(iteratorDecoder);
return new StreamDecoder(iteratorDecoder, null);
}

public static StreamDecoder create(Decoder iteratorDecoder, Decoder delegateDecoder) {
return new StreamDecoder(iteratorDecoder, delegateDecoder);
}

static final class IteratorParameterizedType implements ParameterizedType {
Expand All @@ -103,4 +123,17 @@ public Type getOwnerType() {
return null;
}
}

/**
* Default implementation of Decodes an http response into an Iterator
*/
public static class LineToIteratorDecoder implements Decoder {

@Override
public Object decode(Response response, Type type) throws IOException {
BufferedReader bufferedReader = new BufferedReader(response.body().asReader(UTF_8));
return bufferedReader.lines().iterator();
}

}
mroccyen marked this conversation as resolved.
Show resolved Hide resolved
}
49 changes: 39 additions & 10 deletions core/src/test/java/feign/stream/StreamDecoderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
package feign.stream;

import com.fasterxml.jackson.core.type.TypeReference;
import feign.Feign;
import feign.Request;
import feign.*;
import feign.Request.HttpMethod;
import feign.RequestLine;
import feign.Response;
import feign.Util;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -28,9 +27,6 @@
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Test;
import static feign.Util.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -41,6 +37,9 @@ interface StreamInterface {
@RequestLine("GET /")
Stream<String> get();

@RequestLine("GET /str")
String str();

@RequestLine("GET /cars")
Stream<Car> getCars();

Expand Down Expand Up @@ -79,6 +78,36 @@ public void simpleStreamTest() {
}
}

@Test
public void simpleDefaultStreamTest() {
MockWebServer server = new MockWebServer();
server.enqueue(new MockResponse().setBody("foo\nbar"));

StreamInterface api = Feign.builder()
.decoder(StreamDecoder.create(new StreamDecoder.LineToIteratorDecoder()))
.doNotCloseAfterDecode()
.target(StreamInterface.class, server.url("/").toString());

try (Stream<String> stream = api.get()) {
assertThat(stream.collect(Collectors.toList())).isEqualTo(Arrays.asList("foo", "bar"));
}
}

@Test
public void simpleDeleteDecoderTest() {
MockWebServer server = new MockWebServer();
server.enqueue(new MockResponse().setBody("foo\nbar"));

StreamInterface api = Feign.builder()
.decoder(StreamDecoder.create(new StreamDecoder.LineToIteratorDecoder(), (r, t) -> "str"))
// .decoder(StreamDecoder.create(new StreamDecoder.LineToIteratorDecoder()))
.doNotCloseAfterDecode()
.target(StreamInterface.class, server.url("/").toString());

String str = api.str();
assertThat(str).isEqualTo("str");
}

@Test
public void shouldCloseIteratorWhenStreamClosed() throws IOException {
Response response = Response.builder()
Expand All @@ -90,10 +119,10 @@ public void shouldCloseIteratorWhenStreamClosed() throws IOException {
.build();

TestCloseableIterator it = new TestCloseableIterator();
StreamDecoder decoder = new StreamDecoder((r, t) -> it);
StreamDecoder decoder = StreamDecoder.create((r, t) -> it);

try (Stream<?> stream =
(Stream) decoder.decode(response, new TypeReference<Stream<String>>() {}.getType())) {
(Stream<?>) decoder.decode(response, new TypeReference<Stream<String>>() {}.getType())) {
assertThat(stream.collect(Collectors.toList())).hasSize(1);
assertThat(it.called).isTrue();
} finally {
Expand Down