Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkgs/ok_http: Close and remove all idle connections from the resource pool on response #1223

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkgs/ok_http/example/integration_test/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ Future<void> testConformance() async {
testResponseBody(OkHttpClient(), canStreamResponseBody: false);
testRequestHeaders(OkHttpClient());
testRequestMethods(OkHttpClient(), preservesMethodCase: true);
testResponseHeaders(OkHttpClient(), supportsFoldedHeaders: false);
testResponseStatusLine(OkHttpClient());
testCompressedResponseBody(OkHttpClient());
testServerErrors(OkHttpClient());
testIsolate(OkHttpClient.new);
testRequestCookies(OkHttpClient(), canSendCookieHeaders: true);
testResponseCookies(OkHttpClient(), canReceiveSetCookieHeaders: true);
});
}
1 change: 1 addition & 0 deletions pkgs/ok_http/jnigen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ classes:
- "okhttp3.Call"
- "okhttp3.Headers"
- "okhttp3.Callback"
- "okhttp3.ConnectionPool"

# Exclude the deprecated methods listed below
# They cause syntax errors during the `dart format` step of JNIGen.
Expand Down
196 changes: 193 additions & 3 deletions pkgs/ok_http/lib/src/jni/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4360,7 +4360,7 @@ class OkHttpClient_Builder extends jni.JObject {
/// from: public final okhttp3.OkHttpClient$Builder connectionPool(okhttp3.ConnectionPool connectionPool)
/// The returned object must be released after use, by calling the [release] method.
OkHttpClient_Builder connectionPool(
jni.JObject connectionPool,
ConnectionPool connectionPool,
) {
return _connectionPool(
reference.pointer,
Expand Down Expand Up @@ -5487,10 +5487,10 @@ class OkHttpClient extends jni.JObject {

/// from: public final okhttp3.ConnectionPool connectionPool()
/// The returned object must be released after use, by calling the [release] method.
jni.JObject connectionPool() {
ConnectionPool connectionPool() {
return _connectionPool(
reference.pointer, _id_connectionPool as jni.JMethodIDPtr)
.object(const jni.JObjectType());
.object(const $ConnectionPoolType());
}

static final _id_interceptors = _class.instanceMethodId(
Expand Down Expand Up @@ -8138,3 +8138,193 @@ final class $CallbackType extends jni.JObjType<Callback> {
return other.runtimeType == ($CallbackType) && other is $CallbackType;
}
}

/// from: okhttp3.ConnectionPool
class ConnectionPool extends jni.JObject {
@override
late final jni.JObjType<ConnectionPool> $type = type;

ConnectionPool.fromReference(
jni.JReference reference,
) : super.fromReference(reference);

static final _class = jni.JClass.forName(r"okhttp3/ConnectionPool");

/// The type which includes information such as the signature of this class.
static const type = $ConnectionPoolType();
static final _id_new0 = _class.constructorId(
r"(Lokhttp3/internal/connection/RealConnectionPool;)V",
);

static final _new0 = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
ffi.VarArgs<(ffi.Pointer<ffi.Void>,)>)>>(
"globalEnv_NewObject")
.asFunction<
jni.JniResult Function(ffi.Pointer<ffi.Void>, jni.JMethodIDPtr,
ffi.Pointer<ffi.Void>)>();

/// from: public void <init>(okhttp3.internal.connection.RealConnectionPool realConnectionPool)
/// The returned object must be released after use, by calling the [release] method.
factory ConnectionPool(
jni.JObject realConnectionPool,
) {
return ConnectionPool.fromReference(_new0(_class.reference.pointer,
_id_new0 as jni.JMethodIDPtr, realConnectionPool.reference.pointer)
.reference);
}

static final _id_new1 = _class.constructorId(
r"(IJLjava/util/concurrent/TimeUnit;)V",
);

static final _new1 = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
ffi.VarArgs<
(
ffi.Int64,
ffi.Int64,
ffi.Pointer<ffi.Void>
)>)>>("globalEnv_NewObject")
.asFunction<
jni.JniResult Function(ffi.Pointer<ffi.Void>, jni.JMethodIDPtr, int,
int, ffi.Pointer<ffi.Void>)>();

/// from: public void <init>(int i, long j, java.util.concurrent.TimeUnit timeUnit)
/// The returned object must be released after use, by calling the [release] method.
factory ConnectionPool.new1(
int i,
int j,
jni.JObject timeUnit,
) {
return ConnectionPool.fromReference(_new1(_class.reference.pointer,
_id_new1 as jni.JMethodIDPtr, i, j, timeUnit.reference.pointer)
.reference);
}

static final _id_new2 = _class.constructorId(
r"()V",
);

static final _new2 = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>>("globalEnv_NewObject")
.asFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>();

/// from: public void <init>()
/// The returned object must be released after use, by calling the [release] method.
factory ConnectionPool.new2() {
return ConnectionPool.fromReference(
_new2(_class.reference.pointer, _id_new2 as jni.JMethodIDPtr)
.reference);
}

static final _id_idleConnectionCount = _class.instanceMethodId(
r"idleConnectionCount",
r"()I",
);

static final _idleConnectionCount = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>>("globalEnv_CallIntMethod")
.asFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>();

/// from: public final int idleConnectionCount()
int idleConnectionCount() {
return _idleConnectionCount(
reference.pointer, _id_idleConnectionCount as jni.JMethodIDPtr)
.integer;
}

static final _id_connectionCount = _class.instanceMethodId(
r"connectionCount",
r"()I",
);

static final _connectionCount = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>>("globalEnv_CallIntMethod")
.asFunction<
jni.JniResult Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>();

/// from: public final int connectionCount()
int connectionCount() {
return _connectionCount(
reference.pointer, _id_connectionCount as jni.JMethodIDPtr)
.integer;
}

static final _id_evictAll = _class.instanceMethodId(
r"evictAll",
r"()V",
);

static final _evictAll = ProtectedJniExtensions.lookup<
ffi.NativeFunction<
jni.JThrowablePtr Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>>("globalEnv_CallVoidMethod")
.asFunction<
jni.JThrowablePtr Function(
ffi.Pointer<ffi.Void>,
jni.JMethodIDPtr,
)>();

/// from: public final void evictAll()
void evictAll() {
_evictAll(reference.pointer, _id_evictAll as jni.JMethodIDPtr).check();
}
}

final class $ConnectionPoolType extends jni.JObjType<ConnectionPool> {
const $ConnectionPoolType();

@override
String get signature => r"Lokhttp3/ConnectionPool;";

@override
ConnectionPool fromReference(jni.JReference reference) =>
ConnectionPool.fromReference(reference);

@override
jni.JObjType get superType => const jni.JObjectType();

@override
final superCount = 1;

@override
int get hashCode => ($ConnectionPoolType).hashCode;

@override
bool operator ==(Object other) {
return other.runtimeType == ($ConnectionPoolType) &&
other is $ConnectionPoolType;
}
}
3 changes: 3 additions & 0 deletions pkgs/ok_http/lib/src/ok_http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class OkHttpClient extends BaseClient {
request: request,
contentLength: contentLength,
));

// Close and remove all idle connections from the resource pool.
_client.connectionPool().evictAll();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read your explanation but I still don't understand why this is necessary. Since the socket is closed by the server after receiving the headers, how can it be reused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that even if the socket is closed, the server keeps listening to the request

request.listen((line)) {

and even when you encounter an empty line, it only adds the cookies list to the channel sink.

This seemed very peculiar to me, but I confirmed that it was indeed the problem by doing:
If you create a StreamSubscription and cancel it when an empty line is encountered.

late StreamSubscription reqSubscription;
reqSubscription = request.listen((line) {
  if (line.toLowerCase().startsWith('cookie:')) {
    cookies.add(line);
  }

  if (line.isEmpty) {
    // A blank line indicates the end of the headers.
    channel.sink.add(cookies);
    reqSubscription.cancel(); // <- here
  }
});

then all works well, without the need of explicitly removing idle connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... the test seems racy. Does this formulation work:

await for (final line in request) {
        if (line.toLowerCase().startsWith('cookie:')) {
          cookies.add(line);
        }

        if (line.isEmpty) {
          // A blank line indicates the end of the headers.
          channel.sink.add(cookies);
          break;
        }
}

socket.writeAll(...)
await socket.close()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest changing the test case instead? I didn't go that route because CronetClient seems to pass both cases without a problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest changing the test case instead? I didn't go that route because CronetClient seems to pass both cases without a problem.

Yeah, I think that the test is incorrect and it is a coincidence that it works ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, then it's good to close this PR. We would still need okhttp3.ConnectionPool and evictAll() for properly closing the client if I'm not wrong.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, then it's good to close this PR. We would still need okhttp3.ConnectionPool and evictAll() for properly closing the client if I'm not wrong.

You probably want to empty the connection pool when in response to Client.close(). From the docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's exactly what I was going to follow.

  1. call shutdown on ExecutorService
  2. empty the connection pool
  3. release client object (JNI)

},
onFailure: (bindings.Call call, JObject ioException) {
responseCompleter.completeError(
Expand Down