From 0fad36d47bf89672702f1bea21dc905a135c3d79 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 20 Feb 2024 11:07:44 +0100 Subject: [PATCH 1/7] Unit tests for the gRPC-Web protocol These tests have been built by inspection of a TCP traffic capture between a browser and the Envoy proxy, configured as recommended by the gRPC-Web project. More tests will be added in follow-up commits Signed-off-by: Thomas Segismont --- .../io/vertx/grpc/common/GrpcTestBase.java | 2 +- .../grpc/server/web/BinaryServerTest.java | 44 +++ .../io/vertx/grpc/server/web/Interceptor.java | 50 +++ .../vertx/grpc/server/web/ServerTestBase.java | 359 ++++++++++++++++++ .../grpc/server/web/TestServiceImpl.java | 62 +++ .../vertx/grpc/server/web/TextServerTest.java | 68 ++++ .../src/test/proto/grpc-web-testing.proto | 31 ++ 7 files changed, 615 insertions(+), 1 deletion(-) create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/Interceptor.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TestServiceImpl.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java create mode 100644 vertx-grpc-server/src/test/proto/grpc-web-testing.proto diff --git a/vertx-grpc-common/src/test/java/io/vertx/grpc/common/GrpcTestBase.java b/vertx-grpc-common/src/test/java/io/vertx/grpc/common/GrpcTestBase.java index 82760073..378329f3 100644 --- a/vertx-grpc-common/src/test/java/io/vertx/grpc/common/GrpcTestBase.java +++ b/vertx-grpc-common/src/test/java/io/vertx/grpc/common/GrpcTestBase.java @@ -35,7 +35,7 @@ public abstract class GrpcTestBase { protected int port; @Before - public void setUp() { + public void setUp(TestContext should) { port = 8080; vertx = Vertx.vertx(); } diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java new file mode 100644 index 00000000..dd26c59d --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web; + +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; + +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; + +/** + * Tests for gRPC-Web server using the binary protocol. + */ +public class BinaryServerTest extends ServerTestBase { + + private static final CharSequence GRPC_WEB_PROTO = HttpHeaders.createOptimized("application/grpc-web+proto"); + + @Override + protected MultiMap requestHeaders() { + return MultiMap.caseInsensitiveMultiMap() + .add(CONTENT_TYPE, GRPC_WEB_PROTO) + .add(USER_AGENT, GRPC_WEB_JAVASCRIPT_0_1) + .add(GRPC_WEB, TRUE); + } + + @Override + protected CharSequence responseContentType() { + return GRPC_WEB_PROTO; + } + + @Override + protected Buffer decodeBody(Buffer buffer) { + return buffer; + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/Interceptor.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/Interceptor.java new file mode 100644 index 00000000..4acc4127 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/Interceptor.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web; + +import io.grpc.*; +import io.grpc.Metadata.Key; + +import java.util.Set; +import java.util.stream.Stream; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; +import static io.grpc.Metadata.BINARY_BYTE_MARSHALLER; +import static java.util.stream.Collectors.toSet; + +class Interceptor implements ServerInterceptor { + + private static final Key HEADER_TEXT_KEY = Key.of("x-header-text-key", ASCII_STRING_MARSHALLER); + private static final Key HEADER_BIN_KEY = Key.of("x-header-bin-key-bin", BINARY_BYTE_MARSHALLER); + private static final Set> HEADERS_KEY_SET = Stream.of(HEADER_TEXT_KEY, HEADER_BIN_KEY).collect(toSet()); + private static final Key TRAILER_TEXT_KEY = Key.of("x-trailer-text-key", ASCII_STRING_MARSHALLER); + private static final Key TRAILER_BIN_KEY = Key.of("x-trailer-bin-key-bin", BINARY_BYTE_MARSHALLER); + private static final Set> TRAILERS_KEY_SET = Stream.of(TRAILER_TEXT_KEY, TRAILER_BIN_KEY).collect(toSet()); + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata metadata, ServerCallHandler next) { + return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { + + @Override + public void sendHeaders(Metadata headers) { + headers.merge(metadata, HEADERS_KEY_SET); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + trailers.merge(metadata, TRAILERS_KEY_SET); + super.close(status, trailers); + } + }, metadata); + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java new file mode 100644 index 00000000..b0372888 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java @@ -0,0 +1,359 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.ext.unit.TestContext; +import io.vertx.grpc.common.GrpcMessage; +import io.vertx.grpc.common.GrpcTestBase; +import io.vertx.grpc.common.impl.GrpcMessageImpl; +import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServiceBridge; +import io.vertx.grpcweb.GrpcWebTesting.*; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.IntStream; + +import static io.vertx.core.http.HttpHeaders.CONTENT_LENGTH; +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import static java.util.stream.Collectors.joining; +import static org.junit.Assert.*; + +/** + * Base class for gRPC-Web tests. + */ +public abstract class ServerTestBase extends GrpcTestBase { + + private static final String TEST_SERVICE = "/io.vertx.grpcweb.TestService"; + + private static final CharSequence HEADER_TEXT_KEY = HttpHeaders.createOptimized("x-header-text-key"); + private static final CharSequence HEADER_TEXT_VALUE = HttpHeaders.createOptimized("header_text_value"); + private static final CharSequence HEADER_BIN_KEY = HttpHeaders.createOptimized("x-header-bin-key-bin"); + private static final CharSequence HEADER_BIN_VALUE = HttpHeaders.createOptimized(String.valueOf(0xabcdef)); + private static final CharSequence TRAILER_TEXT_KEY = HttpHeaders.createOptimized("x-trailer-text-key"); + private static final CharSequence TRAILER_TEXT_VALUE = HttpHeaders.createOptimized("trailer_text_value"); + private static final CharSequence TRAILER_BIN_KEY = HttpHeaders.createOptimized("x-trailer-bin-key-bin"); + private static final CharSequence TRAILER_BIN_VALUE = HttpHeaders.createOptimized(String.valueOf(0xfedcba)); + private static final CharSequence TRAILER_ERROR_KEY = HttpHeaders.createOptimized("x-error-trailer"); + + private static final MultiMap METADATA = MultiMap.caseInsensitiveMultiMap() + .add(HEADER_TEXT_KEY, HEADER_TEXT_VALUE) + .add(HEADER_BIN_KEY, HEADER_BIN_VALUE) + .add(TRAILER_TEXT_KEY, TRAILER_TEXT_VALUE) + .add(TRAILER_BIN_KEY, TRAILER_BIN_VALUE); + + protected static final CharSequence USER_AGENT = HttpHeaders.createOptimized("X-User-Agent"); + protected static final CharSequence GRPC_WEB_JAVASCRIPT_0_1 = HttpHeaders.createOptimized("grpc-web-javascript/0.1"); + protected static final CharSequence GRPC_WEB = HttpHeaders.createOptimized("X-Grpc-Web"); + protected static final CharSequence TRUE = HttpHeaders.createOptimized("1"); + + private static final String GRPC_STATUS = "grpc-status"; + private static final String STATUS_OK = GRPC_STATUS + ":" + 0 + "\r\n"; + private static final String TRAILERS_AND_STATUS = TRAILER_TEXT_KEY + ":" + TRAILER_TEXT_VALUE + "\r\n" + + TRAILER_BIN_KEY + ":" + TRAILER_BIN_VALUE + "\r\n" + + STATUS_OK; + + private static final Empty EMPTY_DEFAULT_INSTANCE = Empty.getDefaultInstance(); + + private static final int PREFIX_SIZE = 5; + + private HttpClient httpClient; + private HttpServer httpServer; + + @Override + public void setUp(TestContext should) { + super.setUp(should); + HttpClientOptions clientOptions = new HttpClientOptions().setDefaultPort(port); + httpClient = vertx.createHttpClient(clientOptions); + HttpServerOptions serverOptions = new HttpServerOptions().setPort(port); + GrpcServer grpcServer = GrpcServer.server(vertx); + ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(new TestServiceImpl(), new Interceptor()); + GrpcServiceBridge.bridge(serviceDefinition).bind(grpcServer); + httpServer = vertx.createHttpServer(serverOptions).requestHandler(grpcServer); + httpServer.listen().onComplete(should.asyncAssertSuccess()); + } + + @Override + public void tearDown(TestContext should) { + httpServer.close().onComplete(should.asyncAssertSuccess()); + httpClient.close().onComplete(should.asyncAssertSuccess()); + super.tearDown(should); + } + + protected abstract MultiMap requestHeaders(); + + protected abstract CharSequence responseContentType(); + + protected abstract Buffer decodeBody(Buffer buffer); + + @Test + public void testEmpty(TestContext should) { + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/EmptyCall").compose(req -> { + req.headers().addAll(requestHeaders()); + return req.send(encode(EMPTY_DEFAULT_INSTANCE)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + Buffer prefix = body.getBuffer(pos, PREFIX_SIZE); + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + assertEquals(0, prefix.getInt(1)); + pos += PREFIX_SIZE; + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + int len = trailer.getInt(1); + assertEquals(STATUS_OK, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testEmptyWithMetadata(TestContext should) { + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/EmptyCall").compose(req -> { + req.headers() + .addAll(METADATA) + .addAll(requestHeaders()); + return req.send(encode(EMPTY_DEFAULT_INSTANCE)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + Buffer prefix = body.getBuffer(pos, PREFIX_SIZE); + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + assertEquals(0, prefix.getInt(1)); + pos += PREFIX_SIZE; + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + int len = trailer.getInt(1); + assertEquals(TRAILERS_AND_STATUS, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testSmallPayload(TestContext should) { + String payload = "foobar"; + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/UnaryCall").compose(req -> { + req.headers().addAll(requestHeaders()); + EchoRequest echoRequest = EchoRequest.newBuilder().setPayload(payload).build(); + return req.send(encode(echoRequest)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + Buffer prefix = body.getBuffer(pos, PREFIX_SIZE); + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + int len = prefix.getInt(1); + pos += PREFIX_SIZE; + + EchoResponse echoResponse = parseEchoResponse(body.getBuffer(pos, pos + len)); + assertEquals(payload, echoResponse.getPayload()); + pos += len; + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + len = trailer.getInt(1); + assertEquals(STATUS_OK, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testLargePayloadWithMetadata(TestContext should) { + String payload = IntStream.range(0, 16 * 1024).mapToObj(i -> "foobar").collect(joining()); + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/UnaryCall").compose(req -> { + req.headers() + .addAll(METADATA) + .addAll(requestHeaders()); + EchoRequest echoRequest = EchoRequest.newBuilder().setPayload(payload).build(); + return req.send(encode(echoRequest)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + Buffer prefix = body.getBuffer(pos, PREFIX_SIZE); + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + int len = prefix.getInt(1); + pos += PREFIX_SIZE; + + EchoResponse echoResponse = parseEchoResponse(body.getBuffer(pos, pos + len)); + assertEquals(payload, echoResponse.getPayload()); + pos += len; + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + len = trailer.getInt(1); + assertEquals(TRAILERS_AND_STATUS, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testServerSideStreaming(TestContext should) { + List requestedSizes = Arrays.asList(157, 52, 16 * 1024, 1); + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/StreamingCall").compose(req -> { + req.headers().addAll(requestHeaders()); + StreamingRequest streamingRequest = StreamingRequest.newBuilder().addAllResponseSize(requestedSizes).build(); + return req.send(encode(streamingRequest)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + int len; + for (int requestedSize : requestedSizes) { + Buffer prefix = body.getBuffer(pos, pos + PREFIX_SIZE); + pos += PREFIX_SIZE; + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + len = prefix.getInt(1); + StreamingResponse streamingResponse = parseStreamingResponse(body.getBuffer(pos, pos + len)); + char[] expected = new char[requestedSize]; + Arrays.fill(expected, 'a'); + assertArrayEquals(expected, streamingResponse.getPayload().toCharArray()); + pos += len; + } + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + len = trailer.getInt(1); + assertEquals(STATUS_OK, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testServerSideStreamingWithMetadata(TestContext should) { + List requestedSizes = Arrays.asList(157, 52, 16 * 1024, 1); + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/StreamingCall").compose(req -> { + req.headers() + .addAll(METADATA) + .addAll(requestHeaders()); + StreamingRequest streamingRequest = StreamingRequest.newBuilder().addAllResponseSize(requestedSizes).build(); + return req.send(encode(streamingRequest)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + + Buffer body = decodeBody(response.body().result()); + int pos = 0; + + int len; + for (int requestedSize : requestedSizes) { + Buffer prefix = body.getBuffer(pos, pos + PREFIX_SIZE); + pos += PREFIX_SIZE; + assertEquals(0x00, prefix.getUnsignedByte(0)); // Uncompressed message + len = prefix.getInt(1); + StreamingResponse streamingResponse = parseStreamingResponse(body.getBuffer(pos, pos + len)); + char[] expected = new char[requestedSize]; + Arrays.fill(expected, 'a'); + assertArrayEquals(expected, streamingResponse.getPayload().toCharArray()); + pos += len; + } + + Buffer trailer = body.getBuffer(pos, body.length()); + assertEquals(0x80, trailer.getUnsignedByte(0)); // Uncompressed trailer + len = trailer.getInt(1); + assertEquals(TRAILERS_AND_STATUS, trailer.getBuffer(PREFIX_SIZE, PREFIX_SIZE + len).toString()); + + }); + })); + } + + @Test + public void testTrailersOnly(TestContext should) { + httpClient.request(HttpMethod.POST, TEST_SERVICE + "/UnaryCall").compose(req -> { + req.headers() + .addAll(METADATA) + .addAll(requestHeaders()); + EchoRequest echoRequest = EchoRequest.newBuilder().setPayload("boom").build(); + return req.send(encode(echoRequest)).compose(response -> response.body().map(response)); + }).onComplete(should.asyncAssertSuccess(response -> { + should.verify(v -> { + assertEquals(200, response.statusCode()); + MultiMap headers = response.headers(); + assertTrue(headers.contains(CONTENT_TYPE, responseContentType(), true)); + assertTrue(headers.contains(CONTENT_LENGTH, "0", true)); + assertTrue(headers.contains(TRAILER_TEXT_KEY, TRAILER_TEXT_VALUE, false)); + assertTrue(headers.contains(TRAILER_BIN_KEY, TRAILER_BIN_VALUE, false)); + assertTrue(headers.contains(TRAILER_ERROR_KEY, "boom", false)); + assertTrue(headers.contains(GRPC_STATUS, "13", false)); + }); + })); + } + + protected Buffer encode(Message message) { + return GrpcMessageImpl.encode(GrpcMessage.message("identity", Buffer.buffer(message.toByteArray()))); + } + + private static EchoResponse parseEchoResponse(Buffer buffer) { + try { + return EchoResponse.parseFrom(buffer.getBytes()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + private static StreamingResponse parseStreamingResponse(Buffer buffer) { + try { + return StreamingResponse.parseFrom(buffer.getBytes()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TestServiceImpl.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TestServiceImpl.java new file mode 100644 index 00000000..19137ec1 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TestServiceImpl.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web; + +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.stub.StreamObserver; +import io.vertx.grpcweb.GrpcWebTesting.*; +import io.vertx.grpcweb.TestServiceGrpc; + +import java.util.Arrays; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; + +class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { + + private static final Key TRAILER_ERROR_KEY = Key.of("x-error-trailer", ASCII_STRING_MARSHALLER); + + @Override + public void emptyCall(Empty request, StreamObserver responseObserver) { + responseObserver.onNext(Empty.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public void unaryCall(EchoRequest request, StreamObserver responseObserver) { + String payload = request.getPayload(); + if ("boom".equals(payload)) { + Metadata metadata = new Metadata(); + metadata.put(TRAILER_ERROR_KEY, "boom"); + responseObserver.onError(new StatusException(Status.INTERNAL, metadata)); + } else { + EchoResponse response = EchoResponse.newBuilder() + .setPayload(payload) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + } + + @Override + public void streamingCall(StreamingRequest request, StreamObserver responseObserver) { + for (int requestedSize : request.getResponseSizeList()) { + char[] value = new char[requestedSize]; + Arrays.fill(value, 'a'); + StreamingResponse response = StreamingResponse.newBuilder().setPayload(new String(value)).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java new file mode 100644 index 00000000..26957f50 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web; + +import com.google.protobuf.Message; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; + +import java.util.Base64; + +import static io.vertx.core.http.HttpHeaders.ACCEPT; +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import static org.junit.Assert.assertEquals; + +/** + * Tests for gRPC-Web server using the text (base64) protocol. + */ +public class TextServerTest extends ServerTestBase { + + private static final CharSequence GRPC_WEB_TEXT = HttpHeaders.createOptimized("application/grpc-web-text"); + private static final CharSequence GRPC_WEB_TEXT_PROTO = HttpHeaders.createOptimized(GRPC_WEB_TEXT + "+proto"); + private static final Base64.Encoder ENCODER = Base64.getEncoder(); + private static final Base64.Decoder DECODER = Base64.getDecoder(); + + @Override + protected MultiMap requestHeaders() { + return MultiMap.caseInsensitiveMultiMap() + .add(ACCEPT, GRPC_WEB_TEXT) + .add(CONTENT_TYPE, GRPC_WEB_TEXT) + .add(USER_AGENT, GRPC_WEB_JAVASCRIPT_0_1) + .add(GRPC_WEB, TRUE); + } + + @Override + protected CharSequence responseContentType() { + return GRPC_WEB_TEXT_PROTO; + } + + @Override + protected Buffer encode(Message message) { + Buffer buffer = super.encode(message); + // The whole message must be encoded at once when sending + return Buffer.buffer(ENCODER.encode(buffer.getBytes())); + } + + @Override + protected Buffer decodeBody(Buffer buffer) { + // The server sends base64 encoded chunks of arbitrary size + // All we know is that a 4-bytes block is always a valid base64 payload + assertEquals(0, buffer.length() % 4); + Buffer res = Buffer.buffer(); + for (int i = 0; i < buffer.length(); i += 4) { + byte[] block = buffer.getBytes(i, i + 4); + res.appendBytes(DECODER.decode(block)); + } + return res; + } +} diff --git a/vertx-grpc-server/src/test/proto/grpc-web-testing.proto b/vertx-grpc-server/src/test/proto/grpc-web-testing.proto new file mode 100644 index 00000000..fa01dc17 --- /dev/null +++ b/vertx-grpc-server/src/test/proto/grpc-web-testing.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package io.vertx.grpcweb; + +service TestService { + rpc EmptyCall(Empty) returns (Empty); + rpc UnaryCall(EchoRequest) returns (EchoResponse); + rpc StreamingCall(StreamingRequest) returns (stream StreamingResponse); +} + +service UnimplementedService { + rpc UnimplementedCall(Empty) returns (Empty); // Do not implement +} + +message Empty {} + +message EchoRequest { + string payload = 1; +} + +message EchoResponse { + string payload = 1; +} + +message StreamingRequest { + repeated int32 response_size = 1; +} + +message StreamingResponse { + string payload = 1; +} From 87ebe5054afd47331c97c92f44abe06ddf55e027 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Wed, 21 Feb 2024 16:21:28 +0100 Subject: [PATCH 2/7] Implement gRPC Web protocol Signed-off-by: Thomas Segismont --- .../io/vertx/grpc/common/GrpcMediaType.java | 74 ++++++++++++++++ .../grpc/common/impl/GrpcMessageImpl.java | 11 ++- .../server/GrpcServerOptionsConverter.java | 39 ++++++++ .../java/io/vertx/grpc/server/GrpcServer.java | 22 +++-- .../vertx/grpc/server/GrpcServerOptions.java | 88 +++++++++++++++++++ .../grpc/server/impl/GrpcServerImpl.java | 41 ++++++++- .../server/impl/GrpcServerRequestImpl.java | 69 +++++++++++++-- .../server/impl/GrpcServerResponseImpl.java | 75 ++++++++++++---- .../grpc/server/web/BinaryServerTest.java | 5 +- .../vertx/grpc/server/web/ServerTestBase.java | 25 ++++-- .../vertx/grpc/server/web/TextServerTest.java | 6 +- 11 files changed, 404 insertions(+), 51 deletions(-) create mode 100644 vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java create mode 100644 vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java create mode 100644 vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java new file mode 100644 index 00000000..a3f1bee3 --- /dev/null +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.common; + +import io.netty.util.AsciiString; +import io.vertx.codegen.annotations.Unstable; +import io.vertx.core.http.HttpHeaders; + +/** + * The gRPC media types. + */ +@Unstable +public class GrpcMediaType { + + /** + * gRPC. + */ + public static final CharSequence GRPC = HttpHeaders.createOptimized("application/grpc"); + /** + * gRPC with Protobuf message format. + */ + public static final CharSequence GRPC_PROTO = HttpHeaders.createOptimized("application/grpc+proto"); + + /** + * gRPC Web binary. + */ + public static final CharSequence GRPC_WEB = HttpHeaders.createOptimized("application/grpc-web"); + /** + * gRPC Web binary with Protobuf message format. + */ + public static final CharSequence GRPC_WEB_PROTO = HttpHeaders.createOptimized("application/grpc-web+proto"); + + /** + * Whether the provided {@code mediaType} represents gRPC-Web + * + * @param mediaType the value to test + * @return {@code true} if the value represents gRPC-Web, {@code false} otherwise + */ + public static boolean isGrpcWeb(CharSequence mediaType) { + return AsciiString.regionMatches(GRPC_WEB, true, 0, mediaType, 0, GRPC_WEB.length()); + } + + /** + * gRPC Web text (base64). + */ + public static final CharSequence GRPC_WEB_TEXT = HttpHeaders.createOptimized("application/grpc-web-text"); + /** + * gRPC Web text (base64) with Protobuf message format. + */ + public static final CharSequence GRPC_WEB_TEXT_PROTO = HttpHeaders.createOptimized("application/grpc-web-text+proto"); + + /** + * Whether the provided {@code mediaType} represents gRPC-Web + * + * @param mediaType the value to test + * @return {@code true} if the value represents gRPC-Web, {@code false} otherwise + */ + public static boolean isGrpcWebText(CharSequence mediaType) { + return AsciiString.regionMatches(GRPC_WEB_TEXT, true, 0, mediaType, 0, GRPC_WEB_TEXT.length()); + } + + private GrpcMediaType() { + // Constants + } +} diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java index 80ffeb55..530060d4 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java @@ -21,10 +21,18 @@ public class GrpcMessageImpl implements GrpcMessage { private final String encoding; private final Buffer payload; + private final boolean trailer; public GrpcMessageImpl(String encoding, Buffer payload) { this.encoding = encoding; this.payload = payload; + trailer = false; + } + + public GrpcMessageImpl(String encoding, Buffer payload, boolean trailer) { + this.encoding = encoding; + this.payload = payload; + this.trailer = trailer; } @Override @@ -41,8 +49,9 @@ public static Buffer encode(GrpcMessage message) { ByteBuf bbuf = ((BufferInternal)message.payload()).getByteBuf(); int len = bbuf.readableBytes(); boolean compressed = !message.encoding().equals("identity"); + boolean trailer = message instanceof GrpcMessageImpl && ((GrpcMessageImpl) message).trailer; ByteBuf prefix = Unpooled.buffer(5, 5); - prefix.writeByte(compressed ? 1 : 0); // Compression flag + prefix.writeByte((trailer ? 0x80 : 0x00) | (compressed ? 0x01 : 0x00)); prefix.writeInt(len); // Length CompositeByteBuf composite = Unpooled.compositeBuffer(); composite.addComponent(true, prefix); diff --git a/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java b/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java new file mode 100644 index 00000000..8db0c974 --- /dev/null +++ b/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java @@ -0,0 +1,39 @@ +package io.vertx.grpc.server; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.impl.JsonUtil; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Base64; + +/** + * Converter and mapper for {@link io.vertx.grpc.server.GrpcServerOptions}. + * NOTE: This class has been automatically generated from the {@link io.vertx.grpc.server.GrpcServerOptions} original class using Vert.x codegen. + */ +public class GrpcServerOptionsConverter { + + + private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER; + private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER; + + static void fromJson(Iterable> json, GrpcServerOptions obj) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "grpcWebEnabled": + if (member.getValue() instanceof Boolean) { + obj.setGrpcWebEnabled((Boolean)member.getValue()); + } + break; + } + } + } + + static void toJson(GrpcServerOptions obj, JsonObject json) { + toJson(obj, json.getMap()); + } + + static void toJson(GrpcServerOptions obj, java.util.Map json) { + json.put("grpcWebEnabled", obj.isGrpcWebEnabled()); + } +} diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java index 1cdbfe4c..2e456898 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -26,10 +26,10 @@ *

The server can be used as a {@link io.vertx.core.http.HttpServer} handler or mounted as a Vert.x Web handler. * *

Unlike traditional gRPC servers, this server does not rely on a generated RPC interface to interact with the service. - * + *

* Instead, you can interact with the service with a request/response interfaces and gRPC messages, very much like * a traditional client. - * + *

* The server exposes 2 levels of handlers * *

    @@ -42,12 +42,22 @@ public interface GrpcServer extends Handler { /** - * Create a blank gRPC server + * Create a blank gRPC server with default options. * * @return the created server */ static GrpcServer server(Vertx vertx) { - return new GrpcServerImpl(vertx); + return server(vertx, new GrpcServerOptions()); + } + + /** + * Create a blank gRPC server with specified options. + * + * @param options the gRPC server options + * @return the created server + */ + static GrpcServer server(Vertx vertx, GrpcServerOptions options) { + return new GrpcServerImpl(vertx, options); } /** @@ -60,7 +70,7 @@ static GrpcServer server(Vertx vertx) { GrpcServer callHandler(Handler> handler); /** - * Set a service method call handler that handles any call call made to the server for the {@link MethodDescriptor} service method. + * Set a service method call handler that handles any call made to the server for the {@link MethodDescriptor} service method. * * @param handler the service method call handler * @return a reference to this, so the API can be used fluently diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java new file mode 100644 index 00000000..2a45776a --- /dev/null +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.grpc.server; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.annotations.Unstable; +import io.vertx.codegen.json.annotations.JsonGen; +import io.vertx.core.json.JsonObject; + +/** + * Configuration for a {@link GrpcServer}. + */ +@DataObject +@JsonGen(publicConverter = false) +@Unstable +public class GrpcServerOptions { + + /** + * Whether the gRPC-Web protocol should be enabled, by default = true. + */ + public static final boolean DEFAULT_GRPC_WEB_ENABLED = true; + + private boolean grpcWebEnabled; + + /** + * Default options. + */ + public GrpcServerOptions() { + grpcWebEnabled = DEFAULT_GRPC_WEB_ENABLED; + } + + /** + * Copy constructor. + */ + public GrpcServerOptions(GrpcServerOptions other) { + grpcWebEnabled = other.grpcWebEnabled; + } + + /** + * Creates options from JSON. + */ + public GrpcServerOptions(JsonObject json) { + this(); + GrpcServerOptionsConverter.fromJson(json, this); + } + + /** + * @return {@code true} if the gRPC-Web protocol should be enabled, {@code false} otherwise + */ + public boolean isGrpcWebEnabled() { + return grpcWebEnabled; + } + + /** + * Whether the gRPC-Web protocol should be enabled. Defaults to {@code true}. + * + * @param grpcWebEnabled {@code true} if the gRPC-Web protocol should be enabled, {@code false} otherwise + * @return a reference to this, so the API can be used fluently + */ + public GrpcServerOptions setGrpcWebEnabled(boolean grpcWebEnabled) { + this.grpcWebEnabled = grpcWebEnabled; + return this; + } + + /** + * @return a JSON representation of options + */ + public JsonObject toJson() { + JsonObject json = new JsonObject(); + GrpcServerOptionsConverter.toJson(this, json); + return json; + } + + @Override + public String toString() { + return "GrpcServerOptions{" + + "grpcWebEnabled=" + grpcWebEnabled + + '}'; + } +} diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java index f664b3ea..d0dedf53 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -15,30 +15,43 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.grpc.common.GrpcMediaType; import io.vertx.grpc.common.GrpcMessageDecoder; import io.vertx.grpc.common.GrpcMessageEncoder; import io.vertx.grpc.common.impl.GrpcMethodCall; import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServerOptions; import io.vertx.grpc.server.GrpcServerRequest; import java.util.HashMap; import java.util.Map; +import java.util.Objects; + +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; /** * @author Julien Viet */ public class GrpcServerImpl implements GrpcServer { - private final Vertx vertx; + private static final Logger log = LoggerFactory.getLogger(GrpcServer.class); + + private final GrpcServerOptions options; private Handler> requestHandler; private Map> methodCallHandlers = new HashMap<>(); - public GrpcServerImpl(Vertx vertx) { - this.vertx = vertx; + public GrpcServerImpl(Vertx vertx, GrpcServerOptions options) { + this.options = Objects.requireNonNull(options, "options is null"); } @Override public void handle(HttpServerRequest httpRequest) { + if (refuseRequest(httpRequest)) { + return; + } GrpcMethodCall methodCall = new GrpcMethodCall(httpRequest.path()); String fmn = methodCall.fullMethodName(); MethodCallHandler method = methodCallHandlers.get(fmn); @@ -56,6 +69,26 @@ public void handle(HttpServerRequest httpRequest) { } } + private boolean refuseRequest(HttpServerRequest request) { + if (request.version() != HttpVersion.HTTP_2) { + if (!options.isGrpcWebEnabled()) { + log.trace("gRPC-Web is not enabled, sending error 505"); + request.response().setStatusCode(505).end(); + return true; + } + if (!GrpcMediaType.isGrpcWeb(request.headers().get(CONTENT_TYPE))) { + log.trace("gRPC-Web is the only media type supported on HTTP/1.1, sending error 415"); + request.response().setStatusCode(415).end(); + return true; + } + } else if (GrpcMediaType.isGrpcWeb(request.headers().get(CONTENT_TYPE))) { + log.trace("gRPC-Web is not supported on HTTP/2, sending error 415"); + request.response().setStatusCode(415).end(); + return true; + } + return false; + } + private void handle(MethodCallHandler method, HttpServerRequest httpRequest, GrpcMethodCall methodCall) { GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(httpRequest, method.messageDecoder, method.messageEncoder, methodCall); grpcRequest.init(); diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java index 4c2e5720..3e743d06 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -10,35 +10,48 @@ */ package io.vertx.grpc.server.impl; -import io.vertx.core.Future; +import io.netty.buffer.Unpooled; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferInternal; import io.vertx.core.http.HttpConnection; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpVersion; import io.vertx.core.http.impl.HttpServerRequestInternal; -import io.vertx.grpc.common.CodecException; -import io.vertx.grpc.common.GrpcMessageDecoder; -import io.vertx.grpc.common.GrpcMessageEncoder; -import io.vertx.grpc.common.ServiceName; -import io.vertx.grpc.common.impl.GrpcReadStreamBase; +import io.vertx.grpc.common.*; import io.vertx.grpc.common.impl.GrpcMethodCall; +import io.vertx.grpc.common.impl.GrpcReadStreamBase; import io.vertx.grpc.server.GrpcServerRequest; import io.vertx.grpc.server.GrpcServerResponse; +import java.util.Base64; + +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; + /** * @author Julien Viet */ public class GrpcServerRequestImpl extends GrpcReadStreamBase, Req> implements GrpcServerRequest { + private static final Base64.Decoder DECODER = Base64.getDecoder(); + private static final Buffer EMPTY_BUFFER = BufferInternal.buffer(Unpooled.EMPTY_BUFFER); + final HttpServerRequest httpRequest; final GrpcServerResponse response; - private GrpcMethodCall methodCall; + private final GrpcMethodCall methodCall; + private Buffer grpcWebTextBuffer; public GrpcServerRequestImpl(HttpServerRequest httpRequest, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) { super(((HttpServerRequestInternal) httpRequest).context(), httpRequest, httpRequest.headers().get("grpc-encoding"), messageDecoder); this.httpRequest = httpRequest; this.response = new GrpcServerResponseImpl<>(this, httpRequest.response(), messageEncoder); this.methodCall = methodCall; + if (httpRequest.version() != HttpVersion.HTTP_2 && GrpcMediaType.isGrpcWebText(httpRequest.getHeader(CONTENT_TYPE))) { + grpcWebTextBuffer = EMPTY_BUFFER; + } else { + grpcWebTextBuffer = null; + } } public String fullMethodName() { @@ -91,4 +104,44 @@ public GrpcServerResponse response() { public HttpConnection connection() { return httpRequest.connection(); } + + @Override + public void handle(Buffer chunk) { + if (notGrpcWebText()) { + super.handle(chunk); + return; + } + if (grpcWebTextBuffer == EMPTY_BUFFER) { + if ((chunk.length() & 0b11) == 0) { + // Content length is divisible by four, so we decode it immediately + super.handle(Buffer.buffer(DECODER.decode(chunk.getBytes()))); + } else { + grpcWebTextBuffer = chunk.copy(); + } + return; + } + bufferAndDecode(chunk); + } + + private boolean notGrpcWebText() { + return grpcWebTextBuffer == null; + } + + private void bufferAndDecode(Buffer chunk) { + grpcWebTextBuffer.appendBuffer(chunk); + int len = grpcWebTextBuffer.length(); + // Decode base64 content as soon as we have more bytes than a multiple of four. + // We could instead wait for the buffer length to be a multiple of four, + // But then in the worst case we may have to buffer the whole request. + int maxDecodable = len & ~0b11; + if (maxDecodable == len) { + Buffer decoded = Buffer.buffer(DECODER.decode(grpcWebTextBuffer.getBytes())); + grpcWebTextBuffer = EMPTY_BUFFER; + super.handle(decoded); + } else if (maxDecodable > 0) { + Buffer decoded = Buffer.buffer(DECODER.decode(grpcWebTextBuffer.getBytes(0, maxDecodable))); + grpcWebTextBuffer = grpcWebTextBuffer.getBuffer(maxDecodable, len); + super.handle(decoded); + } + } } diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java index dfb5ff00..3c503b20 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License 2.0 which is available at @@ -10,26 +10,24 @@ */ package io.vertx.grpc.server.impl; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpServerResponse; -import io.vertx.grpc.common.CodecException; -import io.vertx.grpc.common.GrpcError; -import io.vertx.grpc.common.GrpcMessage; -import io.vertx.grpc.common.GrpcStatus; -import io.vertx.grpc.common.GrpcMessageDecoder; -import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.core.http.HttpVersion; +import io.vertx.grpc.common.*; import io.vertx.grpc.common.impl.GrpcMessageImpl; import io.vertx.grpc.common.impl.Utils; -import io.vertx.grpc.server.GrpcServerRequest; import io.vertx.grpc.server.GrpcServerResponse; +import java.util.Base64; import java.util.Map; import java.util.Objects; +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; + /** * @author Julien Viet */ @@ -38,6 +36,7 @@ public class GrpcServerResponseImpl implements GrpcServerResponse request; private final HttpServerResponse httpResponse; private final GrpcMessageEncoder encoder; + private final CharSequence contentType; private String encoding; private GrpcStatus status = GrpcStatus.OK; private String statusMessage; @@ -50,6 +49,16 @@ public GrpcServerResponseImpl(GrpcServerRequestImpl request, HttpServ this.request = request; this.httpResponse = httpResponse; this.encoder = encoder; + if (request.httpRequest.version() != HttpVersion.HTTP_2) { + String requestMediaType = request.headers().get(CONTENT_TYPE); + if (GrpcMediaType.isGrpcWebText(requestMediaType)) { + contentType = GrpcMediaType.GRPC_WEB_TEXT_PROTO; + } else { + contentType = GrpcMediaType.GRPC_WEB_PROTO; + } + } else { + contentType = GrpcMediaType.GRPC_PROTO; + } } public GrpcServerResponse status(GrpcStatus status) { @@ -195,15 +204,20 @@ private Future writeMessage(GrpcMessage message, boolean end) { MultiMap responseHeaders = httpResponse.headers(); if (!headersSent) { + if (isGrpcWeb() && !trailersOnly) { + httpResponse.setChunked(true); + } headersSent = true; - if (headers != null && headers.size() > 0) { + if (headers != null && !headers.isEmpty()) { for (Map.Entry header : headers) { responseHeaders.add(header.getKey(), header.getValue()); } } - responseHeaders.set("content-type", "application/grpc"); - responseHeaders.set("grpc-encoding", encoding); - responseHeaders.set("grpc-accept-encoding", "gzip"); + responseHeaders.set("content-type", contentType); + if (!isGrpcWeb()) { + responseHeaders.set("grpc-encoding", encoding); + responseHeaders.set("grpc-accept-encoding", "gzip"); + } } if (end) { @@ -213,11 +227,13 @@ private Future writeMessage(GrpcMessage message, boolean end) { MultiMap responseTrailers; if (trailersOnly) { responseTrailers = httpResponse.headers(); - } else { + } else if (!isGrpcWeb()) { responseTrailers = httpResponse.trailers(); + } else { + responseTrailers = HttpHeaders.headers(); } - if (trailers != null && trailers.size() > 0) { + if (trailers != null && !trailers.isEmpty()) { for (Map.Entry trailer : trailers) { responseTrailers.add(trailer.getKey(), trailer.getValue()); } @@ -234,12 +250,33 @@ private Future writeMessage(GrpcMessage message, boolean end) { responseTrailers.remove("grpc-message"); } if (message != null) { - return httpResponse.end(GrpcMessageImpl.encode(message)); - } else { - return httpResponse.end(); + httpResponse.write(encodeMessage(message)); + } + if (isGrpcWeb() && !trailersOnly) { + Buffer buffer = Buffer.buffer(); + for (Map.Entry trailer : responseTrailers) { + buffer.appendString(trailer.getKey()) + .appendString(":") + .appendString(trailer.getValue()) + .appendString("\r\n"); + } + httpResponse.write(encodeMessage(new GrpcMessageImpl("identity", buffer, true))); } + return httpResponse.end(); } else { - return httpResponse.write(GrpcMessageImpl.encode(message)); + return httpResponse.write(encodeMessage(message)); + } + } + + private Buffer encodeMessage(GrpcMessage message) { + Buffer buffer = GrpcMessageImpl.encode(message); + if (contentType == GrpcMediaType.GRPC_WEB_TEXT_PROTO) { + return Buffer.buffer(Base64.getEncoder().encode(buffer.getBytes())); } + return buffer; + } + + private boolean isGrpcWeb() { + return contentType != GrpcMediaType.GRPC_PROTO; } } diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java index dd26c59d..1c4a2025 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/BinaryServerTest.java @@ -16,17 +16,16 @@ import io.vertx.core.http.HttpHeaders; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import static io.vertx.grpc.common.GrpcMediaType.GRPC_WEB_PROTO; /** * Tests for gRPC-Web server using the binary protocol. */ public class BinaryServerTest extends ServerTestBase { - private static final CharSequence GRPC_WEB_PROTO = HttpHeaders.createOptimized("application/grpc-web+proto"); - @Override protected MultiMap requestHeaders() { - return MultiMap.caseInsensitiveMultiMap() + return HttpHeaders.headers() .add(CONTENT_TYPE, GRPC_WEB_PROTO) .add(USER_AGENT, GRPC_WEB_JAVASCRIPT_0_1) .add(GRPC_WEB, TRUE); diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java index b0372888..a9bc67c2 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/ServerTestBase.java @@ -23,12 +23,15 @@ import io.vertx.grpc.common.GrpcTestBase; import io.vertx.grpc.common.impl.GrpcMessageImpl; import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServerOptions; import io.vertx.grpc.server.GrpcServiceBridge; import io.vertx.grpcweb.GrpcWebTesting.*; import org.junit.Test; import java.util.Arrays; import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.IntStream; import static io.vertx.core.http.HttpHeaders.CONTENT_LENGTH; @@ -53,7 +56,7 @@ public abstract class ServerTestBase extends GrpcTestBase { private static final CharSequence TRAILER_BIN_VALUE = HttpHeaders.createOptimized(String.valueOf(0xfedcba)); private static final CharSequence TRAILER_ERROR_KEY = HttpHeaders.createOptimized("x-error-trailer"); - private static final MultiMap METADATA = MultiMap.caseInsensitiveMultiMap() + private static final MultiMap METADATA = HttpHeaders.headers() .add(HEADER_TEXT_KEY, HEADER_TEXT_VALUE) .add(HEADER_BIN_KEY, HEADER_BIN_VALUE) .add(TRAILER_TEXT_KEY, TRAILER_TEXT_VALUE) @@ -80,13 +83,11 @@ public abstract class ServerTestBase extends GrpcTestBase { @Override public void setUp(TestContext should) { super.setUp(should); - HttpClientOptions clientOptions = new HttpClientOptions().setDefaultPort(port); - httpClient = vertx.createHttpClient(clientOptions); - HttpServerOptions serverOptions = new HttpServerOptions().setPort(port); - GrpcServer grpcServer = GrpcServer.server(vertx); + httpClient = vertx.createHttpClient(new HttpClientOptions().setDefaultPort(port)); + GrpcServer grpcServer = GrpcServer.server(vertx, new GrpcServerOptions().setGrpcWebEnabled(true)); ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(new TestServiceImpl(), new Interceptor()); GrpcServiceBridge.bridge(serviceDefinition).bind(grpcServer); - httpServer = vertx.createHttpServer(serverOptions).requestHandler(grpcServer); + httpServer = vertx.createHttpServer(new HttpServerOptions().setPort(port)).requestHandler(grpcServer); httpServer.listen().onComplete(should.asyncAssertSuccess()); } @@ -200,13 +201,23 @@ public void testSmallPayload(TestContext should) { @Test public void testLargePayloadWithMetadata(TestContext should) { + Random rnd = ThreadLocalRandom.current(); String payload = IntStream.range(0, 16 * 1024).mapToObj(i -> "foobar").collect(joining()); httpClient.request(HttpMethod.POST, TEST_SERVICE + "/UnaryCall").compose(req -> { + req.setChunked(true); req.headers() .addAll(METADATA) .addAll(requestHeaders()); EchoRequest echoRequest = EchoRequest.newBuilder().setPayload(payload).build(); - return req.send(encode(echoRequest)).compose(response -> response.body().map(response)); + Buffer buffer = encode(echoRequest); + // Make sure the server will get blocks of arbitrary size + int length = buffer.length(); + for (int pos = 0, written; pos < length; pos += written) { + written = Math.min(length - pos, 128 + rnd.nextInt(129)); + req.write(buffer.getBuffer(pos, pos + written)); + } + req.end(); + return req.response().compose(response -> response.body().map(response)); }).onComplete(should.asyncAssertSuccess(response -> { should.verify(v -> { diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java index 26957f50..7ebe4821 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/TextServerTest.java @@ -20,6 +20,8 @@ import static io.vertx.core.http.HttpHeaders.ACCEPT; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import static io.vertx.grpc.common.GrpcMediaType.GRPC_WEB_TEXT; +import static io.vertx.grpc.common.GrpcMediaType.GRPC_WEB_TEXT_PROTO; import static org.junit.Assert.assertEquals; /** @@ -27,14 +29,12 @@ */ public class TextServerTest extends ServerTestBase { - private static final CharSequence GRPC_WEB_TEXT = HttpHeaders.createOptimized("application/grpc-web-text"); - private static final CharSequence GRPC_WEB_TEXT_PROTO = HttpHeaders.createOptimized(GRPC_WEB_TEXT + "+proto"); private static final Base64.Encoder ENCODER = Base64.getEncoder(); private static final Base64.Decoder DECODER = Base64.getDecoder(); @Override protected MultiMap requestHeaders() { - return MultiMap.caseInsensitiveMultiMap() + return HttpHeaders.headers() .add(ACCEPT, GRPC_WEB_TEXT) .add(CONTENT_TYPE, GRPC_WEB_TEXT) .add(USER_AGENT, GRPC_WEB_JAVASCRIPT_0_1) From 685ee0aba503a653d62e308025158ad7ce61f8ca Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 22 Feb 2024 17:00:06 +0100 Subject: [PATCH 3/7] Add documentation Including about CORS setup in Vert.x Web. Signed-off-by: Thomas Segismont --- .../src/main/asciidoc/server.adoc | 27 +++++++++++++++++++ .../vertx/grpc/server/GrpcServerOptions.java | 12 +++++++++ 2 files changed, 39 insertions(+) diff --git a/vertx-grpc-server/src/main/asciidoc/server.adoc b/vertx-grpc-server/src/main/asciidoc/server.adoc index 3968af77..2852b58a 100644 --- a/vertx-grpc-server/src/main/asciidoc/server.adoc +++ b/vertx-grpc-server/src/main/asciidoc/server.adoc @@ -49,6 +49,27 @@ router.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request() ---- ==== +==== gRPC-Web protocol + +The Vert.x gRPC Server supports the gRPC-Web protocol by default. + +To disable the gRPC-Web protocol support, configure options with {@link io.vertx.grpc.server.GrpcServerOptions#disableGrpcWeb GrpcServerOptions#disableGrpcWeb} and then create a server with {@link io.vertx.grpc.server.GrpcServer#server(io.vertx.core.Vertx, io.vertx.grpc.server.GrpcServerOptions) GrpcServer#server(vertx, options)}. + +[TIP] +==== +If your website server and the gRPC server are different, you have to configure the gRPC server for CORS. +This can be done with a Vert.x Web router and the CORS handler: + +[source,java] +---- +CorsHandler corsHandler = CorsHandler.create() + .addRelativeOrigin("https://www.mycompany.com") + .allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout")) + .exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message")); +router.route("/com.mycompany.MyService/*").handler(corsHandler); +---- +==== + ==== Request/response Each service method is processed by a handler @@ -86,6 +107,8 @@ A bidi request/response is simply the combination of a streaming request and a s {@link examples.GrpcServerExamples#bidi} ---- +NOTE: The gRPC-Web protocol does not support bidirectional streaming. + === Flow control Request and response are back pressured Vert.x streams. @@ -113,10 +136,14 @@ You can compress response messages by setting the response encoding *prior* befo {@link examples.GrpcServerExamples#responseCompression} ---- +NOTE: Compression is not supported over the gRPC-Web protocol. + === Decompression Decompression is done transparently by the server when the client send encoded requests. +NOTE: Decompression is not supported over the gRPC-Web protocol. + === Stub API The Vert.x gRPC Server can bridge a gRPC service to use with a generated server stub in a more traditional fashion diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java index 2a45776a..f52c0cf5 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java @@ -11,10 +11,13 @@ package io.vertx.grpc.server; import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Unstable; import io.vertx.codegen.json.annotations.JsonGen; import io.vertx.core.json.JsonObject; +import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE; + /** * Configuration for a {@link GrpcServer}. */ @@ -70,6 +73,15 @@ public GrpcServerOptions setGrpcWebEnabled(boolean grpcWebEnabled) { return this; } + /** + * Disable the gRPC-Web protocol support. + */ + @GenIgnore(PERMITTED_TYPE) + public GrpcServerOptions disableGrpcWeb() { + return setGrpcWebEnabled(false); + } + + /** * @return a JSON representation of options */ From 35b180f7659033c42c96cb3009a42e886c009196 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 8 Mar 2024 19:40:06 +0100 Subject: [PATCH 4/7] Run gRPC-Web interop tests as part of the build https://github.com/grpc/grpc-web/blob/a639b4cf2611de2b68883571787083b73cf61f5e/doc/interop-test-descriptions.md Create gRPC-Web prereqs image from a main branch checkout: the prereqs image on Docker Hub might not always be up-to-date (confirmed by gRPC-Web maintainer). Signed-off-by: Thomas Segismont --- .github/workflows/ci-5.x.yml | 19 +++ vertx-grpc-server/pom.xml | 35 +++++ .../grpc/server/web/interop/Interceptor.java | 47 +++++++ .../grpc/server/web/interop/InteropITest.java | 80 +++++++++++ .../server/web/interop/InteropServer.java | 48 +++++++ .../server/web/interop/TestServiceImpl.java | 87 ++++++++++++ vertx-grpc-server/src/test/proto/empty.proto | 41 ++---- .../src/test/proto/messages.proto | 131 +++++++++++------- vertx-grpc-server/src/test/proto/test.proto | 63 ++++----- 9 files changed, 439 insertions(+), 112 deletions(-) create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/Interceptor.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropITest.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropServer.java create mode 100644 vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/TestServiceImpl.java diff --git a/.github/workflows/ci-5.x.yml b/.github/workflows/ci-5.x.yml index b44e094c..6ed3a670 100644 --- a/.github/workflows/ci-5.x.yml +++ b/.github/workflows/ci-5.x.yml @@ -23,6 +23,25 @@ jobs: jdk: ${{ matrix.jdk }} os: ${{ matrix.os }} secrets: inherit + gRPC-Web-Interop: + name: Run gRPC-Web interop tests + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Checkout gRPC-Web + uses: actions/checkout@v2 + with: + repository: grpc/grpc-web + ref: master + path: _grpc-web + - name: Install JDK + uses: actions/setup-java@v2 + with: + java-version: 11 + distribution: temurin + - name: Run tests + run: mvn -s .github/maven-ci-settings.xml -q clean verify -B -pl :vertx-grpc-server -am -Dgrpc-web.repo.path="$GITHUB_WORKSPACE/_grpc-web" Deploy: if: ${{ github.repository_owner == 'eclipse-vertx' && (github.event_name == 'push' || github.event_name == 'schedule') }} needs: CI diff --git a/vertx-grpc-server/pom.xml b/vertx-grpc-server/pom.xml index 09c5835c..0e15efdf 100644 --- a/vertx-grpc-server/pom.xml +++ b/vertx-grpc-server/pom.xml @@ -94,6 +94,12 @@ + + org.testcontainers + testcontainers + 1.17.6 + test + @@ -140,6 +146,35 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${maven.surefire.plugin.version} + + + io/vertx/grpc/server/**/*ITest.java + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.2.5 + + + interop-tests + + integration-test + verify + + + + io/vertx/grpc/server/web/interop/InteropITest.java + + + + + diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/Interceptor.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/Interceptor.java new file mode 100644 index 00000000..dcda2c40 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/Interceptor.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web.interop; + +import io.grpc.*; +import io.grpc.Metadata.Key; + +import java.util.Collections; +import java.util.Set; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; +import static io.grpc.Metadata.BINARY_BYTE_MARSHALLER; + +class Interceptor implements ServerInterceptor { + + private static final Key ECHO_INITIAL_KEY = Key.of("x-grpc-test-echo-initial", ASCII_STRING_MARSHALLER); + private static final Set> HEADERS_KEY_SET = Collections.singleton(ECHO_INITIAL_KEY); + private static final Key ECHO_TRAILING_KEY = Key.of("x-grpc-test-echo-trailing-bin", BINARY_BYTE_MARSHALLER); + private static final Set> TRAILERS_KEY_SET = Collections.singleton(ECHO_TRAILING_KEY); + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata metadata, ServerCallHandler next) { + return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { + + @Override + public void sendHeaders(Metadata headers) { + headers.merge(metadata, HEADERS_KEY_SET); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + trailers.merge(metadata, TRAILERS_KEY_SET); + super.close(status, trailers); + } + }, metadata); + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropITest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropITest.java new file mode 100644 index 00000000..5f691ae9 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropITest.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web.interop; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.Timeout; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.ToStringConsumer; +import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.File; +import java.time.Duration; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; + +@RunWith(VertxUnitRunner.class) +public class InteropITest { + + private static final String GRPC_WEB_REPO_PATH = System.getProperty("grpc-web.repo.path"); + + @Rule + public Timeout rule = Timeout.millis(Duration.ofMinutes(10).toMillis()); + + private Vertx vertx; + + @Before + public void setUp(TestContext should) { + vertx = Vertx.vertx(); + vertx.deployVerticle(new InteropServer()).onComplete(should.asyncAssertSuccess()); + } + + @After + public void tearDown(TestContext should) { + vertx.close().onComplete(should.asyncAssertSuccess()); + } + + @Test + public void interopTests() { + assumeFalse("grpc-web repo path isn't defined", GRPC_WEB_REPO_PATH == null); + + File repoFile = new File(GRPC_WEB_REPO_PATH); + assertTrue("grpc-web repo path doesn't denote a directory", repoFile.isDirectory()); + File dockerfile = new File(repoFile, "net/grpc/gateway/docker/prereqs/Dockerfile"); + assertTrue("Dockerfile doesn't exists or isn't a normal file", dockerfile.isFile()); + + ImageFromDockerfile image = new ImageFromDockerfile() + .withFileFromFile(".", repoFile) + .withFileFromFile("Dockerfile", dockerfile); + + ToStringConsumer logConsumer = new ToStringConsumer(); + try (GenericContainer container = new GenericContainer<>(image)) { + container + .withLogConsumer(logConsumer) + .withNetworkMode("host") + .withCommand("/bin/bash", "/github/grpc-web/scripts/docker-run-interop-tests.sh") + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .start(); + } finally { + System.out.println(logConsumer.toUtf8String()); + } + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropServer.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropServer.java new file mode 100644 index 00000000..4bea1f22 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/InteropServer.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web.interop; + +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServerOptions; +import io.vertx.grpc.server.GrpcServiceBridge; + +/** + * A gRPC-Web server for grpc-web interop tests. + */ +public class InteropServer extends AbstractVerticle { + + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + vertx.deployVerticle(new InteropServer()) + .onFailure(Throwable::printStackTrace) + .onSuccess(v -> System.out.println("Deployed InteropServer")); + } + + @Override + public void start(Promise startPromise) { + GrpcServer grpcServer = GrpcServer.server(vertx, new GrpcServerOptions().setGrpcWebEnabled(true)); + + ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(new TestServiceImpl(vertx), new Interceptor()); + GrpcServiceBridge.bridge(serviceDefinition).bind(grpcServer); + + vertx.createHttpServer() + .requestHandler(grpcServer) + .listen(8080) + .mapEmpty() + .onComplete(startPromise); + } +} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/TestServiceImpl.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/TestServiceImpl.java new file mode 100644 index 00000000..e7f97a37 --- /dev/null +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/web/interop/TestServiceImpl.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.grpc.server.web.interop; + +import com.google.protobuf.ByteString; +import grpc.testing.EmptyOuterClass; +import grpc.testing.Messages; +import grpc.testing.TestServiceGrpc; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.stub.StreamObserver; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { + + private final Vertx vertx; + + TestServiceImpl(Vertx vertx) { + this.vertx = vertx; + } + + @Override + public void emptyCall(EmptyOuterClass.Empty request, StreamObserver responseObserver) { + responseObserver.onNext(EmptyOuterClass.Empty.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public void unaryCall(Messages.SimpleRequest request, StreamObserver responseObserver) { + if (request.hasResponseStatus()) { + Messages.EchoStatus echoStatus = request.getResponseStatus(); + Status status = Status.fromCodeValue(echoStatus.getCode()) + .withDescription(echoStatus.getMessage()); + responseObserver.onError(new StatusException(status)); + return; + } + Messages.Payload payload = Messages.Payload.newBuilder() + .setTypeValue(request.getResponseTypeValue()) + .setBody(ByteString.copyFrom(new byte[request.getResponseSize()])) + .build(); + Messages.SimpleResponse response = Messages.SimpleResponse.newBuilder() + .setPayload(payload) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void streamingOutputCall(Messages.StreamingOutputCallRequest request, StreamObserver responseObserver) { + List> futures = new ArrayList<>(request.getResponseParametersCount()); + long delay = 0; + for (Messages.ResponseParameters parameters : request.getResponseParametersList()) { + delay += Math.max(1, MILLISECONDS.convert(parameters.getIntervalUs(), MICROSECONDS)); + Promise promise = Promise.promise(); + vertx.setTimer(delay, l -> { + Messages.Payload payload = Messages.Payload.newBuilder() + .setType(request.getResponseType()) + .setBody(ByteString.copyFrom(new byte[parameters.getSize()])) + .build(); + Messages.StreamingOutputCallResponse response = Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(payload) + .build(); + responseObserver.onNext(response); + promise.complete(); + }); + futures.add(promise.future()); + } + Future.join(futures).onComplete(v -> responseObserver.onCompleted()); + } +} diff --git a/vertx-grpc-server/src/test/proto/empty.proto b/vertx-grpc-server/src/test/proto/empty.proto index af5591b6..6a0aa88d 100644 --- a/vertx-grpc-server/src/test/proto/empty.proto +++ b/vertx-grpc-server/src/test/proto/empty.proto @@ -1,39 +1,22 @@ -// Copyright 2015, Google Inc. -// All rights reserved. + +// Copyright 2015 gRPC authors. // -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. +// http://www.apache.org/licenses/LICENSE-2.0 // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. -syntax = "proto2"; +syntax = "proto3"; package grpc.testing; -option java_package = "com.google.protobuf"; -option java_outer_classname = "EmptyProtos"; - // An empty message that you can re-use to avoid defining duplicated empty // messages in your project. A typical example is to use it as argument or the // return value of a service API. For instance: diff --git a/vertx-grpc-server/src/test/proto/messages.proto b/vertx-grpc-server/src/test/proto/messages.proto index 5110719e..5993bc6b 100644 --- a/vertx-grpc-server/src/test/proto/messages.proto +++ b/vertx-grpc-server/src/test/proto/messages.proto @@ -1,31 +1,17 @@ -// Copyright 2015, Google Inc. -// All rights reserved. + +// Copyright 2015-2016 gRPC authors. // -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. +// http://www.apache.org/licenses/LICENSE-2.0 // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. // Message definitions to be used by integration test service definitions. @@ -33,26 +19,18 @@ syntax = "proto3"; package grpc.testing; -option java_package = "io.grpc.testing.integration"; +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} // The type of payload that should be returned. enum PayloadType { // Compressable text format. COMPRESSABLE = 0; - - // Uncompressable binary format. - UNCOMPRESSABLE = 1; - - // Randomly chosen from all other formats defined in this enum. - RANDOM = 2; -} - -// Compression algorithms -enum CompressionType { - // No compression - NONE = 0; - GZIP = 1; - DEFLATE = 2; } // A block of data, to simply increase gRPC message size. @@ -70,6 +48,21 @@ message EchoStatus { string message = 2; } +// The type of route that a client took to reach a server w.r.t. gRPCLB. +// The server must fill in "fallback" if it detects that the RPC reached +// the server via the "gRPCLB fallback" path, and "backend" if it detects +// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got +// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly +// how this detection is done is context and server dependent. +enum GrpclbRouteType { + // Server didn't detect the route that a client took to reach it. + GRPCLB_ROUTE_TYPE_UNKNOWN = 0; + // Indicates that a client reached a server via gRPCLB fallback. + GRPCLB_ROUTE_TYPE_FALLBACK = 1; + // Indicates that a client reached a server as a gRPCLB-given backend. + GRPCLB_ROUTE_TYPE_BACKEND = 2; +} + // Unary request. message SimpleRequest { // Desired payload type in the response from the server. @@ -77,7 +70,6 @@ message SimpleRequest { PayloadType response_type = 1; // Desired payload size in the response from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. int32 response_size = 2; // Optional input payload sent along with the request. @@ -89,11 +81,23 @@ message SimpleRequest { // Whether SimpleResponse should include OAuth scope. bool fill_oauth_scope = 5; - // Compression algorithm to be used by the server for the response (stream) - CompressionType response_compression = 6; + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; // Whether server should return a given status EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; + + // Whether SimpleResponse should include server_id. + bool fill_server_id = 9; + + // Whether SimpleResponse should include grpclb_route_type. + bool fill_grpclb_route_type = 10; } // Unary response, as configured by the request. @@ -105,10 +109,15 @@ message SimpleResponse { string username = 2; // OAuth scope. string oauth_scope = 3; -} -message SimpleContext { - string value = 1; + // Server ID. This must be unique among different server instances, + // but the same across all RPC's made to a particular server instance. + string server_id = 4; + // gRPCLB Path. + GrpclbRouteType grpclb_route_type = 5; + + // Server hostname. + string hostname = 6; } // Client-streaming request. @@ -116,6 +125,12 @@ message StreamingInputCallRequest { // Optional input payload sent along with the request. Payload payload = 1; + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + // Not expecting any payload from the response. } @@ -128,12 +143,17 @@ message StreamingInputCallResponse { // Configuration for a particular response. message ResponseParameters { // Desired payload sizes in responses from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. int32 size = 1; // Desired interval between consecutive responses in the response stream in // microseconds. int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; } // Server-streaming request. @@ -150,9 +170,6 @@ message StreamingOutputCallRequest { // Optional input payload sent along with the request. Payload payload = 3; - // Compression algorithm to be used by the server for the response (stream) - CompressionType response_compression = 6; - // Whether server should return a given status EchoStatus response_status = 7; } @@ -176,3 +193,17 @@ message ReconnectInfo { bool passed = 1; repeated int32 backoff_ms = 2; } + +message LoadBalancerStatsRequest { + // Request stats for the next num_rpcs sent by client. + int32 num_rpcs = 1; + // If num_rpcs have not completed within timeout_sec, return partial results. + int32 timeout_sec = 2; +} + +message LoadBalancerStatsResponse { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; +} diff --git a/vertx-grpc-server/src/test/proto/test.proto b/vertx-grpc-server/src/test/proto/test.proto index 7739fa45..95eafe7c 100644 --- a/vertx-grpc-server/src/test/proto/test.proto +++ b/vertx-grpc-server/src/test/proto/test.proto @@ -1,34 +1,21 @@ -// Copyright 2015, Google Inc. -// All rights reserved. + +// Copyright 2015-2016 gRPC authors. // -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. +// http://www.apache.org/licenses/LICENSE-2.0 // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. // An integration test service that covers all the method signature permutations // of unary/streaming requests/responses. + syntax = "proto3"; import "empty.proto"; @@ -36,8 +23,6 @@ import "messages.proto"; package grpc.testing; -option java_package = "io.grpc.testing.integration"; - // A simple service to test the various types of RPCs and experiment with // performance with various types of payload. service TestService { @@ -47,28 +32,33 @@ service TestService { // One request followed by one response. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + // One request followed by one response. Response has cache control + // headers set such that a caching HTTP proxy (such as GFE) can + // satisfy subsequent requests. + rpc CacheableUnaryCall(SimpleRequest) returns (SimpleResponse); + // One request followed by a sequence of responses (streamed download). // The server returns the payload with client desired type and sizes. rpc StreamingOutputCall(StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); + returns (stream StreamingOutputCallResponse); // A sequence of requests followed by one response (streamed upload). // The server returns the aggregated size of client payload as the result. rpc StreamingInputCall(stream StreamingInputCallRequest) - returns (StreamingInputCallResponse); + returns (StreamingInputCallResponse); // A sequence of requests with each request served by the server immediately. // As one request could lead to multiple responses, this interface // demonstrates the idea of full duplexing. rpc FullDuplexCall(stream StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); + returns (stream StreamingOutputCallResponse); // A sequence of requests followed by a sequence of responses. // The server buffers all the client requests and then serves them in order. A // stream of responses are returned to the client when the server starts with // first request. rpc HalfDuplexCall(stream StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); + returns (stream StreamingOutputCallResponse); // The test server will not implement this method. It will be used // to test the behavior when clients call unimplemented methods. @@ -79,11 +69,18 @@ service TestService { // that case. service UnimplementedService { // A call that no server should implement - rpc UnimplementedCall(grpc.testing.Empty) returns(grpc.testing.Empty); + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); } // A service used to control reconnect server. service ReconnectService { - rpc Start(grpc.testing.Empty) returns (grpc.testing.Empty); + rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty); rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo); } + +// A service used to obtain stats for verifying LB behavior. +service LoadBalancerStatsService { + // Gets the backend distribution for RPCs sent by a test client. + rpc GetClientStats(LoadBalancerStatsRequest) + returns (LoadBalancerStatsResponse) {} +} From 608ca7018be267a81cbacceeadd48ca1b5e06052 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 29 Mar 2024 11:48:03 +0100 Subject: [PATCH 5/7] Updates after review Signed-off-by: Thomas Segismont --- .../io/vertx/grpc/common/GrpcMediaType.java | 2 +- .../grpc/common/impl/GrpcMessageImpl.java | 13 +++------ .../src/main/asciidoc/server.adoc | 2 +- .../vertx/grpc/server/GrpcServerOptions.java | 12 -------- .../grpc/server/impl/GrpcServerImpl.java | 19 ++++++------- .../server/impl/GrpcServerRequestImpl.java | 21 +++++++------- .../server/impl/GrpcServerResponseImpl.java | 28 ++++++++++--------- 7 files changed, 41 insertions(+), 56 deletions(-) diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java index a3f1bee3..bd845e9e 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMediaType.java @@ -19,7 +19,7 @@ * The gRPC media types. */ @Unstable -public class GrpcMediaType { +public final class GrpcMediaType { /** * gRPC. diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java index 530060d4..2018059d 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java @@ -21,18 +21,10 @@ public class GrpcMessageImpl implements GrpcMessage { private final String encoding; private final Buffer payload; - private final boolean trailer; public GrpcMessageImpl(String encoding, Buffer payload) { this.encoding = encoding; this.payload = payload; - trailer = false; - } - - public GrpcMessageImpl(String encoding, Buffer payload, boolean trailer) { - this.encoding = encoding; - this.payload = payload; - this.trailer = trailer; } @Override @@ -46,10 +38,13 @@ public Buffer payload() { } public static Buffer encode(GrpcMessage message) { + return encode(message, false); + } + + public static BufferInternal encode(GrpcMessage message, boolean trailer) { ByteBuf bbuf = ((BufferInternal)message.payload()).getByteBuf(); int len = bbuf.readableBytes(); boolean compressed = !message.encoding().equals("identity"); - boolean trailer = message instanceof GrpcMessageImpl && ((GrpcMessageImpl) message).trailer; ByteBuf prefix = Unpooled.buffer(5, 5); prefix.writeByte((trailer ? 0x80 : 0x00) | (compressed ? 0x01 : 0x00)); prefix.writeInt(len); // Length diff --git a/vertx-grpc-server/src/main/asciidoc/server.adoc b/vertx-grpc-server/src/main/asciidoc/server.adoc index 2852b58a..da076be2 100644 --- a/vertx-grpc-server/src/main/asciidoc/server.adoc +++ b/vertx-grpc-server/src/main/asciidoc/server.adoc @@ -53,7 +53,7 @@ router.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request() The Vert.x gRPC Server supports the gRPC-Web protocol by default. -To disable the gRPC-Web protocol support, configure options with {@link io.vertx.grpc.server.GrpcServerOptions#disableGrpcWeb GrpcServerOptions#disableGrpcWeb} and then create a server with {@link io.vertx.grpc.server.GrpcServer#server(io.vertx.core.Vertx, io.vertx.grpc.server.GrpcServerOptions) GrpcServer#server(vertx, options)}. +To disable the gRPC-Web protocol support, configure options with {@link io.vertx.grpc.server.GrpcServerOptions#setGrpcWebEnabled GrpcServerOptions#setGrpcWebEnabled(false)} and then create a server with {@link io.vertx.grpc.server.GrpcServer#server(io.vertx.core.Vertx, io.vertx.grpc.server.GrpcServerOptions) GrpcServer#server(vertx, options)}. [TIP] ==== diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java index f52c0cf5..2a45776a 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java @@ -11,13 +11,10 @@ package io.vertx.grpc.server; import io.vertx.codegen.annotations.DataObject; -import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Unstable; import io.vertx.codegen.json.annotations.JsonGen; import io.vertx.core.json.JsonObject; -import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE; - /** * Configuration for a {@link GrpcServer}. */ @@ -73,15 +70,6 @@ public GrpcServerOptions setGrpcWebEnabled(boolean grpcWebEnabled) { return this; } - /** - * Disable the gRPC-Web protocol support. - */ - @GenIgnore(PERMITTED_TYPE) - public GrpcServerOptions disableGrpcWeb() { - return setGrpcWebEnabled(false); - } - - /** * @return a JSON representation of options */ diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java index d0dedf53..a1503be7 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java @@ -44,12 +44,14 @@ public class GrpcServerImpl implements GrpcServer { private Map> methodCallHandlers = new HashMap<>(); public GrpcServerImpl(Vertx vertx, GrpcServerOptions options) { - this.options = Objects.requireNonNull(options, "options is null"); + this.options = new GrpcServerOptions(Objects.requireNonNull(options, "options is null")); } @Override public void handle(HttpServerRequest httpRequest) { - if (refuseRequest(httpRequest)) { + int errorCode = refuseRequest(httpRequest); + if (errorCode > 0) { + httpRequest.response().setStatusCode(errorCode).end(); return; } GrpcMethodCall methodCall = new GrpcMethodCall(httpRequest.path()); @@ -69,24 +71,21 @@ public void handle(HttpServerRequest httpRequest) { } } - private boolean refuseRequest(HttpServerRequest request) { + private int refuseRequest(HttpServerRequest request) { if (request.version() != HttpVersion.HTTP_2) { if (!options.isGrpcWebEnabled()) { log.trace("gRPC-Web is not enabled, sending error 505"); - request.response().setStatusCode(505).end(); - return true; + return 505; } if (!GrpcMediaType.isGrpcWeb(request.headers().get(CONTENT_TYPE))) { log.trace("gRPC-Web is the only media type supported on HTTP/1.1, sending error 415"); - request.response().setStatusCode(415).end(); - return true; + return 415; } } else if (GrpcMediaType.isGrpcWeb(request.headers().get(CONTENT_TYPE))) { log.trace("gRPC-Web is not supported on HTTP/2, sending error 415"); - request.response().setStatusCode(415).end(); - return true; + return 415; } - return false; + return -1; } private void handle(MethodCallHandler method, HttpServerRequest httpRequest, GrpcMethodCall methodCall) { diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java index 3e743d06..c90b9019 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java @@ -10,7 +10,9 @@ */ package io.vertx.grpc.server.impl; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.handler.codec.base64.Base64; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; @@ -25,8 +27,6 @@ import io.vertx.grpc.server.GrpcServerRequest; import io.vertx.grpc.server.GrpcServerResponse; -import java.util.Base64; - import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; /** @@ -34,13 +34,12 @@ */ public class GrpcServerRequestImpl extends GrpcReadStreamBase, Req> implements GrpcServerRequest { - private static final Base64.Decoder DECODER = Base64.getDecoder(); - private static final Buffer EMPTY_BUFFER = BufferInternal.buffer(Unpooled.EMPTY_BUFFER); + private static final BufferInternal EMPTY_BUFFER = BufferInternal.buffer(Unpooled.EMPTY_BUFFER); final HttpServerRequest httpRequest; final GrpcServerResponse response; private final GrpcMethodCall methodCall; - private Buffer grpcWebTextBuffer; + private BufferInternal grpcWebTextBuffer; public GrpcServerRequestImpl(HttpServerRequest httpRequest, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) { super(((HttpServerRequestInternal) httpRequest).context(), httpRequest, httpRequest.headers().get("grpc-encoding"), messageDecoder); @@ -112,11 +111,12 @@ public void handle(Buffer chunk) { return; } if (grpcWebTextBuffer == EMPTY_BUFFER) { + ByteBuf bbuf = ((BufferInternal) chunk).getByteBuf(); if ((chunk.length() & 0b11) == 0) { // Content length is divisible by four, so we decode it immediately - super.handle(Buffer.buffer(DECODER.decode(chunk.getBytes()))); + super.handle(BufferInternal.buffer(Base64.decode(bbuf))); } else { - grpcWebTextBuffer = chunk.copy(); + grpcWebTextBuffer = BufferInternal.buffer(bbuf.copy()); } return; } @@ -135,12 +135,13 @@ private void bufferAndDecode(Buffer chunk) { // But then in the worst case we may have to buffer the whole request. int maxDecodable = len & ~0b11; if (maxDecodable == len) { - Buffer decoded = Buffer.buffer(DECODER.decode(grpcWebTextBuffer.getBytes())); + BufferInternal decoded = BufferInternal.buffer(Base64.decode(grpcWebTextBuffer.getByteBuf())); grpcWebTextBuffer = EMPTY_BUFFER; super.handle(decoded); } else if (maxDecodable > 0) { - Buffer decoded = Buffer.buffer(DECODER.decode(grpcWebTextBuffer.getBytes(0, maxDecodable))); - grpcWebTextBuffer = grpcWebTextBuffer.getBuffer(maxDecodable, len); + ByteBuf bbuf = grpcWebTextBuffer.getByteBuf(); + BufferInternal decoded = BufferInternal.buffer(Base64.decode(bbuf, 0, maxDecodable)); + grpcWebTextBuffer = BufferInternal.buffer(bbuf.copy(maxDecodable, len - maxDecodable)); super.handle(decoded); } } diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java index 3c503b20..37e14f47 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java @@ -10,10 +10,12 @@ */ package io.vertx.grpc.server.impl; +import io.netty.handler.codec.base64.Base64; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferInternal; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.http.HttpVersion; @@ -22,11 +24,11 @@ import io.vertx.grpc.common.impl.Utils; import io.vertx.grpc.server.GrpcServerResponse; -import java.util.Base64; import java.util.Map; import java.util.Objects; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; +import static io.vertx.grpc.common.GrpcMediaType.*; /** * @author Julien Viet @@ -51,13 +53,13 @@ public GrpcServerResponseImpl(GrpcServerRequestImpl request, HttpServ this.encoder = encoder; if (request.httpRequest.version() != HttpVersion.HTTP_2) { String requestMediaType = request.headers().get(CONTENT_TYPE); - if (GrpcMediaType.isGrpcWebText(requestMediaType)) { - contentType = GrpcMediaType.GRPC_WEB_TEXT_PROTO; + if (isGrpcWebText(requestMediaType)) { + contentType = GRPC_WEB_TEXT_PROTO; } else { - contentType = GrpcMediaType.GRPC_WEB_PROTO; + contentType = GRPC_WEB_PROTO; } } else { - contentType = GrpcMediaType.GRPC_PROTO; + contentType = GRPC_PROTO; } } @@ -250,7 +252,7 @@ private Future writeMessage(GrpcMessage message, boolean end) { responseTrailers.remove("grpc-message"); } if (message != null) { - httpResponse.write(encodeMessage(message)); + httpResponse.write(encodeMessage(message, false)); } if (isGrpcWeb() && !trailersOnly) { Buffer buffer = Buffer.buffer(); @@ -260,23 +262,23 @@ private Future writeMessage(GrpcMessage message, boolean end) { .appendString(trailer.getValue()) .appendString("\r\n"); } - httpResponse.write(encodeMessage(new GrpcMessageImpl("identity", buffer, true))); + httpResponse.write(encodeMessage(new GrpcMessageImpl("identity", buffer), true)); } return httpResponse.end(); } else { - return httpResponse.write(encodeMessage(message)); + return httpResponse.write(encodeMessage(message, false)); } } - private Buffer encodeMessage(GrpcMessage message) { - Buffer buffer = GrpcMessageImpl.encode(message); - if (contentType == GrpcMediaType.GRPC_WEB_TEXT_PROTO) { - return Buffer.buffer(Base64.getEncoder().encode(buffer.getBytes())); + private Buffer encodeMessage(GrpcMessage message, boolean trailer) { + BufferInternal buffer = GrpcMessageImpl.encode(message, trailer); + if (GRPC_WEB_TEXT_PROTO.equals(contentType)) { + return BufferInternal.buffer(Base64.encode(buffer.getByteBuf(), false)); } return buffer; } private boolean isGrpcWeb() { - return contentType != GrpcMediaType.GRPC_PROTO; + return !GRPC_PROTO.equals(contentType); } } From 33c2596d101bbd2f82f43a7d029436a597afc561 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 2 Apr 2024 14:34:19 +0200 Subject: [PATCH 6/7] gRPC-Web over HTTP/2 should be allowed The browser does not expose HTTP protocol details so, even if the client and server communicate over HTTP/2, standard gRPC cannot be used and gRPC-Web is mandatory. Signed-off-by: Thomas Segismont --- .../main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java index a1503be7..296ee372 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java @@ -81,9 +81,6 @@ private int refuseRequest(HttpServerRequest request) { log.trace("gRPC-Web is the only media type supported on HTTP/1.1, sending error 415"); return 415; } - } else if (GrpcMediaType.isGrpcWeb(request.headers().get(CONTENT_TYPE))) { - log.trace("gRPC-Web is not supported on HTTP/2, sending error 415"); - return 415; } return -1; } From 2ae4745984da3923a810d0851c5c3bbbe29c7e28 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 5 Apr 2024 17:34:07 +0200 Subject: [PATCH 7/7] Updates after review Signed-off-by: Thomas Segismont --- .../java/io/vertx/grpc/common/impl/GrpcMessageImpl.java | 7 +++++++ .../io/vertx/grpc/server/impl/GrpcServerResponseImpl.java | 6 ++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java index 2018059d..040d20a6 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java @@ -41,6 +41,13 @@ public static Buffer encode(GrpcMessage message) { return encode(message, false); } + /** + * Encode a {@link GrpcMessage}. + * + * @param message the message + * @param trailer whether this message is a gRPC-Web trailer + * @return the encoded message + */ public static BufferInternal encode(GrpcMessage message, boolean trailer) { ByteBuf bbuf = ((BufferInternal)message.payload()).getByteBuf(); int len = bbuf.readableBytes(); diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java index 37e14f47..47170866 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java @@ -206,9 +206,7 @@ private Future writeMessage(GrpcMessage message, boolean end) { MultiMap responseHeaders = httpResponse.headers(); if (!headersSent) { - if (isGrpcWeb() && !trailersOnly) { - httpResponse.setChunked(true); - } + httpResponse.setChunked(isGrpcWeb() && !trailersOnly); headersSent = true; if (headers != null && !headers.isEmpty()) { for (Map.Entry header : headers) { @@ -258,7 +256,7 @@ private Future writeMessage(GrpcMessage message, boolean end) { Buffer buffer = Buffer.buffer(); for (Map.Entry trailer : responseTrailers) { buffer.appendString(trailer.getKey()) - .appendString(":") + .appendByte((byte) ':') .appendString(trailer.getValue()) .appendString("\r\n"); }