diff --git a/lib/handler/RetryHandler.js b/lib/handler/RetryHandler.js index fa4485dda33..4fde7537455 100644 --- a/lib/handler/RetryHandler.js +++ b/lib/handler/RetryHandler.js @@ -9,6 +9,13 @@ const { safeHTTPMethods } = require('../core/util') +function calculateRetryAfterHeader (retryAfter) { + const current = Date.now() + const diff = new Date(retryAfter).getTime() - current + + return diff +} + class RetryHandler { constructor (opts, handlers, retryOpts) { const { @@ -22,6 +29,7 @@ class RetryHandler { methods, idempotent, codes, + retryAfter, status: statusCodes } = retryOpts ?? {} @@ -32,6 +40,7 @@ class RetryHandler { this.aborted = false this.retryOpts = { retry: retryFn ?? RetryHandler[kRetryHandlerDefaultRetry], + retryAfter: retryAfter ?? true, maxTimeout: maxTimeout ?? 30 * 1000, // 30s, timeout: minTimeout ?? 500, // .5s timeoutFactor: timeoutFactor ?? 2, @@ -41,7 +50,7 @@ class RetryHandler { // States weather or not retry on idempotent methods idempotent: idempotent ?? false, // Indicates which errors to retry - status: statusCodes ?? [500, 502, 503, 504], + status: statusCodes ?? [500, 502, 503, 504, 429], // List of errors to retry codes: codes ?? [ 'ECONNRESET', @@ -88,10 +97,11 @@ class RetryHandler { } static [kRetryHandlerDefaultRetry] (err, { counter, currentTimeout }, opts) { - const { statusCode, code } = err + const { statusCode, code, headers } = err const { method } = opts const { max, + timeout, maxTimeout, timeoutFactor, status, @@ -100,6 +110,9 @@ class RetryHandler { methods } = opts.retry + currentTimeout = + currentTimeout != null && currentTimeout > 0 ? currentTimeout : timeout + // Any code that is not a Undici's originated and allowed to retry if ( code && @@ -128,10 +141,18 @@ class RetryHandler { // If we reached the max number of retries if (counter > max) return null - const retryTimeout = Math.min( - currentTimeout * timeoutFactor ** counter, - maxTimeout - ) + let retryAfterHeader = headers != null && headers['retry-after'] + if (retryAfterHeader) { + retryAfterHeader = Number(retryAfterHeader) + retryAfterHeader = isNaN(retryAfterHeader) + ? calculateRetryAfterHeader(retryAfterHeader) + : retryAfterHeader * 1e3 // Retry-After is in seconds + } + + const retryTimeout = + retryAfterHeader > 0 + ? Math.min(retryAfterHeader, maxTimeout) + : Math.min(currentTimeout * timeoutFactor ** counter, maxTimeout) return retryTimeout } diff --git a/test/retry-handler.js b/test/retry-handler.js index 9bafc9b58af..419687ec7e0 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -2,16 +2,17 @@ const { createServer } = require('node:http') const { once } = require('node:events') -const { getLocal } = require('mockttp') const tap = require('tap') const { RetryHandler, Client } = require('..') -tap.test('Should retry status code', async t => { +// TODO: rewrite tests to not use explicit Promise handling +// TODO: add tests for retry-after on 429 + +tap.test('Should retry status code', t => { let counter = 0 - let res const chunks = [] - const server = getLocal() + const server = createServer() const dispatchOptions = { method: 'GET', path: '/', @@ -19,16 +20,174 @@ tap.test('Should retry status code', async t => { 'content-type': 'application/json' } } - const promise = new Promise(resolve => { - res = resolve + + t.plan(4) + + server.on('request', (req, res) => { + switch (counter) { + case 0: + req.destroy() + return + case 1: + res.writeHead(500) + res.end('failed') + return + case 2: + res.writeHead(200) + res.end('hello world!') + return + default: + t.fail() + } + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler( + dispatchOptions, + { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + chunks.push(chunk) + return true + }, + onComplete () { + t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') + t.equal(counter, 2) + }, + onError () { + t.fail() + } + } + }, + { + retry: err => { + counter++ + + if ( + err.statusCode === 500 || + err.message.includes('other side closed') + ) { + return 500 + } + } + } + ) + + t.teardown(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + }, + handler + ) + }) +}) + +tap.test('Should support idempotency over safe method', t => { + const chunks = [] + const server = createServer() + let postCounter = 0 + let getCounter = 0 + let getIdempotentCounter = 0 + const dispatchOptions1 = { + method: 'GET', + path: '/1', + headers: { + 'content-type': 'application/json' + } + } + const dispatchOptions2 = { + method: 'POST', + path: '/2', + headers: { + 'content-type': 'application/json' + }, + body: 'hello world!' + } + const dispatchOptions3 = { + method: 'HEAD', + path: '/3', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + if (req.url === '/1' && req.method === 'GET') { + switch (getCounter) { + case 0: + req.destroy() + getCounter++ + return + case 1: + res.writeHead(500) + res.end('failed') + getCounter++ + return + case 2: + res.writeHead(200) + res.end('hello world!') + getCounter++ + return + default: + t.error('unexpected request') + } + } else if (req.url === '/2' && req.method === 'POST') { + switch (postCounter) { + case 0: + req.destroy() + postCounter++ + return + default: + t.error('unexpected request') + } + } else if (req.url === '/3' && req.method === 'HEAD') { + switch (getIdempotentCounter) { + case 0: + res.writeHead(500) + res.end('failed') + getIdempotentCounter++ + return + case 1: + res.writeHead(200) + res.end('hello world!') + getIdempotentCounter++ + return + default: + t.error('unexpected request') + } + } else { + t.error('unexpected request') + } }) - await server.start() + t.plan(10) - const client = new Client(`http://localhost:${server.port}`) - const handler = new RetryHandler( - dispatchOptions, - { + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions1, { dispatch: client.dispatch.bind(client), handler: { onConnect () { @@ -47,234 +206,368 @@ tap.test('Should retry status code', async t => { }, onComplete () { t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') - t.equal(counter, 2) - res() }, - onError () { - t.fail() + onError (err) { + t.error(err) } } - }, - { - retry: err => { - counter++ + }) - if ( - err.statusCode === 500 || - err.message.includes('other side closed') - ) { - return 500 + const handler2 = new RetryHandler(dispatchOptions2, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.fail() + }, + onData (chunk) { + t.fail() + }, + onComplete () { + t.fail() + }, + onError (err) { + t.equal(err.message, 'other side closed') + t.equal(err.code, 'UND_ERR_SOCKET') } } - } - ) - - t.teardown(async () => { - await client.close() - await server.stop() - }) + }) + + const handler3 = new RetryHandler( + dispatchOptions3, + { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + return true + }, + onComplete () { + t.pass() + }, + onError (err) { + t.error(err) + } + } + }, + { + idempotent: true + } + ) - await server.forAnyRequest().thenCloseConnection() - await server.forAnyRequest().thenReply(500, 'Internal Server Error') - await server.forAnyRequest().thenReply(200, 'hello world!') + t.teardown(async () => { + await client.close() + server.close() - client.dispatch( - { - method: 'GET', - path: '/', - headers: { - 'content-type': 'application/json' - } - }, - handler - ) + await once(server, 'close') + }) - await promise + client.dispatch( + { + method: 'GET', + path: '/1', + headers: { + 'content-type': 'application/json' + } + }, + handler + ) + + client.dispatch( + { + method: 'POST', + path: '/2', + headers: { + 'content-type': 'application/json' + }, + body: 'hello world!' + }, + handler2 + ) + + client.dispatch( + { + method: 'HEAD', + path: '/3', + headers: { + 'content-type': 'application/json' + } + }, + handler3 + ) + }) }) -tap.test('Should support idempotency over safe method', async t => { - let res, rej +tap.test('Should use retry-after header for retries', t => { + let counter = 0 const chunks = [] - const server = getLocal() + const server = createServer() + let checkpoint const dispatchOptions = { - method: 'GET', + method: 'PUT', path: '/', headers: { 'content-type': 'application/json' } } - const promise = new Promise((resolve, reject) => { - res = resolve - rej = reject - }) - await server.start() + t.plan(4) - const client = new Client(`http://localhost:${server.port}`) - const handler = new RetryHandler(dispatchOptions, { - dispatch: client.dispatch.bind(client), - handler: { - onConnect () { - t.pass() - }, - onBodySent () { - t.pass() - }, - onHeaders (status, _rawHeaders, resume, _statusMessage) { - t.equal(status, 200) - return true - }, - onData (chunk) { - chunks.push(chunk) - return true - }, - onComplete () { - t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') - }, - onError (err) { - rej(err) - } + server.on('request', (req, res) => { + switch (counter) { + case 0: + res.writeHead(429, { + 'retry-after': 1 + }) + res.end('rate limit') + checkpoint = Date.now() + counter++ + return + case 1: + res.writeHead(200) + res.end('hello world!') + t.ok(Date.now() - checkpoint >= 500) + counter++ + return + default: + t.fail('unexpected request') } }) - const handler2 = new RetryHandler(dispatchOptions, { - dispatch: client.dispatch.bind(client), - handler: { - onConnect () { - t.pass() - }, - onBodySent () { - t.pass() - }, - onHeaders (status, _rawHeaders, resume, _statusMessage) { - t.equal(status, 200) - return true - }, - onData (chunk) { - chunks.push(chunk) - return true - }, - onComplete () { - t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') - rej() - }, - onError (err) { - t.equal(err.message, 'other side closed') - t.equal(err.code, 'UND_ERR_SOCKET') - res() + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + chunks.push(chunk) + return true + }, + onComplete () { + t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') + }, + onError (err) { + t.error(err) + } } - } - }) + }) - t.teardown(async () => { - await client.close() - await server.stop() - }) + t.teardown(async () => { + await client.close() + server.close() - await server.forAnyRequest().thenCloseConnection() - await server.forAnyRequest().thenReply(500, 'Internal Server Error') - await server.forAnyRequest().thenReply(200, 'hello world!') - await server.forAnyRequest().thenCloseConnection() - - client.dispatch( - { - method: 'GET', - path: '/', - headers: { - 'content-type': 'application/json' - } - }, - handler - ) - - client.dispatch( - { - method: 'POST', - path: '/', - headers: { - 'content-type': 'application/json' - }, - body: 'hello world!' - }, - handler2 - ) + await once(server, 'close') + }) - return promise + client.dispatch( + { + method: 'PUT', + path: '/', + headers: { + 'content-type': 'application/json' + } + }, + handler + ) + }) }) -tap.test('Should retry with defaults', async t => { - let res, rej +tap.test('Should use retry-after header for retries (date)', t => { + let counter = 0 const chunks = [] - const server = getLocal() + const server = createServer() + let checkpoint const dispatchOptions = { - method: 'GET', + method: 'PUT', path: '/', headers: { 'content-type': 'application/json' } } - const promise = new Promise((resolve, reject) => { - res = resolve - rej = reject + + t.plan(4) + + server.on('request', (req, res) => { + switch (counter) { + case 0: + res.writeHead(429, { + 'retry-after': new Date( + new Date().setSeconds(new Date().getSeconds() + 1) + ).toUTCString() + }) + res.end('rate limit') + checkpoint = Date.now() + counter++ + return + case 1: + res.writeHead(200) + res.end('hello world!') + t.ok(Date.now() - checkpoint >= 1) + counter++ + return + default: + t.fail('unexpected request') + } }) - await server.start() + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + chunks.push(chunk) + return true + }, + onComplete () { + t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') + }, + onError (err) { + t.error(err) + } + } + }) + + t.teardown(async () => { + await client.close() + server.close() - const client = new Client(`http://localhost:${server.port}`) - const handler = new RetryHandler(dispatchOptions, { - dispatch: client.dispatch.bind(client), - handler: { - onConnect () { - t.pass() - }, - onBodySent () { - t.pass() - }, - onHeaders (status, _rawHeaders, resume, _statusMessage) { - t.equal(status, 200) - return true - }, - onData (chunk) { - chunks.push(chunk) - return true - }, - onComplete () { - t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') - res() + await once(server, 'close') + }) + + client.dispatch( + { + method: 'PUT', + path: '/', + headers: { + 'content-type': 'application/json' + } }, - onError (err) { - rej(err) - } - } + handler + ) }) +}) + +tap.test('Should retry with defaults', t => { + let counter = 0 + const chunks = [] + const server = createServer() + const dispatchOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } - t.teardown(async () => { - await client.close() - await server.stop() + server.on('request', (req, res) => { + switch (counter) { + case 0: + req.destroy() + counter++ + return + case 1: + res.writeHead(500) + res.end('failed') + counter++ + return + case 2: + res.writeHead(200) + res.end('hello world!') + counter++ + return + default: + t.fail() + } }) - await server.forAnyRequest().thenCloseConnection() - await server.forAnyRequest().thenReply(500, 'Internal Server Error') - await server.forAnyRequest().thenReply(200, 'hello world!') + t.plan(3) - client.dispatch( - { - method: 'GET', - path: '/', - headers: { - 'content-type': 'application/json' + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + chunks.push(chunk) + return true + }, + onComplete () { + t.equal(Buffer.concat(chunks).toString('utf-8'), 'hello world!') + }, + onError (err) { + t.error(err) + } } - }, - handler - ) + }) + + t.teardown(async () => { + await client.close() + server.close() - await promise + await once(server, 'close') + }) + + client.dispatch( + { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + }, + handler + ) + }) }) -tap.test('Should handle 206 partial content', async t => { +tap.test('Should handle 206 partial content', t => { const chunks = [] let counter = 0 - let res // Took from: https://github.com/nxtedition/nxt-lib/blob/4b001ebc2f22cf735a398f35ff800dd553fe5933/test/undici/retry.js#L47 let x = 0 @@ -303,73 +596,70 @@ tap.test('Should handle 206 partial content', async t => { 'content-type': 'application/json' } } - const promise = new Promise(resolve => { - res = resolve - }) - server.listen() - await once(server, 'listening') + t.plan(6) - const client = new Client(`http://localhost:${server.address().port}`) - const handler = new RetryHandler( - dispatchOptions, - { - dispatch: (...args) => { - return client.dispatch(...args) - }, - handler: { - onConnect () { - t.pass() - }, - onBodySent () {}, - onHeaders (status, _rawHeaders, resume, _statusMessage) { - t.equal(status, 200) - return true - }, - onData (chunk) { - chunks.push(chunk) - return true - }, - onComplete () { - t.equal(Buffer.concat(chunks).toString('utf-8'), 'abcdef') - t.equal(counter, 1) - res() + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler( + dispatchOptions, + { + dispatch: (...args) => { + return client.dispatch(...args) }, - onError () { - t.fail() + handler: { + onConnect () { + t.pass() + }, + onBodySent () { + t.pass() + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.equal(status, 200) + return true + }, + onData (chunk) { + chunks.push(chunk) + return true + }, + onComplete () { + t.equal(Buffer.concat(chunks).toString('utf-8'), 'abcdef') + t.equal(counter, 1) + }, + onError () { + t.fail() + } } - } - }, - { - retry: function (err) { - counter++ + }, + { + retry: function (err) { + counter++ - if (err.code && err.code === 'UND_ERR_DESTROYED') { - return null - } + if (err.code && err.code === 'UND_ERR_DESTROYED') { + return null + } - return err.statusCode === 206 ? null : 800 - } - } - ) - - client.dispatch( - { - method: 'GET', - path: '/', - headers: { - 'content-type': 'application/json' + return err.statusCode === 206 ? null : 800 + } } - }, - handler - ) + ) + + client.dispatch( + { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + }, + handler + ) - t.teardown(async () => { - await client.close() + t.teardown(async () => { + await client.close() - server.close() - await once(server, 'close') + server.close() + await once(server, 'close') + }) }) - - await promise })