Skip to content

Commit

Permalink
fix: minor connect cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 28, 2024
1 parent a55d61f commit 8e47f06
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 44 deletions.
56 changes: 32 additions & 24 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const {
kMaxResponseSize,
kListeners,
kOnError,
kResume
kResume,
kHTTPContext
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -403,6 +404,7 @@ class Parser {
removeAllListeners(socket)

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

Expand Down Expand Up @@ -706,6 +708,7 @@ async function connectH1 (client, socket) {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)
Expand Down Expand Up @@ -744,36 +747,41 @@ async function connectH1 (client, socket) {
},
destroy () {
},
get destroyed () {
return socket.destroyed
},
busy (request) {
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
return true
}

if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}
if (request) {
if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.
if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}
}

return false
Expand Down
3 changes: 3 additions & 0 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ async function connectH2 (client, socket) {
destroy (err) {
session.destroy(err)
},
get destroyed () {
return socket.destroyed
},
busy () {
return false
}
Expand Down
36 changes: 16 additions & 20 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ const {
const buildConnector = require('../core/connect.js')
const {
kUrl,
kReset,
kServerName,
kClient,
kBusy,
kConnect,
kBlocking,
kResuming,
kRunning,
kPending,
kSize,
kWriting,
kQueue,
kConnected,
kConnecting,
Expand Down Expand Up @@ -277,13 +274,12 @@ class Client extends DispatcherBase {
}

get [kConnected] () {
return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
}

get [kBusy] () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
return Boolean(
this[kHTTPContext]?.busy(null) ||
(this[kSize] >= (getPipelining(this) || 1)) ||
this[kPending] > 0
)
Expand Down Expand Up @@ -441,13 +437,18 @@ async function connect (client) {
return
}

client[kConnecting] = false

assert(socket)

client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
try {
client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
} catch (err) {
socket.destroy().on('error', () => {})
throw err
}

client[kConnecting] = false

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
Expand Down Expand Up @@ -546,8 +547,6 @@ function _resume (client, sync) {
return
}

const socket = client[kSocket]

if (client[kHTTPContext]) {
client[kHTTPContext].resume()
}
Expand Down Expand Up @@ -581,6 +580,7 @@ function _resume (client, sync) {

client[kServerName] = request.servername

const socket = client[kSocket]
if (socket && socket.servername !== request.servername) {
util.destroy(socket, new InformationalError('servername changed'))
return
Expand All @@ -591,16 +591,12 @@ function _resume (client, sync) {
return
}

if (!socket) {
if (!client[kHTTPContext]) {
connect(client)
return
}

if (socket.destroyed) {
return
}

if (!client[kHTTPContext]) {
if (client[kHTTPContext].destroyed) {
return
}

Expand Down

0 comments on commit 8e47f06

Please sign in to comment.