Skip to content

Commit

Permalink
fix: minor connect cleanup (#2877)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Feb 28, 2024
1 parent 46b1b0f commit 2a27dc1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 62 deletions.
70 changes: 45 additions & 25 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const {
kMaxResponseSize,
kListeners,
kOnError,
kResume
kResume,
kHTTPContext
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -403,6 +404,7 @@ class Parser {
removeAllListeners(socket)

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

Expand Down Expand Up @@ -643,6 +645,8 @@ function onParserTimeout (parser) {
}

async function connectH1 (client, socket) {
client[kSocket] = socket

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
Expand Down Expand Up @@ -706,6 +710,7 @@ async function connectH1 (client, socket) {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)
Expand Down Expand Up @@ -733,6 +738,11 @@ async function connectH1 (client, socket) {
client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
})

return {
version: 'h1',
defaultPipelining: 1,
Expand All @@ -742,38 +752,48 @@ async function connectH1 (client, socket) {
resume () {
resumeH1(client)
},
destroy () {
destroy (err, callback) {
if (closed) {
queueMicrotask(callback)
} else {
socket.destroy(err).on('close', callback)
}
},
get destroyed () {
return socket.destroyed
},
busy (request) {
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
return true
}

if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}
if (request) {
if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.
if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}
}

return false
Expand Down
17 changes: 16 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const {
} = http2

async function connectH2 (client, socket) {
client[kSocket] = socket

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
Expand Down Expand Up @@ -114,6 +116,11 @@ async function connectH2 (client, socket) {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

let closed = false
socket.on('close', () => {
closed = true
})

return {
version: 'h2',
defaultPipelining: Infinity,
Expand All @@ -124,8 +131,16 @@ async function connectH2 (client, socket) {
resume () {

},
destroy (err) {
destroy (err, callback) {
session.destroy(err)
if (closed) {
queueMicrotask(callback)
} else {
socket.destroy(err).on('close', callback)
}
},
get destroyed () {
return socket.destroyed
},
busy () {
return false
Expand Down
55 changes: 19 additions & 36 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ const {
const buildConnector = require('../core/connect.js')
const {
kUrl,
kReset,
kServerName,
kClient,
kBusy,
kConnect,
kBlocking,
kResuming,
kRunning,
kPending,
kSize,
kWriting,
kQueue,
kConnected,
kConnecting,
Expand All @@ -38,7 +35,6 @@ const {
kRunningIdx,
kError,
kPipelining,
kSocket,
kKeepAliveTimeoutValue,
kMaxHeadersSize,
kKeepAliveMaxTimeout,
Expand Down Expand Up @@ -216,7 +212,6 @@ class Client extends DispatcherBase {
: [createRedirectInterceptor({ maxRedirections })]
this[kUrl] = util.parseOrigin(url)
this[kConnector] = connect
this[kSocket] = null
this[kPipelining] = pipelining != null ? pipelining : 1
this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
Expand Down Expand Up @@ -277,13 +272,12 @@ class Client extends DispatcherBase {
}

get [kConnected] () {
return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
}

get [kBusy] () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
return Boolean(
this[kHTTPContext]?.busy(null) ||
(this[kSize] >= (getPipelining(this) || 1)) ||
this[kPending] > 0
)
Expand Down Expand Up @@ -346,13 +340,9 @@ class Client extends DispatcherBase {
resolve(null)
}

if (this[kHTTPContext] != null) {
this[kHTTPContext].destroy(err)
if (this[kHTTPContext]) {
this[kHTTPContext].destroy(err, callback)
this[kHTTPContext] = null
}

if (this[kSocket]) {
this[kSocket].destroy(err).on('close', callback)
} else {
queueMicrotask(callback)
}
Expand Down Expand Up @@ -386,7 +376,7 @@ function onError (client, err) {

async function connect (client) {
assert(!client[kConnecting])
assert(!client[kSocket])
assert(!client[kHTTPContext])

let { host, hostname, protocol, port } = client[kUrl]

Expand Down Expand Up @@ -441,21 +431,24 @@ async function connect (client) {
return
}

client[kConnecting] = false

assert(socket)

client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
try {
client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
} catch (err) {
socket.destroy().on('error', () => {})
throw err
}

client[kConnecting] = false

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket[kClient] = client
socket[kError] = null

client[kSocket] = socket

if (channels.connected.hasSubscribers) {
channels.connected.publish({
connectParams: {
Expand Down Expand Up @@ -546,8 +539,6 @@ function _resume (client, sync) {
return
}

const socket = client[kSocket]

if (client[kHTTPContext]) {
client[kHTTPContext].resume()
}
Expand Down Expand Up @@ -580,27 +571,19 @@ function _resume (client, sync) {
}

client[kServerName] = request.servername

if (socket && socket.servername !== request.servername) {
util.destroy(socket, new InformationalError('servername changed'))
return
}
client[kHTTPContext]?.destroy(new InformationalError('servername changed'))
}

if (client[kConnecting]) {
return
}

if (!socket) {
if (!client[kHTTPContext]) {
connect(client)
return
}

if (socket.destroyed) {
return
}

if (!client[kHTTPContext]) {
if (client[kHTTPContext].destroyed) {
return
}

Expand Down

0 comments on commit 2a27dc1

Please sign in to comment.