From 19b4d3928c393652d0475fb07735e026009c0698 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 28 Feb 2024 11:17:48 +0100 Subject: [PATCH] refactor: move out more h2 from core client (#2860) * refactor: move out more h2 from core client * WIP * WIP * WIP --- .gitignore | 2 + lib/core/symbols.js | 5 ++- lib/dispatcher/client-h1.js | 21 +++++++---- lib/dispatcher/client-h2.js | 61 +++++++++++++++++------------- lib/dispatcher/client.js | 75 +++++++++++++------------------------ 5 files changed, 80 insertions(+), 84 deletions(-) diff --git a/.gitignore b/.gitignore index 0d8d333f83a..60aa663c838 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,5 @@ undici-fetch.js # .npmrc has platform specific value for windows .npmrc + +.tap diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 1eb4858c378..f3c6e1d5339 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -59,8 +59,9 @@ module.exports = { kHTTP2BuildRequest: Symbol('http2 build request'), kHTTP1BuildRequest: Symbol('http1 build request'), kHTTP2CopyHeaders: Symbol('http2 copy headers'), - kHTTPConnVersion: Symbol('http connection version'), kRetryHandlerDefaultRetry: Symbol('retry agent default retry'), kConstruct: Symbol('constructable'), - kListeners: Symbol('listeners') + kListeners: Symbol('listeners'), + kHTTPContext: Symbol('http context'), + kMaxConcurrentStreams: Symbol('max concurrent streams') } diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 9ee1686efd9..3cc9b3123b8 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -47,7 +47,6 @@ const { kMaxRequests, kCounter, kMaxResponseSize, - kHTTPConnVersion, kListeners, kOnError, kResume @@ -644,8 +643,6 @@ function onParserTimeout (parser) { } async function connectH1 (client, socket) { - client[kHTTPConnVersion] = 'h1' - if (!llhttpInstance) { llhttpInstance = await llhttpPromise llhttpPromise = null @@ -735,6 +732,18 @@ async function connectH1 (client, socket) { client[kResume]() }) + + return { + version: 'h1', + write (...args) { + return writeH1(client, ...args) + }, + resume () { + resumeH1(client) + }, + destroy () { + } + } } function resumeH1 (client) { @@ -1274,8 +1283,4 @@ class AsyncWriter { } } -module.exports = { - connectH1, - writeH1, - resumeH1 -} +module.exports = connectH1 diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 89a42ed273f..04d24b741b3 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -22,16 +22,16 @@ const { kError, kSocket, kStrictContentLength, - kHTTPConnVersion, kOnError, // HTTP2 - kHost, + kMaxConcurrentStreams, kHTTP2Session, - kHTTP2SessionState, kHTTP2CopyHeaders, kResume } = require('../core/symbols.js') +const kOpenStreams = Symbol('open streams') + // Experimental let h2ExperimentalWarned = false @@ -57,8 +57,6 @@ const { } = http2 async function connectH2 (client, socket) { - client[kHTTPConnVersion] = 'h2' - if (!h2ExperimentalWarned) { h2ExperimentalWarned = true process.emitWarning('H2 support is experimental, expect them to change at any time.', { @@ -68,9 +66,10 @@ async function connectH2 (client, socket) { const session = http2.connect(client[kUrl], { createConnection: () => socket, - peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams + peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) + session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket session.on('error', onHttp2SessionError) @@ -124,6 +123,20 @@ async function connectH2 (client, socket) { socket.on('end', function () { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) + + return { + version: 'h2', + write (...args) { + // TODO (fix): return + writeH2(client, ...args) + }, + resume () { + + }, + destroy (err) { + session.destroy(err) + } + } } function onHttp2SessionError (err) { @@ -217,9 +230,10 @@ function writeH2 (client, request) { /** @type {import('node:http2').ClientHttp2Stream} */ let stream - const h2State = client[kHTTP2SessionState] - headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] + const { hostname, port } = client[kUrl] + + headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}` headers[HTTP2_HEADER_METHOD] = method try { @@ -235,8 +249,8 @@ function writeH2 (client, request) { if (stream != null) { util.destroy(stream, err) - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } } @@ -257,18 +271,18 @@ function writeH2 (client, request) { if (stream.id && !stream.pending) { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] } else { stream.once('ready', () => { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] }) } stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) session.unref() + if (session[kOpenStreams] === 0) session.unref() }) return true @@ -348,7 +362,7 @@ function writeH2 (client, request) { } // Increment counter as we have new several streams open - ++h2State.openStreams + ++session[kOpenStreams] stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers @@ -371,8 +385,8 @@ function writeH2 (client, request) { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } @@ -388,16 +402,16 @@ function writeH2 (client, request) { }) stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) { + if (session[kOpenStreams] === 0) { session.unref() } }) stream.once('error', function (err) { if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) @@ -407,7 +421,7 @@ function writeH2 (client, request) { errorRequest(client, request, err) if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) @@ -599,7 +613,4 @@ async function writeIterable ({ h2stream, body, client, request, socket, content } } -module.exports = { - connectH2, - writeH2 -} +module.exports = connectH2 diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 577dbb7ab56..2081a4bcd79 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -56,18 +56,16 @@ const { kInterceptors, kLocalAddress, kMaxResponseSize, - kHTTPConnVersion, kOnError, + kHTTPContext, // HTTP2 - kHost, - kHTTP2Session, - kHTTP2SessionState, + kMaxConcurrentStreams, kHTTP2BuildRequest, kHTTP1BuildRequest, kResume } = require('../core/symbols.js') -const { connectH1, writeH1, resumeH1 } = require('./client-h1.js') -const { connectH2, writeH2 } = require('./client-h2.js') +const connectH1 = require('./client-h1.js') +const connectH2 = require('./client-h2.js') const kClosedResolve = Symbol('kClosedResolve') @@ -107,8 +105,8 @@ class Client extends DispatcherBase { autoSelectFamily, autoSelectFamilyAttemptTimeout, // h2 - allowH2, - maxConcurrentStreams + maxConcurrentStreams, + allowH2 } = {}) { super() @@ -236,18 +234,8 @@ class Client extends DispatcherBase { this[kMaxRequests] = maxRequestsPerClient this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 - this[kHTTPConnVersion] = null - - // HTTP/2 - this[kHTTP2Session] = null - this[kHTTP2SessionState] = !allowH2 - ? null - : { - // streams: null, // Fixed queue of streams - For future support of `push` - openStreams: 0, // Keep track of them to decide whether or not unref the session - maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server - } - this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` + this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server + this[kHTTPContext] = null // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -309,7 +297,9 @@ class Client extends DispatcherBase { [kDispatch] (opts, handler) { const origin = opts.origin || this[kUrl].origin - const request = this[kHTTPConnVersion] === 'h2' + // TODO (fix): Why do these need to be + // TODO (fix): This can happen before connect... + const request = this[kHTTPContext]?.version === 'h2' ? Request[kHTTP2BuildRequest](origin, opts, handler) : Request[kHTTP1BuildRequest](origin, opts, handler) @@ -360,14 +350,13 @@ class Client extends DispatcherBase { resolve(null) } - if (this[kHTTP2Session] != null) { - util.destroy(this[kHTTP2Session], err) - this[kHTTP2Session] = null - this[kHTTP2SessionState] = null + if (this[kHTTPContext] != null) { + this[kHTTPContext].destroy(err) + this[kHTTPContext] = null } if (this[kSocket]) { - util.destroy(this[kSocket].on('close', callback), err) + this[kSocket].destroy(err).on('close', callback) } else { queueMicrotask(callback) } @@ -425,7 +414,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -460,11 +449,9 @@ async function connect (client) { assert(socket) - if (socket.alpnProtocol === 'h2') { - await connectH2(client, socket) - } else { - await connectH1(client, socket) - } + client[kHTTPContext] = socket.alpnProtocol === 'h2' + ? await connectH2(client, socket) + : await connectH1(client, socket) socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] @@ -480,7 +467,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -503,7 +490,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -565,8 +552,8 @@ function _resume (client, sync) { const socket = client[kSocket] - if (socket && socket.alpnProtocol !== 'h2') { - resumeH1(client) + if (client[kHTTPContext]) { + client[kHTTPContext].resume() } if (client[kBusy]) { @@ -585,7 +572,7 @@ function _resume (client, sync) { return } - if (client[kHTTPConnVersion] === 'h1') { + if (client[kHTTPContext]?.version === 'h1') { if (client[kRunning] >= (client[kPipelining] || 1)) { return } @@ -619,7 +606,7 @@ function _resume (client, sync) { return } - if (client[kHTTPConnVersion] === 'h1') { + if (client[kHTTPContext].version === 'h1') { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return } @@ -652,7 +639,7 @@ function _resume (client, sync) { } } - if (!request.aborted && write(client, request)) { + if (!request.aborted && client[kHTTPContext].write(request)) { client[kPendingIdx]++ } else { client[kQueue].splice(client[kPendingIdx], 1) @@ -660,16 +647,6 @@ function _resume (client, sync) { } } -function write (client, request) { - if (client[kHTTPConnVersion] === 'h2') { - // TODO (fix): Why does this not return the value - // from writeH2. - writeH2(client, request) - } else { - return writeH1(client, request) - } -} - function errorRequest (client, request, err) { try { request.onError(err)