Skip to content

Commit

Permalink
refactor: avoid http2 dynamic dispatch in socket handlers (#2839)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 25, 2024
1 parent 2a368b2 commit 95bd929
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 104 deletions.
3 changes: 2 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ module.exports = {
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable')
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
}
238 changes: 135 additions & 103 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const {
kLocalAddress,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
// HTTP2
kHost,
kHTTP2Session,
Expand Down Expand Up @@ -111,6 +112,20 @@ const FastBuffer = Buffer[Symbol.species]

const kClosedResolve = Symbol('kClosedResolve')

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

/**
* @type {import('../../types/client.js').default}
*/
Expand Down Expand Up @@ -276,7 +291,7 @@ class Client extends DispatcherBase {
this[kMaxRequests] = maxRequestsPerClient
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
this[kHTTPConnVersion] = 'h1'
this[kHTTPConnVersion] = null

// HTTP/2
this[kHTTP2Session] = null
Expand Down Expand Up @@ -803,11 +818,8 @@ class Parser {

socket[kClient] = null
socket[kError] = null
socket
.removeListener('error', onSocketError)
.removeListener('readable', onSocketReadable)
.removeListener('end', onSocketEnd)
.removeListener('close', onSocketClose)

removeAllListeners(socket)

client[kSocket] = null
client[kHTTP2Session] = null
Expand Down Expand Up @@ -1050,33 +1062,6 @@ function onParserTimeout (parser) {
}
}

function onSocketReadable () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
}

function onSocketError (err) {
const { [kClient]: client, [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

if (client[kHTTPConnVersion] !== 'h2') {
// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}
}

this[kError] = err

onError(this[kClient], err)
}

function onError (client, err) {
if (
client[kRunning] === 0 &&
Expand All @@ -1097,32 +1082,8 @@ function onError (client, err) {
}
}

function onSocketEnd () {
const { [kParser]: parser, [kClient]: client } = this

if (client[kHTTPConnVersion] !== 'h2') {
if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onSocketClose () {
const { [kClient]: client, [kParser]: parser } = this

if (client[kHTTPConnVersion] === 'h1' && parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

Expand Down Expand Up @@ -1215,56 +1176,19 @@ async function connect (client) {

assert(socket)

const isH2 = socket.alpnProtocol === 'h2'
if (isH2) {
if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
code: 'UNDICI-H2'
})
}

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
})

client[kHTTPConnVersion] = 'h2'
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', onSocketClose)
session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session
if (socket.alpnProtocol === 'h2') {
await connectH2(client, socket)
} else {
if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)
await connectH1(client, socket)
}

addListener(socket, 'close', onSocketClose)

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket[kClient] = client
socket[kError] = null

socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)

client[kSocket] = socket

if (channels.connected.hasSubscribers) {
Expand Down Expand Up @@ -1475,10 +1399,15 @@ function shouldSendContentLength (method) {

function write (client, request) {
if (client[kHTTPConnVersion] === 'h2') {
writeH2(client, client[kHTTP2Session], request)
return
// TODO (fix): Why does this not return the value
// from writeH2.
writeH2(client, request)
} else {
return writeH1(client, request)
}
}

function writeH1 (client, request) {
const { method, path, host, upgrade, blocking, reset } = request

let { body, headers, contentLength } = request
Expand Down Expand Up @@ -1656,7 +1585,8 @@ function write (client, request) {
return true
}

function writeH2 (client, session, request) {
function writeH2 (client, request) {
const session = client[kHTTP2Session]
const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

let headers
Expand Down Expand Up @@ -2341,4 +2271,106 @@ function errorRequest (client, request, err) {
}
}

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
const { [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'readable', function () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
})
addListener(socket, 'end', function () {
const { [kParser]: parser } = this

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', function () {
const { [kParser]: parser } = this

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}
})
}

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
code: 'UNDICI-H2'
})
}

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
})

session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', onSocketClose)
session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
}

module.exports = Client

0 comments on commit 95bd929

Please sign in to comment.