Skip to content

Commit

Permalink
Enable support for decompression of compressed response within RestHi…
Browse files Browse the repository at this point in the history
…ghLevelClient (elastic#53533)

Added decompression of gzip when gzip value is return as an header from Elasticsearch
  • Loading branch information
Hakky54 committed Apr 2, 2020
1 parent dbe9b48 commit 4a195b5
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.elasticsearch.client;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -1872,11 +1874,18 @@ protected final ElasticsearchStatusException parseResponseException(ResponseExce
return elasticsearchException;
}

protected final <Resp> Resp parseEntity(final HttpEntity entity,
protected final <Resp> Resp parseEntity(final HttpEntity httpEntity,
final CheckedFunction<XContentParser, Resp, IOException> entityParser) throws IOException {
if (entity == null) {
if (httpEntity == null) {
throw new IllegalStateException("Response body expected but not returned");
}

final HttpEntity entity = Optional.ofNullable(httpEntity.getContentEncoding())
.map(Header::getValue)
.filter("gzip"::equalsIgnoreCase)
.map(gzipHeaderValue -> (HttpEntity) new GzipDecompressingEntity(httpEntity))
.orElse(httpEntity);

if (entity.getContentType() == null) {
throw new IllegalStateException("Elasticsearch didn't return the [Content-Type] header, unable to parse response body");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class HighLevelRestClientCompressionIT extends ESRestHighLevelClientTestCase {

private static final String GZIP_ENCODING = "gzip";
private static final String SAMPLE_DOCUMENT = "{\"name\":{\"first name\":\"Steve\",\"last name\":\"Jobs\"}}";

public void testCompressesResponseIfRequested() throws IOException {
Request doc = new Request(HttpPut.METHOD_NAME, "/company/_doc/1");
doc.setJsonEntity(SAMPLE_DOCUMENT);
client().performRequest(doc);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));

RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder()
.addHeader(HttpHeaders.ACCEPT_ENCODING, GZIP_ENCODING)
.build();

SearchRequest searchRequest = new SearchRequest("company");
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync, requestOptions);

assertThat(searchResponse.status().getStatus(), equalTo(200));
assertEquals(1L, searchResponse.getHits().getTotalHits().value);
assertEquals(SAMPLE_DOCUMENT, searchResponse.getHits().getHits()[0].getSourceAsString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -117,10 +118,12 @@
import org.hamcrest.Matchers;
import org.junit.Before;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -134,6 +137,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

import static org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider.registeredMetricName;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
Expand Down Expand Up @@ -322,6 +326,59 @@ public void testParseEntity() throws IOException {
}
}

public void testParseCompressedEntity() throws IOException {
CheckedFunction<XContentParser, String, IOException> entityParser = parser -> {
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
assertEquals(XContentParser.Token.FIELD_NAME, parser.nextToken());
assertTrue(parser.nextToken().isValue());
String value = parser.text();
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return value;
};

HttpEntity jsonEntity = createGzipEncodedEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
assertEquals("value", restHighLevelClient.parseEntity(jsonEntity, entityParser));
HttpEntity yamlEntity = createGzipEncodedEntity("---\nfield: value\n", ContentType.create("application/yaml"));
assertEquals("value", restHighLevelClient.parseEntity(yamlEntity, entityParser));
HttpEntity smileEntity = createGzipEncodedEntity(SmileXContent.contentBuilder(), ContentType.create("application/smile"));
assertEquals("value", restHighLevelClient.parseEntity(smileEntity, entityParser));
HttpEntity cborEntity = createGzipEncodedEntity(CborXContent.contentBuilder(), ContentType.create("application/cbor"));
assertEquals("value", restHighLevelClient.parseEntity(cborEntity, entityParser));
}

private HttpEntity createGzipEncodedEntity(String content, ContentType contentType) throws IOException {
byte[] gzipEncodedContent = compressContentWithGzip(content.getBytes(StandardCharsets.UTF_8));
NByteArrayEntity httpEntity = new NByteArrayEntity(gzipEncodedContent, contentType);
httpEntity.setContentEncoding("gzip");

return httpEntity;
}

private HttpEntity createGzipEncodedEntity(XContentBuilder xContentBuilder, ContentType contentType) throws IOException {
try (XContentBuilder builder = xContentBuilder) {
builder.startObject();
builder.field("field", "value");
builder.endObject();

BytesRef bytesRef = BytesReference.bytes(xContentBuilder).toBytesRef();
byte[] gzipEncodedContent = compressContentWithGzip(bytesRef.bytes);
NByteArrayEntity httpEntity = new NByteArrayEntity(gzipEncodedContent, contentType);
httpEntity.setContentEncoding("gzip");

return httpEntity;
}
}

private static byte[] compressContentWithGzip(byte[] content) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(content.length);
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(content);
gzip.close();
bos.close();

return bos.toByteArray();
}

private static HttpEntity createBinaryEntity(XContentBuilder xContentBuilder, ContentType contentType) throws IOException {
try (XContentBuilder builder = xContentBuilder) {
builder.startObject();
Expand Down

0 comments on commit 4a195b5

Please sign in to comment.