From 6f4d97beea2f099178ea049e10b592d25ea94f8b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Dec 2023 09:18:21 +0100 Subject: [PATCH 1/2] fix: stream error timings (#2497) --- lib/api/readable.js | 69 ++++++++++++++++++++++++++++--------------- lib/client.js | 22 +++++++++----- test/readable.test.js | 37 +++++++++++++++++++++++ 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 5269dfae50c..356a03aedd9 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -62,6 +62,18 @@ module.exports = class BodyReadable extends Readable { return super.destroy(err) } + _destroy (err, callback) { + // Workaround for Node "bug". If the stream is destroyed in same + // tick as it is created, then a user who is waiting for a + // promise (i.e micro tick) for installing a 'error' listener will + // never get a chance and will always encounter an unhandled exception. + // - tick => process.nextTick(fn) + // - micro tick => queueMicrotask(fn) + queueMicrotask(() => { + callback(err) + }) + } + emit (ev, ...args) { if (ev === 'data') { // Node < 16.7 @@ -166,7 +178,7 @@ module.exports = class BodyReadable extends Readable { } } - if (this.closed) { + if (this._readableState.closeEmitted) { return Promise.resolve(null) } @@ -210,33 +222,44 @@ function isUnusable (self) { } async function consume (stream, type) { - if (isUnusable(stream)) { - throw new TypeError('unusable') - } - assert(!stream[kConsume]) return new Promise((resolve, reject) => { - stream[kConsume] = { - type, - stream, - resolve, - reject, - length: 0, - body: [] - } + if (isUnusable(stream)) { + const rState = stream._readableState + if (rState.destroyed && rState.closeEmitted === false) { + stream + .on('error', err => { + reject(err) + }) + .on('close', () => { + reject(new TypeError('unusable')) + }) + } else { + reject(rState.errored ?? new TypeError('unusable')) + } + } else { + stream[kConsume] = { + type, + stream, + resolve, + reject, + length: 0, + body: [] + } - stream - .on('error', function (err) { - consumeFinish(this[kConsume], err) - }) - .on('close', function () { - if (this[kConsume].body !== null) { - consumeFinish(this[kConsume], new RequestAbortedError()) - } - }) + stream + .on('error', function (err) { + consumeFinish(this[kConsume], err) + }) + .on('close', function () { + if (this[kConsume].body !== null) { + consumeFinish(this[kConsume], new RequestAbortedError()) + } + }) - process.nextTick(consumeStart, stream[kConsume]) + queueMicrotask(() => consumeStart(stream[kConsume])) + } }) } diff --git a/lib/client.js b/lib/client.js index af16fd16d3e..2fec533faf1 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1963,12 +1963,19 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, body.resume() } } - const onAbort = function () { - if (finished) { - return + const onClose = function () { + // 'close' might be emitted *before* 'error' for + // broken streams. Wait a tick to avoid this case. + queueMicrotask(() => { + // It's only safe to remove 'error' listener after + // 'close'. + body.removeListener('error', onFinished) + }) + + if (!finished) { + const err = new RequestAbortedError() + queueMicrotask(() => onFinished(err)) } - const err = new RequestAbortedError() - queueMicrotask(() => onFinished(err)) } const onFinished = function (err) { if (finished) { @@ -1986,8 +1993,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, body .removeListener('data', onData) .removeListener('end', onFinished) - .removeListener('error', onFinished) - .removeListener('close', onAbort) + .removeListener('close', onClose) if (!err) { try { @@ -2010,7 +2016,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, .on('data', onData) .on('end', onFinished) .on('error', onFinished) - .on('close', onAbort) + .on('close', onClose) if (body.resume) { body.resume() diff --git a/test/readable.test.js b/test/readable.test.js index 3f4f7939f94..535ecb66baa 100644 --- a/test/readable.test.js +++ b/test/readable.test.js @@ -21,3 +21,40 @@ test('avoid body reordering', async function (t) { t.equal(text, 'helloworld') }) + +test('destroy timing text', async function (t) { + t.plan(1) + + function resume () { + } + function abort () { + } + const _err = new Error('kaboom') + const r = new Readable({ resume, abort }) + r.destroy(_err) + try { + await r.text() + } catch (err) { + t.same(err, _err) + } +}) + +test('destroy timing promise', async function (t) { + t.plan(1) + + function resume () { + } + function abort () { + } + const r = await new Promise(resolve => { + const r = new Readable({ resume, abort }) + r.destroy(new Error('kaboom')) + resolve(r) + }) + await new Promise(resolve => { + r.on('error', err => { + t.ok(err) + resolve(null) + }) + }) +}) From 0c3c6f8474857497ad1d8ca3d2687a66589079d3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Dec 2023 09:20:53 +0100 Subject: [PATCH 2/2] 6.0.1 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 3d0cc9a5da2..0933d911830 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "undici", - "version": "6.0.0", + "version": "6.0.1", "description": "An HTTP/1.1 client, written from scratch for Node.js", "homepage": "https://undici.nodejs.org", "bugs": {