From e5c9d703e63cd5ad691b8ce26e3f9a81c598f2e3 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Fri, 6 Oct 2023 18:35:32 +0200 Subject: [PATCH] fix(#2311): End stream after body sent (#2314) --- lib/client.js | 6 +- test/fetch/http2.js | 162 ++++++++++++++++++++++++++++++-------------- 2 files changed, 115 insertions(+), 53 deletions(-) diff --git a/lib/client.js b/lib/client.js index b898bfd2672..be00382d0d0 100644 --- a/lib/client.js +++ b/lib/client.js @@ -2096,13 +2096,17 @@ async function writeIterable ({ h2stream, body, client, request, socket, content throw socket[kError] } - if (!h2stream.write(chunk)) { + const res = h2stream.write(chunk) + request.onBodySent(chunk) + if (!res) { await waitForDrain() } } } catch (err) { h2stream.destroy(err) } finally { + request.onRequestSent() + h2stream.end() h2stream .off('close', onDrain) .off('drain', onDrain) diff --git a/test/fetch/http2.js b/test/fetch/http2.js index e1d81d197d8..b426de3cbe5 100644 --- a/test/fetch/http2.js +++ b/test/fetch/http2.js @@ -13,24 +13,29 @@ const { Client, fetch } = require('../..') const nodeVersion = Number(process.version.split('v')[1].split('.')[0]) -plan(5) +plan(6) -test('[Fetch] Simple GET with h2', async t => { - const server = createSecureServer(pem) - const expectedRequestBody = 'hello h2!' +test('[Fetch] Issue#2311', async t => { + const expectedBody = 'hello from client!' - server.on('stream', async (stream, headers) => { - stream.respond({ + const server = createSecureServer(pem, async (req, res) => { + let body = '' + + req.setEncoding('utf8') + + res.writeHead(200, { 'content-type': 'text/plain; charset=utf-8', - 'x-custom-h2': headers['x-my-header'], - 'x-method': headers[':method'], - ':status': 200 + 'x-custom-h2': req.headers['x-my-header'] }) - stream.end(expectedRequestBody) + for await (const chunk of req) { + body += chunk + } + + res.end(body) }) - t.plan(3) + t.plan(1) server.listen() await once(server, 'listening') @@ -46,12 +51,13 @@ test('[Fetch] Simple GET with h2', async t => { `https://localhost:${server.address().port}/`, // Needs to be passed to disable the reject unauthorized { - method: 'GET', + method: 'POST', dispatcher: client, headers: { 'x-my-header': 'foo', 'content-type': 'text-plain' - } + }, + body: expectedBody } ) @@ -60,30 +66,25 @@ test('[Fetch] Simple GET with h2', async t => { t.teardown(server.close.bind(server)) t.teardown(client.close.bind(client)) - t.equal(responseBody, expectedRequestBody) - t.equal(response.headers.get('x-method'), 'GET') - t.equal(response.headers.get('x-custom-h2'), 'foo') + t.equal(responseBody, expectedBody) }) -test('[Fetch] Should handle h2 request with body (string or buffer)', async t => { +test('[Fetch] Simple GET with h2', async t => { const server = createSecureServer(pem) - const expectedBody = 'hello from client!' const expectedRequestBody = 'hello h2!' - const requestBody = [] server.on('stream', async (stream, headers) => { - stream.on('data', chunk => requestBody.push(chunk)) - stream.respond({ 'content-type': 'text/plain; charset=utf-8', 'x-custom-h2': headers['x-my-header'], + 'x-method': headers[':method'], ':status': 200 }) stream.end(expectedRequestBody) }) - t.plan(2) + t.plan(3) server.listen() await once(server, 'listening') @@ -99,13 +100,12 @@ test('[Fetch] Should handle h2 request with body (string or buffer)', async t => `https://localhost:${server.address().port}/`, // Needs to be passed to disable the reject unauthorized { - method: 'POST', + method: 'GET', dispatcher: client, headers: { 'x-my-header': 'foo', 'content-type': 'text-plain' - }, - body: expectedBody + } } ) @@ -114,21 +114,19 @@ test('[Fetch] Should handle h2 request with body (string or buffer)', async t => t.teardown(server.close.bind(server)) t.teardown(client.close.bind(client)) - t.equal(Buffer.concat(requestBody).toString('utf-8'), expectedBody) t.equal(responseBody, expectedRequestBody) + t.equal(response.headers.get('x-method'), 'GET') + t.equal(response.headers.get('x-custom-h2'), 'foo') }) -// Skipping for now, there is something odd in the way the body is handled -test('[Fetch] Should handle h2 request with body (stream)', { skip: nodeVersion === 16 }, async t => { +test('[Fetch] Should handle h2 request with body (string or buffer)', async t => { const server = createSecureServer(pem) - const expectedBody = readFileSync(__filename, 'utf-8') - const stream = createReadStream(__filename) - const requestChunks = [] + const expectedBody = 'hello from client!' + const expectedRequestBody = 'hello h2!' + const requestBody = [] server.on('stream', async (stream, headers) => { - t.equal(headers[':method'], 'PUT') - t.equal(headers[':path'], '/') - t.equal(headers[':scheme'], 'https') + stream.on('data', chunk => requestBody.push(chunk)) stream.respond({ 'content-type': 'text/plain; charset=utf-8', @@ -136,16 +134,12 @@ test('[Fetch] Should handle h2 request with body (stream)', { skip: nodeVersion ':status': 200 }) - stream.end('hello h2!') - - for await (const chunk of stream) { - requestChunks.push(chunk) - } + stream.end(expectedRequestBody) }) - t.plan(8) + t.plan(2) - server.listen(0) + server.listen() await once(server, 'listening') const client = new Client(`https://localhost:${server.address().port}`, { @@ -155,32 +149,96 @@ test('[Fetch] Should handle h2 request with body (stream)', { skip: nodeVersion allowH2: true }) - t.teardown(server.close.bind(server)) - t.teardown(client.close.bind(client)) - const response = await fetch( `https://localhost:${server.address().port}/`, // Needs to be passed to disable the reject unauthorized { - method: 'PUT', + method: 'POST', dispatcher: client, headers: { 'x-my-header': 'foo', 'content-type': 'text-plain' }, - body: Readable.toWeb(stream), - duplex: 'half' + body: expectedBody } ) const responseBody = await response.text() - t.equal(response.status, 200) - t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8') - t.equal(response.headers.get('x-custom-h2'), 'foo') - t.equal(responseBody, 'hello h2!') - t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + t.equal(Buffer.concat(requestBody).toString('utf-8'), expectedBody) + t.equal(responseBody, expectedRequestBody) }) + +// Skipping for now, there is something odd in the way the body is handled +test( + '[Fetch] Should handle h2 request with body (stream)', + { skip: nodeVersion === 16 }, + async t => { + const server = createSecureServer(pem) + const expectedBody = readFileSync(__filename, 'utf-8') + const stream = createReadStream(__filename) + const requestChunks = [] + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'PUT') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + + for await (const chunk of stream) { + requestChunks.push(chunk) + } + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await fetch( + `https://localhost:${server.address().port}/`, + // Needs to be passed to disable the reject unauthorized + { + method: 'PUT', + dispatcher: client, + headers: { + 'x-my-header': 'foo', + 'content-type': 'text-plain' + }, + body: Readable.toWeb(stream), + duplex: 'half' + } + ) + + const responseBody = await response.text() + + t.equal(response.status, 200) + t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8') + t.equal(response.headers.get('x-custom-h2'), 'foo') + t.equal(responseBody, 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) + } +) test('Should handle h2 request with body (Blob)', { skip: !Blob }, async t => { const server = createSecureServer(pem) const expectedBody = 'asd'