Skip to content

Commit

Permalink
Make the Netty server implement our Server interface
Browse files Browse the repository at this point in the history
This also resolves some underlying issues with the server, including
not properly handling the buffers used in responses.
  • Loading branch information
shs96c committed Oct 15, 2019
1 parent 8249101 commit 480a5ad
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ java_library(
visibility = [
"//java/client/src/org/openqa/selenium/remote:__pkg__",
"//java/client/test/org/openqa/selenium/remote/http/okhttp:__pkg__",
"//java/server/test/org/openqa/selenium/netty/server:__pkg__",
],
deps = [
"//java/client/src/org/openqa/selenium/remote/http",
Expand Down
4 changes: 2 additions & 2 deletions java/server/src/org/openqa/selenium/grid/web/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ java_library(
srcs = glob(["*.java"]),
visibility = [
"//java/server/src/org/openqa/selenium/grid:__subpackages__",
"//java/server/src/org/openqa/selenium/netty/server:__pkg__",
"//java/server/src/org/openqa/selenium/remote/server:__subpackages__",
"//java/server/test/org/openqa/selenium/grid:__subpackages__",
"//java/server/test/org/openqa/selenium/jetty/server:__pkg__",
"//java/server/test/org/openqa/selenium:__subpackages__",
],
deps = [
"//java/client/src/org/openqa/selenium:core",
Expand Down
13 changes: 13 additions & 0 deletions java/server/src/org/openqa/selenium/netty/server/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
java_library(
name = "server",
srcs = glob(["*.java"]),
visibility = [
"//java/server/src/org/openqa/selenium/grid:__subpackages__",
"//java/server/src/org/openqa/selenium/netty/server:__pkg__",
"//java/server/src/org/openqa/selenium/remote/server:__pkg__",
"//java/server/test/org/openqa/selenium:__subpackages__",
],
exports = [
"//java/client/src/org/openqa/selenium/remote/http",
"//java/server/src/org/openqa/selenium/grid/server",
],
deps = [
"//java/client/src/org/openqa/selenium/json",
"//java/client/src/org/openqa/selenium/remote/http",
"//java/server/src/org/openqa/selenium/grid/server",
"//java/server/src/org/openqa/selenium/grid/web",
"//third_party/java/guava",
"//third_party/java/netty:netty-all",
],
Expand Down
93 changes: 62 additions & 31 deletions java/server/src/org/openqa/selenium/netty/server/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,93 @@

package org.openqa.selenium.netty.server;

import static org.openqa.selenium.remote.http.Contents.utf8String;

import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpResponse;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.openqa.selenium.grid.server.AddWebDriverSpecHeaders;
import org.openqa.selenium.grid.server.BaseServerOptions;
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.grid.server.WrapExceptions;
import org.openqa.selenium.grid.server.BaseServerOptions;
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.remote.http.HttpHandler;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Objects;

public class NettyServer {
public class NettyServer implements Server<NettyServer> {

private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final int port;
private final URL externalUrl;
private final HttpHandler handler;

private HttpHandler handler;
private Channel channel;

public NettyServer(HttpHandler handler) throws InterruptedException {
this.handler = Objects.requireNonNull(handler, "Handler to use must be set.");
public NettyServer(BaseServerOptions options, HttpHandler handler) {
Objects.requireNonNull(options, "Server options must be set.");
Objects.requireNonNull(handler, "Handler to use must be set.");

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
this.handler = handler.with(new WrapExceptions().andThen(new AddWebDriverSpecHeaders()));

bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();

port = options.getPort();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SeleniumHttpInitializer(handler));
externalUrl = options.getExternalUri().toURL();
} catch (MalformedURLException e) {
throw new UncheckedIOException("Server URI is not a valid URL: " + options.getExternalUri(), e);
}
}

Channel ch = b.bind(4444).sync().channel();
@Override
public boolean isStarted() {
return channel != null;
}

System.err.println("Open your web browser and navigate to http://127.0.0.1:4444/");
@Override
public URL getUrl() {
return externalUrl;
}

ch.closeFuture().sync();
@Override
public void stop() {
try {
channel.closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(new IOException("Shutdown interrupted", e));
} finally {
channel = null;
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public NettyServer start() {
return this;
}
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SeleniumHttpInitializer(handler));

public static void main(String[] args) throws InterruptedException {
NettyServer server = new NettyServer(req -> {
System.out.println(Contents.string(req));
HttpResponse res = new HttpResponse();
res.setContent(utf8String("Hello, World!\n"));

return res;
});
try {
channel = b.bind(port).sync().channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(new IOException("Start up interrupted", e));
}

server.start();
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,20 @@

package org.openqa.selenium.netty.server;

import static org.openqa.selenium.remote.http.Contents.bytes;

import com.google.common.io.ByteStreams;

import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpMethod;
import org.openqa.selenium.remote.http.HttpRequest;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpMethod;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
Expand All @@ -49,52 +45,32 @@ class RequestConverter extends SimpleChannelInboundHandler<HttpObject> {

private static final Logger LOG = Logger.getLogger(RequestConverter.class.getName());
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private PipedOutputStream out;
private volatile PipedOutputStream out;

@Override
protected void channelRead0(
ChannelHandlerContext ctx,
HttpObject msg) throws Exception {
LOG.fine("Incoming message: " + msg);

if (msg instanceof FullHttpRequest) {
LOG.fine("Is full http request: " + msg);
reset();
FullHttpRequest nettyRequest = (FullHttpRequest) msg;
HttpRequest req = createRequest(nettyRequest);

try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ByteBufInputStream bis = new ByteBufInputStream(nettyRequest.content())) {
ByteStreams.copy(bis, bos);
byte[] bytes = bos.toByteArray();
req.setContent(bytes(bytes));
}

ctx.fireChannelRead(req);
ctx.flush();
return;
}

if (msg instanceof io.netty.handler.codec.http.HttpRequest) {
LOG.fine("Is start of http request: " + msg);
reset();
LOG.fine("Start of http request: " + msg);

io.netty.handler.codec.http.HttpRequest nettyRequest =
(io.netty.handler.codec.http.HttpRequest) msg;
(io.netty.handler.codec.http.HttpRequest) msg;

HttpRequest req = new HttpRequest(
HttpMethod.valueOf(nettyRequest.method().name()),
nettyRequest.uri());
if (HttpUtil.is100ContinueExpected(nettyRequest)) {
ctx.write(new HttpResponse().setStatus(100));
return;
}

nettyRequest.headers().entries().stream()
.filter(entry -> entry.getKey() != null)
.forEach(entry -> req.addHeader(entry.getKey(), entry.getValue()));
HttpRequest req = createRequest(nettyRequest);

out = new PipedOutputStream();
InputStream in = new PipedInputStream(out);

req.setContent(Contents.memoize(() -> in));
ctx.fireChannelRead(req);
ctx.flush();
}

if (msg instanceof HttpContent) {
Expand All @@ -111,7 +87,7 @@ protected void channelRead0(
}

if (msg instanceof LastHttpContent) {
LOG.info("Closing input pipe.");
LOG.fine("Closing input pipe.");
EXECUTOR.submit(() -> {
try {
out.close();
Expand All @@ -122,20 +98,6 @@ protected void channelRead0(
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
reset();
super.channelReadComplete(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
reset();
}

private HttpRequest createRequest(io.netty.handler.codec.http.HttpRequest nettyRequest) {
HttpRequest req = new HttpRequest(
HttpMethod.valueOf(nettyRequest.method().name()),
Expand All @@ -147,9 +109,4 @@ private HttpRequest createRequest(io.netty.handler.codec.http.HttpRequest nettyR

return req;
}

private void reset() throws Exception {
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import org.openqa.selenium.remote.http.HttpResponse;

import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -55,14 +57,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
byte[] ary = new byte[CHUNK_SIZE];
InputStream is = seResponse.getContent().get();
int byteCount = is.read(ary);
// If there are no bytes left to read, then -1 is returned by read, and this is bad.
byteCount = byteCount == -1 ? 0 : byteCount;

DefaultHttpResponse first;
if (byteCount < CHUNK_SIZE) {
is.close();
first = new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(seResponse.getStatus()),
Unpooled.wrappedBuffer(ary));
Unpooled.wrappedBuffer(ary, 0, byteCount));
first.headers().addInt(CONTENT_LENGTH, byteCount);
copyHeaders(seResponse, first);
ctx.write(first);
Expand Down Expand Up @@ -92,6 +96,9 @@ private void copyHeaders(HttpResponse seResponse, DefaultHttpResponse first) {
continue;
}
for (String value : seResponse.getHeaders(name)) {
if (value == null) {
continue;
}
first.headers().add(name, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.openqa.selenium.netty.server;

import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.openqa.selenium.grid.web.ErrorHandler;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
Expand All @@ -30,7 +32,8 @@
class SeleniumHandler extends SimpleChannelInboundHandler<HttpRequest> {

private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
private HttpHandler seleniumHandler;
private static final Json JSON = new Json();
private final HttpHandler seleniumHandler;

public SeleniumHandler(HttpHandler seleniumHandler) {
super(HttpRequest.class);
Expand All @@ -40,11 +43,13 @@ public SeleniumHandler(HttpHandler seleniumHandler) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
EXECUTOR.submit(() -> {
HttpResponse res;
try {
ctx.writeAndFlush(seleniumHandler.execute(msg));
} catch (Exception e) {
ctx.fireExceptionCaught(e);
res = seleniumHandler.execute(msg);
} catch (Throwable e) {
res = new ErrorHandler(JSON, e).execute(msg);
}
ctx.writeAndFlush(res);
});
}
}
Loading

0 comments on commit 480a5ad

Please sign in to comment.