Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy functionality #657

Merged
merged 13 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 3.2.4-wip

* Forward internal `GrpcError` on when throwing while sending a request.
* Add support for proxies, see [#33](https://github.com/grpc/grpc-dart/issues/33).

## 3.2.3

Expand Down
119 changes: 98 additions & 21 deletions lib/src/client/http2_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:grpc/src/client/proxy.dart';
import 'package:http2/transport.dart';

import '../shared/codec.dart';
Expand Down Expand Up @@ -61,7 +63,7 @@ class Http2ClientConnection implements connection.ClientConnection {
ClientKeepAlive? keepAliveManager;

Http2ClientConnection(Object host, int port, this.options)
: _transportConnector = _SocketTransportConnector(host, port, options);
: _transportConnector = SocketTransportConnector(host, port, options);

Http2ClientConnection.fromClientTransportConnector(
this._transportConnector, this.options);
Expand Down Expand Up @@ -351,39 +353,68 @@ class Http2ClientConnection implements connection.ClientConnection {
}
}

class _SocketTransportConnector implements ClientTransportConnector {
class SocketTransportConnector implements ClientTransportConnector {
/// Either [InternetAddress] or [String].
final Object _host;
final int _port;
final ChannelOptions _options;
late Socket _socket; // ignore: close_sinks
late Socket socket;

_SocketTransportConnector(this._host, this._port, this._options)
Proxy? get proxy => _options.proxy;
Object get host => proxy == null ? _host : proxy!.host;
int get port => proxy == null ? _port : proxy!.port;

SocketTransportConnector(this._host, this._port, this._options)
: assert(_host is InternetAddress || _host is String);

@override
Future<ClientTransportConnection> connect() async {
final securityContext = _options.credentials.securityContext;
_socket =
await Socket.connect(_host, _port, timeout: _options.connectTimeout);
var incoming = await connectImpl(proxy);

// Don't wait for io buffers to fill up before sending requests.
if (_socket.address.type != InternetAddressType.unix) {
_socket.setOption(SocketOption.tcpNoDelay, true);
if (socket.address.type != InternetAddressType.unix) {
socket.setOption(SocketOption.tcpNoDelay, true);
}
if (securityContext != null) {
// Todo(sigurdm): We want to pass supportedProtocols: ['h2'].
// http://dartbug.com/37950
_socket = await SecureSocket.secure(_socket,
// This is not really the host, but the authority to verify the TLC
// connection against.
//
// We don't use `this.authority` here, as that includes the port.
host: _options.credentials.authority ?? _host,
context: securityContext,
onBadCertificate: _validateBadCertificate);
socket = await SecureSocket.secure(
socket,
// This is not really the host, but the authority to verify the TLC
// connection against.
//
// We don't use `this.authority` here, as that includes the port.
host: _options.credentials.authority ?? host,
context: securityContext,
onBadCertificate: _validateBadCertificate,
);
incoming = socket;
}
return ClientTransportConnection.viaStreams(incoming, socket);
}

Future<Stream<List<int>>> connectImpl(Proxy? proxy) async {
socket = await initSocket(host, port);
if (proxy == null) {
return socket;
}
return await connectToProxy(proxy);
}

return ClientTransportConnection.viaSocket(_socket);
Future<Socket> initSocket(Object host, int port) async {
return await Socket.connect(host, port, timeout: _options.connectTimeout);
}

void _sendConnect(Map<String, String> headers) {
const linebreak = '\r\n';
socket.write('CONNECT $_host:$_port HTTP/1.1');
socket.write(linebreak);
headers.forEach((key, value) {
socket.write('$key: $value');
socket.write(linebreak);
});
socket.write(linebreak);
}

@override
Expand All @@ -409,14 +440,14 @@ class _SocketTransportConnector implements ClientTransportConnector {

@override
Future get done {
ArgumentError.checkNotNull(_socket);
return _socket.done;
ArgumentError.checkNotNull(socket);
return socket.done;
}

@override
void shutdown() {
ArgumentError.checkNotNull(_socket);
_socket.destroy();
ArgumentError.checkNotNull(socket);
socket.destroy();
}

bool _validateBadCertificate(X509Certificate certificate) {
Expand All @@ -426,6 +457,52 @@ class _SocketTransportConnector implements ClientTransportConnector {
if (validator == null) return false;
return validator(certificate, authority);
}

Future<Stream<List<int>>> connectToProxy(Proxy proxy) async {
final headers = {'Host': '$_host:$_port'};
if (proxy.isAuthenticated) {
// If the proxy configuration contains user information use that
// for proxy basic authorization.
final authStr = '${proxy.username}:${proxy.password}';
final auth = base64Encode(utf8.encode(authStr));
headers[HttpHeaders.proxyAuthorizationHeader] = 'Basic $auth';
}
final completer = Completer<void>();

/// Routes the events through after connection to the proxy has been
/// established.
final intermediate = StreamController<List<int>>();

/// Route events after the successfull connect to the `intermediate`.
socket.listen(
(event) {
if (completer.isCompleted) {
intermediate.sink.add(event);
} else {
_waitForResponse(event, completer);
}
},
onDone: intermediate.close,
onError: intermediate.addError,
);

_sendConnect(headers);
await completer.future;
return intermediate.stream;
}

/// Wait for the response to the `CONNECT` request, which should be an
/// acknowledgement with a 200 status code.
void _waitForResponse(Uint8List chunk, Completer<void> completer) {
final response = ascii.decode(chunk);
print(response);
if (response.startsWith('HTTP/1.1 200')) {
completer.complete();
} else {
throw TransportException(
'Error establishing proxy connection: $response');
}
}
}

class _ShutdownException implements Exception {}
3 changes: 3 additions & 0 deletions lib/src/client/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'dart:math';

import '../shared/codec_registry.dart';
import 'client_keepalive.dart';
import 'proxy.dart';
import 'transport/http2_credentials.dart';

const defaultIdleTimeout = Duration(minutes: 5);
Expand Down Expand Up @@ -59,6 +60,7 @@ class ChannelOptions {
final BackoffStrategy backoffStrategy;
final String userAgent;
final ClientKeepAliveOptions keepAlive;
final Proxy? proxy;

const ChannelOptions({
this.credentials = const ChannelCredentials.secure(),
Expand All @@ -69,5 +71,6 @@ class ChannelOptions {
this.connectionTimeout = defaultConnectionTimeOut,
this.codecRegistry,
this.keepAlive = const ClientKeepAliveOptions(),
this.proxy,
});
}
15 changes: 15 additions & 0 deletions lib/src/client/proxy.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class Proxy {
mosuem marked this conversation as resolved.
Show resolved Hide resolved
final String host;
final int port;
final String? username;
final String? password;

const Proxy({
required this.host,
required this.port,
this.username,
this.password,
});

bool get isAuthenticated => username != null;
}
75 changes: 75 additions & 0 deletions test/proxy_secure_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
@TestOn('vm')
import 'dart:async';
import 'dart:io';

import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/proxy.dart';
import 'package:test/test.dart';

import 'src/generated/echo.pbgrpc.dart';

void main() {
late Server server;
late EchoServiceClient fakeClient;
late ClientChannel fakeChannel;

setUp(() async {
server = Server.create(services: [FakeEchoService()]);
await server.serve(
address: 'localhost',
port: 8888,
security: ServerTlsCredentials(
certificate: File('test/data/localhost.crt').readAsBytesSync(),
privateKey: File('test/data/localhost.key').readAsBytesSync(),
),
);
final proxy = Proxy(host: 'localhost', port: 8080);
final proxyCAName = '/CN=mitmproxy/O=mitmproxy';

fakeChannel = ClientChannel(
'localhost',
port: server.port!,
options: ChannelOptions(
credentials: ChannelCredentials.secure(
certificates: File('test/data/localhost.crt').readAsBytesSync(),
authority: 'localhost',
onBadCertificate: (certificate, host) {
/// Workaround to avoid having to add the proxy to our list of
/// trusted CAs.
return certificate.issuer == proxyCAName;
},
),
proxy: proxy,
),
);
fakeClient = EchoServiceClient(fakeChannel);
});

tearDown(() async {
await fakeChannel.shutdown();
await server.shutdown();
});

test(
'Sending and receiving over secure proxy works',
() async {
final echoRequest = EchoRequest(message: 'blablablubb');
final echoResponse = await fakeClient.echo(echoRequest);
expect(echoResponse.message, 'blibliblabb');
},
skip: 'Run this test iff you have a proxy running.',
);
}

class FakeEchoService extends EchoServiceBase {
@override
Future<EchoResponse> echo(ServiceCall call, EchoRequest request) async {
expect(request.message, 'blablablubb');
return EchoResponse(message: 'blibliblabb');
}

@override
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
ServiceCall call, ServerStreamingEchoRequest request) =>
throw UnimplementedError();
}
57 changes: 57 additions & 0 deletions test/proxy_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
@TestOn('vm')
import 'dart:async';

import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/proxy.dart';
import 'package:test/test.dart';

import 'src/generated/echo.pbgrpc.dart';

void main() {
late Server server;
late EchoServiceClient fakeClient;
late ClientChannel fakeChannel;

setUp(() async {
server = Server.create(services: [FakeEchoService()]);
await server.serve(address: 'localhost', port: 8888);

final proxy = Proxy(host: 'localhost', port: 8080);

fakeChannel = ClientChannel(
'localhost',
port: server.port!,
options: ChannelOptions(
credentials: ChannelCredentials.insecure(),
proxy: proxy,
),
);
fakeClient = EchoServiceClient(fakeChannel);
});

tearDown(() async {
await fakeChannel.shutdown();
await server.shutdown();
});

test(
'Sending and receiving over proxy works',
() async {
final echoRequest = EchoRequest(message: 'blablablubb');
final echoResponse = await fakeClient.echo(echoRequest);
expect(echoResponse.message, 'blibliblabb');
},
skip: 'Run this test iff you have a proxy running.',
);
}

class FakeEchoService extends EchoServiceBase {
@override
Future<EchoResponse> echo(ServiceCall call, EchoRequest request) async =>
EchoResponse(message: 'blibliblabb');

@override
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
ServiceCall call, ServerStreamingEchoRequest request) =>
throw UnimplementedError();
}
4 changes: 4 additions & 0 deletions test/src/client_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import 'dart:convert';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/channel.dart' as base;
import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/client/proxy.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:http2/transport.dart';
import 'package:mockito/annotations.dart';
Expand Down Expand Up @@ -79,6 +80,9 @@ class FakeChannelOptions implements ChannelOptions {

@override
ClientKeepAliveOptions get keepAlive => const ClientKeepAliveOptions();

@override
Proxy? get proxy => null;
}

class FakeChannel extends ClientChannel {
Expand Down
Loading