Skip to content

Commit

Permalink
feat(node-http-handler): implement connections pool and manager inter…
Browse files Browse the repository at this point in the history
…faces (#4508)
  • Loading branch information
AndrewFossAWS authored Mar 21, 2023
1 parent 25aec20 commit 86a6046
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 119 deletions.
33 changes: 27 additions & 6 deletions packages/node-http-handler/src/node-http-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,35 @@ import { Agent as hsAgent, request as hsRequest, RequestOptions } from "https";

import { NODEJS_TIMEOUT_ERROR_CODES } from "./constants";
import { getTransformedHeaders } from "./get-transformed-headers";
import { setConnectionTimeout } from "./set-connection-timeout";
import { setSocketTimeout } from "./set-socket-timeout";
import { writeRequestBody } from "./write-request-body";

/**
* Represents the http options that can be passed to a node http client.
*/
export interface NodeHttpHandlerOptions {
/**
* @deprecated Use {@link requestTimeout}
*
* Note:{@link NodeHttpHandler} will resolve request timeout via nullish coalescing the following fields:
* {@link requestTimeout} ?? {@link connectionTimeout} ?? {@link socketTimeout} ?? {@link DEFAULT_REQUEST_TIMEOUT}
*
* The maximum time in milliseconds that the connection phase of a request
* may take before the connection attempt is abandoned.
*/
connectionTimeout?: number;

/**
* The maximum time in milliseconds that the connection phase of a request
* may take before the connection attempt is abandoned.
*/
requestTimeout?: number;

/**
* @deprecated Use {@link requestTimeout}
*
* Note:{@link NodeHttpHandler} will resolve request timeout via nullish coalescing the following fields:
* {@link requestTimeout} ?? {@link connectionTimeout} ?? {@link socketTimeout} ?? {@link DEFAULT_REQUEST_TIMEOUT}
*
* The maximum time in milliseconds that a socket may remain idle before it
* is closed.
*/
Expand All @@ -31,12 +45,15 @@ export interface NodeHttpHandlerOptions {
}

interface ResolvedNodeHttpHandlerConfig {
requestTimeout: number;
connectionTimeout?: number;
socketTimeout?: number;
httpAgent: hAgent;
httpsAgent: hsAgent;
}

export const DEFAULT_REQUEST_TIMEOUT = 0;

export class NodeHttpHandler implements HttpHandler {
private config?: ResolvedNodeHttpHandlerConfig;
private readonly configProvider: Promise<ResolvedNodeHttpHandlerConfig>;
Expand All @@ -59,12 +76,14 @@ export class NodeHttpHandler implements HttpHandler {
}

private resolveDefaultConfig(options?: NodeHttpHandlerOptions | void): ResolvedNodeHttpHandlerConfig {
const { connectionTimeout, socketTimeout, httpAgent, httpsAgent } = options || {};
const { requestTimeout, connectionTimeout, socketTimeout, httpAgent, httpsAgent } = options || {};
const keepAlive = true;
const maxSockets = 50;

return {
connectionTimeout,
socketTimeout,
requestTimeout: requestTimeout ?? connectionTimeout ?? socketTimeout ?? DEFAULT_REQUEST_TIMEOUT,
httpAgent: httpAgent || new hAgent({ keepAlive, maxSockets }),
httpsAgent: httpsAgent || new hsAgent({ keepAlive, maxSockets }),
};
Expand Down Expand Up @@ -123,9 +142,11 @@ export class NodeHttpHandler implements HttpHandler {
}
});

// wire-up any timeout logic
setConnectionTimeout(req, reject, this.config.connectionTimeout);
setSocketTimeout(req, reject, this.config.socketTimeout);
const timeout: number = this.config?.requestTimeout ?? DEFAULT_REQUEST_TIMEOUT;
req.setTimeout(timeout, () => {
req.destroy();
reject(Object.assign(new Error(`Connection timed out after ${timeout} ms`), { name: "TimeoutError" }));
});

// wire-up abort logic
if (abortSignal) {
Expand Down
125 changes: 125 additions & 0 deletions packages/node-http-handler/src/node-http2-connection-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { RequestContext } from "@aws-sdk/types";
import { ConnectConfiguration } from "@aws-sdk/types/src/connection/config";
import { ConnectionManager, ConnectionManagerConfiguration } from "@aws-sdk/types/src/connection/manager";
import http2, { ClientHttp2Session } from "http2";

import { NodeHttp2ConnectionPool } from "./node-http2-connection-pool";

export class NodeHttp2ConnectionManager implements ConnectionManager<ClientHttp2Session> {
constructor(config: ConnectionManagerConfiguration) {
this.config = config;

if (this.config.maxConcurrency && this.config.maxConcurrency <= 0) {
throw new RangeError("maxConcurrency must be greater than zero.");
}
}

private config: ConnectionManagerConfiguration;

private readonly sessionCache: Map<string, NodeHttp2ConnectionPool> = new Map<string, NodeHttp2ConnectionPool>();

public lease(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): ClientHttp2Session {
const url = this.getUrlString(requestContext);

const existingPool = this.sessionCache.get(url);

if (existingPool) {
const existingSession = existingPool.poll();
if (existingSession && !this.config.disableConcurrency) {
return existingSession;
}
}

const session = http2.connect(url);

if (this.config.maxConcurrency) {
session.settings({ maxConcurrentStreams: this.config.maxConcurrency }, (err) => {
if (err) {
throw new Error(
"Fail to set maxConcurrentStreams to " +
this.config.maxConcurrency +
"when creating new session for " +
requestContext.destination.toString()
);
}
});
}

// AWS SDK does not expect server push streams, don't keep node alive without a request.
session.unref();

const destroySessionCb = () => {
session.destroy();
this.deleteSession(url, session);
};
session.on("goaway", destroySessionCb);
session.on("error", destroySessionCb);
session.on("frameError", destroySessionCb);
session.on("close", () => this.deleteSession(url, session));

if (connectionConfiguration.requestTimeout) {
session.setTimeout(connectionConfiguration.requestTimeout, destroySessionCb);
}

const connectionPool = this.sessionCache.get(url) || new NodeHttp2ConnectionPool();

connectionPool.offerLast(session);

this.sessionCache.set(url, connectionPool);

return session;
}

/**
* Delete a session from the connection pool.
* @param authority The authority of the session to delete.
* @param session The session to delete.
*/
public deleteSession(authority: string, session: ClientHttp2Session): void {
const existingConnectionPool = this.sessionCache.get(authority);

if (!existingConnectionPool) {
return;
}

if (!existingConnectionPool.contains(session)) {
return;
}

existingConnectionPool.remove(session);

this.sessionCache.set(authority, existingConnectionPool);
}

public release(requestContext: RequestContext, session: ClientHttp2Session): void {
const cacheKey = this.getUrlString(requestContext);
this.sessionCache.get(cacheKey)?.offerLast(session);
}

public destroy(): void {
for (const [key, connectionPool] of this.sessionCache) {
for (const session of connectionPool) {
if (!session.destroyed) {
session.destroy();
}
connectionPool.remove(session);
}
this.sessionCache.delete(key);
}
}

public setMaxConcurrentStreams(maxConcurrentStreams: number) {
if (this.config.maxConcurrency && this.config.maxConcurrency <= 0) {
throw new RangeError("maxConcurrentStreams must be greater than zero.");
}
this.config.maxConcurrency = maxConcurrentStreams;
}

public setDisableConcurrentStreams(disableConcurrentStreams: boolean) {
this.config.disableConcurrency = disableConcurrentStreams;
}

private getUrlString(request: RequestContext): string {
return request.destination.toString();
}
}
42 changes: 42 additions & 0 deletions packages/node-http-handler/src/node-http2-connection-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { ConnectionPool } from "@aws-sdk/types/src/connection/pool";
import { ClientHttp2Session } from "http2";

export class NodeHttp2ConnectionPool implements ConnectionPool<ClientHttp2Session> {
private sessions: ClientHttp2Session[] = [];

constructor(sessions?: ClientHttp2Session[]) {
this.sessions = sessions ?? [];
}

public poll(): ClientHttp2Session | void {
if (this.sessions.length > 0) {
return this.sessions.shift();
}
}

public offerLast(session: ClientHttp2Session): void {
this.sessions.push(session);
}

public contains(session: ClientHttp2Session): boolean {
return this.sessions.includes(session);
}

public remove(session: ClientHttp2Session): void {
this.sessions = this.sessions.filter((s) => s !== session);
}

public [Symbol.iterator]() {
return this.sessions[Symbol.iterator]();
}

public destroy(connection: ClientHttp2Session): void {
for (const session of this.sessions) {
if (session === connection) {
if (!session.destroyed) {
session.destroy();
}
}
}
}
}
Loading

0 comments on commit 86a6046

Please sign in to comment.