Skip to content

Commit

Permalink
Add unix domain socket support to Reactor http client
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed Jul 23, 2020
1 parent 53c5f21 commit cbb461d
Show file tree
Hide file tree
Showing 11 changed files with 1,713 additions and 1,607 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ java_library(
name = "reactor",
srcs = glob(["*.java"]),
visibility = [
"//java:__subpackages__",
"//java/client/src/org/openqa/selenium/remote:__pkg__",
"//java/client/test/org/openqa/selenium/remote/http:__pkg__",
"//java/client/test/org/openqa/selenium/remote/http/reactor:__pkg__",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.openqa.selenium.remote.http.reactor;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auto.service.AutoService;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.unix.DomainSocketAddress;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.AddSeleniumUserAgent;
import org.openqa.selenium.remote.http.ClientConfig;
Expand All @@ -34,9 +35,6 @@
import org.openqa.selenium.remote.http.Message;
import org.openqa.selenium.remote.http.TextMessage;
import org.openqa.selenium.remote.http.WebSocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.websocket.WebsocketOutbound;
Expand All @@ -47,109 +45,149 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.nio.charset.StandardCharsets.UTF_8;

public class ReactorClient implements HttpClient {

private static final Logger log = Logger.getLogger(ReactorClient.class.getName());

private static final Map<HttpMethod, io.netty.handler.codec.http.HttpMethod> methodMap =
ImmutableMap.of(HttpMethod.DELETE, io.netty.handler.codec.http.HttpMethod.DELETE,
HttpMethod.GET, io.netty.handler.codec.http.HttpMethod.GET,
HttpMethod.POST, io.netty.handler.codec.http.HttpMethod.POST);
private static final Map<String, Integer> SCHEME_TO_PORT = ImmutableMap.of(
"http", 80,
"https", 443,
"ws", 80,
"wss", 443);

private static final int MAX_CHUNK_SIZE = 1024 * 512 ; // 500k
private static final Map<HttpMethod, io.netty.handler.codec.http.HttpMethod> METHOD_MAP =
ImmutableMap.of(HttpMethod.DELETE, io.netty.handler.codec.http.HttpMethod.DELETE,
HttpMethod.GET, io.netty.handler.codec.http.HttpMethod.GET,
HttpMethod.POST, io.netty.handler.codec.http.HttpMethod.POST);

private static final int MAX_CHUNK_SIZE = 1024 * 512; // 500k

private final ClientConfig config;
private final reactor.netty.http.client.HttpClient httpClient;

private ReactorClient(ClientConfig config) {
this.config = config;
httpClient = reactor.netty.http.client.HttpClient.create()
.baseUrl(config.baseUrl().toString())
.keepAlive(true);
this.config = Require.nonNull("Client config", config);
this.httpClient = createClient();
}

private reactor.netty.http.client.HttpClient createClient() {
reactor.netty.http.client.HttpClient client = reactor.netty.http.client.HttpClient.create()
.followRedirect(true)
.keepAlive(true);

switch (config.baseUri().getScheme()) {
case "http":
case "https":
int port = config.baseUri().getPort() == -1 ?
SCHEME_TO_PORT.get(config.baseUri().getScheme()) :
config.baseUri().getPort();
SocketAddress inetAddr = new InetSocketAddress(config.baseUri().getHost(), port);
client = client.remoteAddress(() -> inetAddr)
.tcpConfiguration(
tcpClient -> tcpClient.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(config.connectionTimeout().toMillis())));
break;

case "unix":
Path socket = Paths.get(config.baseUri().getPath());
SocketAddress domainAddr = new DomainSocketAddress(socket.toFile());
client = client.remoteAddress(() -> domainAddr);
break;

default:
throw new IllegalArgumentException("Base URI must be unix, http, or https: " + config.baseUri());
}

return client;
}

@Override
public HttpResponse execute(HttpRequest request) {
StringBuilder uri = new StringBuilder(request.getUri());
List<String> queryPairs = new ArrayList<>();
request.getQueryParameterNames().forEach(
name -> request.getQueryParameters(name).forEach(
value -> {
try {
queryPairs.add(
URLEncoder.encode(name, UTF_8.toString()) + "=" + URLEncoder.encode(value, UTF_8.toString()));
} catch (UnsupportedEncodingException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(e);
}
}));
name -> request.getQueryParameters(name).forEach(
value -> {
try {
queryPairs.add(
URLEncoder.encode(name, UTF_8.toString()) + "=" + URLEncoder.encode(value, UTF_8.toString()));
} catch (UnsupportedEncodingException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(e);
}
}));
if (!queryPairs.isEmpty()) {
uri.append("?");
Joiner.on('&').appendTo(uri, queryPairs);
}

Tuple2<InputStream, HttpResponse> result = httpClient
.headers(h -> {
request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value)));
if (request.getHeader("User-Agent") == null) {
h.set("User-Agent", AddSeleniumUserAgent.USER_AGENT);
}
}
)
.request(methodMap.get(request.getMethod()))
.uri(uri.toString())
.send((r, out) -> out.send(fromInputStream(request.getContent().get())))
.responseSingle((res, buf) -> {
HttpResponse toReturn = new HttpResponse();
toReturn.setStatus(res.status().code());
res.responseHeaders().entries().forEach(
entry -> toReturn.addHeader(entry.getKey(), entry.getValue()));
return buf.asInputStream()
.switchIfEmpty(Mono.just(new ByteArrayInputStream("".getBytes())))
.zipWith(Mono.just(toReturn));
}).block();
.headers(h -> {
request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value)));
if (request.getHeader("User-Agent") == null) {
h.set("User-Agent", AddSeleniumUserAgent.USER_AGENT);
}
})
.request(METHOD_MAP.get(request.getMethod()))
.uri(uri.toString())
.send((r, out) -> out.send(fromInputStream(request.getContent().get())))
.responseSingle((res, buf) -> {
HttpResponse toReturn = new HttpResponse();
toReturn.setStatus(res.status().code());
res.responseHeaders().entries().forEach(
entry -> toReturn.addHeader(entry.getKey(), entry.getValue()));
return buf.asInputStream()
.switchIfEmpty(Mono.just(new ByteArrayInputStream("".getBytes())))
.zipWith(Mono.just(toReturn));
}).block();
result.getT2().setContent(result::getT1);
return result.getT2();
}

private Flux<ByteBuf> fromInputStream(InputStream is) {
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
return Flux.generate(
() -> is,
(in, sync) -> {
ByteBuf buf = allocator.buffer();
try {
if (buf.writeBytes(in, MAX_CHUNK_SIZE) < 0) {
buf.release();
sync.complete();
} else {
sync.next(buf);
}
} catch (IOException ex) {
() -> is,
(in, sync) -> {
ByteBuf buf = allocator.buffer();
try {
if (buf.writeBytes(in, MAX_CHUNK_SIZE) < 0) {
buf.release();
sync.error(ex);
sync.complete();
} else {
sync.next(buf);
}
return in;
},
in -> {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
log.log(Level.INFO, e.getMessage(), e);
} catch (IOException ex) {
buf.release();
sync.error(ex);
}
return in;
},
in -> {
try {
if (in != null) {
in.close();
}
});
} catch (IOException e) {
log.log(Level.INFO, e.getMessage(), e);
}
});
}

@Override
Expand All @@ -161,9 +199,10 @@ public WebSocket openSocket(HttpRequest request, WebSocket.Listener listener) {
URI origUri = new URI(request.getUri());
URI wsUri = new URI("ws", null, origUri.getHost(), origUri.getPort(), origUri.getPath(), null, null);

return new ReactorWebSocket(httpClient
return new ReactorWebSocket(
httpClient
.headers(h -> request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value))))
name -> request.getHeaders(name).forEach(value -> h.set(name, value))))
.websocket().uri(wsUri.toString()), listener);
} catch (URISyntaxException e) {
log.log(Level.INFO, e.getMessage(), e);
Expand Down

This file was deleted.

Loading

0 comments on commit cbb461d

Please sign in to comment.