From 8e47f063fa1e683559bf509de1dbaa9929117a4b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 28 Feb 2024 13:21:21 +0100 Subject: [PATCH] fix: minor connect cleanup --- lib/dispatcher/client-h1.js | 56 +++++++++++++++++++++---------------- lib/dispatcher/client-h2.js | 3 ++ lib/dispatcher/client.js | 36 +++++++++++------------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 0cb6528ae06..29b4fe0fc77 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -49,7 +49,8 @@ const { kMaxResponseSize, kListeners, kOnError, - kResume + kResume, + kHTTPContext } = require('../core/symbols.js') const constants = require('../llhttp/constants.js') @@ -403,6 +404,7 @@ class Parser { removeAllListeners(socket) client[kSocket] = null + client[kHTTPContext] = null // TODO (fix): This is hacky... client[kQueue][client[kRunningIdx]++] = null client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) @@ -706,6 +708,7 @@ async function connectH1 (client, socket) { const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) client[kSocket] = null + client[kHTTPContext] = null // TODO (fix): This is hacky... if (client.destroyed) { assert(client[kPending] === 0) @@ -744,36 +747,41 @@ async function connectH1 (client, socket) { }, destroy () { }, + get destroyed () { + return socket.destroyed + }, busy (request) { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return true } - if (client[kRunning] > 0 && !request.idempotent) { - // Non-idempotent request cannot be retried. - // Ensure that no other requests are inflight and - // could cause failure. - return true - } - - if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { - // Don't dispatch an upgrade until all preceding requests have completed. - // A misbehaving server might upgrade the connection before all pipelined - // request has completed. - return true - } + if (request) { + if (client[kRunning] > 0 && !request.idempotent) { + // Non-idempotent request cannot be retried. + // Ensure that no other requests are inflight and + // could cause failure. + return true + } - if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && - (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) { - // Request with stream or iterator body can error while other requests - // are inflight and indirectly error those as well. - // Ensure this doesn't happen by waiting for inflight - // to complete before dispatching. + if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { + // Don't dispatch an upgrade until all preceding requests have completed. + // A misbehaving server might upgrade the connection before all pipelined + // request has completed. + return true + } - // Request with stream or iterator body cannot be retried. - // Ensure that no other requests are inflight and - // could cause failure. - return true + if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && + (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) { + // Request with stream or iterator body can error while other requests + // are inflight and indirectly error those as well. + // Ensure this doesn't happen by waiting for inflight + // to complete before dispatching. + + // Request with stream or iterator body cannot be retried. + // Ensure that no other requests are inflight and + // could cause failure. + return true + } } return false diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 7202c654397..8a5f2c2ffd0 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -135,6 +135,9 @@ async function connectH2 (client, socket) { destroy (err) { session.destroy(err) }, + get destroyed () { + return socket.destroyed + }, busy () { return false } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 7c75c2ff239..9cc523c2d1d 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -17,17 +17,14 @@ const { const buildConnector = require('../core/connect.js') const { kUrl, - kReset, kServerName, kClient, kBusy, kConnect, - kBlocking, kResuming, kRunning, kPending, kSize, - kWriting, kQueue, kConnected, kConnecting, @@ -277,13 +274,12 @@ class Client extends DispatcherBase { } get [kConnected] () { - return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed + return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed } get [kBusy] () { - const socket = this[kSocket] - return ( - (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) || + return Boolean( + this[kHTTPContext]?.busy(null) || (this[kSize] >= (getPipelining(this) || 1)) || this[kPending] > 0 ) @@ -441,13 +437,18 @@ async function connect (client) { return } - client[kConnecting] = false - assert(socket) - client[kHTTPContext] = socket.alpnProtocol === 'h2' - ? await connectH2(client, socket) - : await connectH1(client, socket) + try { + client[kHTTPContext] = socket.alpnProtocol === 'h2' + ? await connectH2(client, socket) + : await connectH1(client, socket) + } catch (err) { + socket.destroy().on('error', () => {}) + throw err + } + + client[kConnecting] = false socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] @@ -546,8 +547,6 @@ function _resume (client, sync) { return } - const socket = client[kSocket] - if (client[kHTTPContext]) { client[kHTTPContext].resume() } @@ -581,6 +580,7 @@ function _resume (client, sync) { client[kServerName] = request.servername + const socket = client[kSocket] if (socket && socket.servername !== request.servername) { util.destroy(socket, new InformationalError('servername changed')) return @@ -591,16 +591,12 @@ function _resume (client, sync) { return } - if (!socket) { + if (!client[kHTTPContext]) { connect(client) return } - if (socket.destroyed) { - return - } - - if (!client[kHTTPContext]) { + if (client[kHTTPContext].destroyed) { return }