Skip to content

Commit

Permalink
Client side load balancing implementation and documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Aug 17, 2024
1 parent 0b945c7 commit 605ef3b
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.Address;
import io.vertx.grpc.client.impl.GrpcClientBuilderImpl;
import io.vertx.grpc.client.impl.GrpcClientImpl;
import io.vertx.grpc.common.ServiceMethod;

Expand All @@ -33,14 +34,24 @@
@VertxGen
public interface GrpcClient {

/**
* Provide a builder for {@link GrpcClient}, it can be used to configure advanced
* client settings like a load balancer or an address resolver.
* <p>
* Example usage: {@code GrpcClient client = GrpcClient.builder(vertx).with(options)...build()}
*/
static GrpcClientBuilder<GrpcClient> builder(Vertx vertx) {
return new GrpcClientBuilderImpl<>(vertx);
}

/**
* Create a client.
*
* @param vertx the vertx instance
* @return the created client
*/
static GrpcClient client(Vertx vertx) {
return new GrpcClientImpl(vertx);
return builder(vertx).build();
}

/**
Expand All @@ -50,7 +61,7 @@ static GrpcClient client(Vertx vertx) {
* @return the created client
*/
static GrpcClient client(Vertx vertx, GrpcClientOptions options) {
return new GrpcClientImpl(vertx, options, new HttpClientOptions().setHttp2ClearTextUpgrade(false));
return builder(vertx).with(options).build();
}

/**
Expand All @@ -62,7 +73,7 @@ static GrpcClient client(Vertx vertx, GrpcClientOptions options) {
* @return the created client
*/
static GrpcClient client(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientOptions httpOptions) {
return new GrpcClientImpl(vertx, grpcOptions, httpOptions);
return builder(vertx).with(grpcOptions).with(httpOptions).build();
}

/**
Expand All @@ -73,7 +84,7 @@ static GrpcClient client(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientO
* @return the created client
*/
static GrpcClient client(Vertx vertx, HttpClientOptions options) {
return new GrpcClientImpl(vertx, new GrpcClientOptions(), options);
return builder(vertx).with(options).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.client;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.endpoint.LoadBalancer;

/**
* A builder for {@link GrpcClient}.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@VertxGen
public interface GrpcClientBuilder<C> {

/**
* Configure the client options.
* @param options the client options
* @return a reference to this, so the API can be used fluently
*/
@Fluent
GrpcClientBuilder<C> with(GrpcClientOptions options);

/**
* Configure the client HTTP transport options.
* @param transportOptions the client transport options
* @return a reference to this, so the API can be used fluently
*/
@Fluent
GrpcClientBuilder<C> with(HttpClientOptions transportOptions);

/**
* Configure the client to use a specific address resolver.
*
* @param resolver the address resolver
*/
@GenIgnore({"permitted-type"})
GrpcClientBuilder<C> withAddressResolver(AddressResolver resolver);

/**
* Configure the client to use a load balancer.
*
* @param loadBalancer the load balancer
*/
@GenIgnore({"permitted-type"})
GrpcClientBuilder<C> withLoadBalancer(LoadBalancer loadBalancer);

/**
* Build and return the client.
* @return the client as configured by this builder
*/
C build();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.client.impl;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientBuilder;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientBuilder;
import io.vertx.grpc.client.GrpcClientOptions;

/**
* Implementation of {@link GrpcClientBuilder}.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class GrpcClientBuilderImpl<C extends GrpcClient> implements GrpcClientBuilder<C> {

private final Vertx vertx;
private GrpcClientOptions options;
private HttpClientOptions transportOptions;
private AddressResolver addressResolver;
private LoadBalancer loadBalancer;

public GrpcClientBuilderImpl(Vertx vertx) {
this.vertx = vertx;
}

@Override
public GrpcClientBuilderImpl<C> with(GrpcClientOptions options) {
this.options = options == null ? null : new GrpcClientOptions(options);
return this;
}

@Override
public GrpcClientBuilderImpl<C> with(HttpClientOptions transportOptions) {
this.transportOptions = transportOptions == null ? null : new HttpClientOptions(transportOptions);
return this;
}

@Override
public GrpcClientBuilderImpl<C> withAddressResolver(AddressResolver resolver) {
this.addressResolver = resolver;
return this;
}

@Override
public GrpcClientBuilderImpl<C> withLoadBalancer(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
return this;
}

@Override
public C build() {
HttpClientOptions transportOptions = this.transportOptions;
if (transportOptions == null) {
transportOptions = new HttpClientOptions().setHttp2ClearTextUpgrade(false);
}
transportOptions = transportOptions.setProtocolVersion(HttpVersion.HTTP_2);
HttpClientBuilder transportBuilder = vertx
.httpClientBuilder()
.with(transportOptions);
if (loadBalancer != null) {
transportBuilder.withLoadBalancer(loadBalancer);
}
if (addressResolver != null) {
transportBuilder.withAddressResolver(addressResolver);
}
GrpcClientOptions options = this.options;
if (options == null) {
options = new GrpcClientOptions();
}
return create(vertx, options, transportBuilder.build());
}

protected C create(Vertx vertx, GrpcClientOptions options, HttpClient transport) {
GrpcClient client = new GrpcClientImpl(vertx, options, transport, true);
return (C) client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,11 @@ public class GrpcClientImpl implements GrpcClient {
private final int timeout;
private final TimeUnit timeoutUnit;

public GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientOptions httpOptions) {
this(vertx, grpcOptions, vertx.createHttpClient(new HttpClientOptions(httpOptions).setProtocolVersion(HttpVersion.HTTP_2)), true);
}

public GrpcClientImpl(Vertx vertx) {
this(vertx, new GrpcClientOptions(), new HttpClientOptions().setHttp2ClearTextUpgrade(false));
}

public GrpcClientImpl(Vertx vertx, HttpClient client) {
this(vertx, new GrpcClientOptions(), client, false);
}

private GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient client, boolean close) {
protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient client, boolean close) {
this.vertx = vertx;
this.client = client;
this.scheduleDeadlineAutomatically = grpcOptions.getScheduleDeadlineAutomatically();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.tests.client;

import io.grpc.*;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.TestContext;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.common.GrpcReadStream;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ClientSideLoadBalancingTest extends ClientTestBase {

private GrpcClient client;

@Test
public void testRoundRobin(TestContext should) throws Exception {

int numServers = 3;
List<SocketAddress> endpoints = new ArrayList<>();

for (int i = 0;i < numServers;i++) {
int idx = i;
GreeterGrpc.GreeterImplBase called = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> plainResponseObserver) {
ServerCallStreamObserver<HelloReply> responseObserver =
(ServerCallStreamObserver<HelloReply>) plainResponseObserver;
responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName() + idx).build());
responseObserver.onCompleted();
}
};
startServer(called, ServerBuilder.forPort(port + i));
endpoints.add(SocketAddress.inetSocketAddress(port + i, "localhost"));
}

client = GrpcClient.builder(vertx)
.withAddressResolver(AddressResolver.mappingResolver(address -> endpoints))
.build();

int numRequests = 10;

List<String> replies = new ArrayList<>();
for (int i = 0;i < numRequests;i++) {
HelloReply reply = client.request(SocketAddress.inetSocketAddress(port, "localhost"), GREETER_SAY_HELLO)
.compose(req -> req
.send(HelloRequest.newBuilder().setName("Julien").build())
.compose(GrpcReadStream::last)).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
replies.add(reply.getMessage());
}

List<String> expected = IntStream
.range(0, numRequests).mapToObj(idx -> "Hello Julien" + (idx % numServers))
.collect(Collectors.toList());
should.assertEquals(expected, replies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static io.vertx.grpc.common.GrpcMessageDecoder.decoder;
import static io.vertx.grpc.common.GrpcMessageEncoder.encoder;
Expand Down Expand Up @@ -53,46 +55,47 @@ public abstract class ClientTestBase extends GrpcTestBase {
public static final ServiceMethod<Item, Item> STREAMING_PIPE = ServiceMethod.client(STREAMING, "Pipe", ITEM_ENC, ITEM_DEC);

/* The port on which the server should run */
protected Server server;
private List<Server> servers = new ArrayList<>();

@After
public void tearDown(TestContext should) {
stopServer(false);
stopServers(false);
super.tearDown(should);
}

void startServer(BindableService service) throws IOException {
startServer(service, ServerBuilder.forPort(port));
}

void stopServer(boolean now) {
Server s = server;
if (s != null) {
server = null;
void stopServers(boolean now) {

List<Server> list = new ArrayList<>(servers);
servers.clear();
list.forEach(server -> {
if (now) {
s.shutdownNow();
server.shutdownNow();
} else {
s.shutdown();
server.shutdown();
}
}
});
}

void startServer(BindableService service, ServerBuilder builder) throws IOException {
server = builder
.addService(service)
.build()
.start();
void startServer(BindableService service, ServerBuilder<?> builder) throws IOException {
servers.add(builder
.addService(service)
.build()
.start());
}


void startServer(ServerServiceDefinition service) throws IOException {
startServer(service, ServerBuilder.forPort(port));
}

void startServer(ServerServiceDefinition service, ServerBuilder builder) throws IOException {
server = builder
void startServer(ServerServiceDefinition service, ServerBuilder<?> builder) throws IOException {
servers.add(builder
.addService(service)
.build()
.start();
.start());
}
}
Loading

0 comments on commit 605ef3b

Please sign in to comment.