diff --git a/lib/firehose-instance.js b/lib/firehose-instance.js index 128e27c..e0a5665 100644 --- a/lib/firehose-instance.js +++ b/lib/firehose-instance.js @@ -127,34 +127,34 @@ module.exports = class FirehoseInstance { */ this.recordsToPut = logs; - return this.put(); - } + const initialBatchSize = this.batchSize[1]; - /** - * @private - */ - async put(retry = 1) { + const promises = []; - const recordsLength = this.recordsToPut.length; + for(let offset = 0; offset < logs.length; offset += initialBatchSize) + promises.push(this.putBatch(logs.slice(offset, offset + initialBatchSize))); - if(!recordsLength) - return; // no hay más records para enviar (se enviaron ok o fallaron el máximo de veces posible) + await Promise.allSettled(promises); + } - const batchRecords = this.recordsToPut.slice(0, this.batchSize[retry]); + async putBatch(batch, attemptNumber = 1) { - const response = await this.putRecordBatch(batchRecords); + const response = await this.putRecordBatch(batch); if(response?.FailedPutCount) { - if(this.batchSize[retry + 1]) - return this.put(retry + 1); + const nextBatchSize = this.batchSize[attemptNumber + 1]; - logger.error(`Error creating #${batchRecords.length} Trace logs - retry #${retry}/5 - ${response.Message || 'retry limit reached'}}`); - } + if(!nextBatchSize) + return logger.error('Failed to create a log'); - this.recordsToPut = this.recordsToPut.slice(this.batchSize[retry]); // elimina el batch, funcionó o falló el máximo de veces, para intentear enviar lo que queda en this.recordsToPut + const promises = []; - return this.put(); // no se envía retry porque funciono o no, pero ese bloque ya no se envía más + for(let offset = 0; offset < batch.length; offset += nextBatchSize) + promises.push(this.putBatch(batch.slice(offset, offset + nextBatchSize), attemptNumber + 1)); + + await Promise.allSettled(promises); + } } /** diff --git a/lib/log.js b/lib/log.js index 0f4a8a4..1078bf8 100644 --- a/lib/log.js +++ b/lib/log.js @@ -37,6 +37,8 @@ const LogTracker = require('./log-tracker'); const { getEnv, getTracePrivateFields, hideFieldsFromLog } = require('./helpers/utils'); +const MAX_LOGS_PER_LOCAL_BATCH = 100; + module.exports = class Log { /** @@ -88,7 +90,7 @@ module.exports = class Log { if(!Array.isArray(logs)) logs = [logs]; - if(process.env.JANIS_TRACE_EXTENSION_ENABLED) + if(process.env.JANIS_TRACE_EXTENSION_ENABLED && logs.length < MAX_LOGS_PER_LOCAL_BATCH) return this.addLogsLocally(logs, client); return this.sendToTrace(logs, client); @@ -117,25 +119,19 @@ module.exports = class Log { static async addLogsLocally(logs, client) { - const MAX_LOGS_PER_BATCH = 100; - - for(let offset = 0; offset < logs.length; offset += MAX_LOGS_PER_BATCH) { + const formattedLogs = this.getValidatedLogs(logs, client); - const formattedLogs = this.getValidatedLogs(logs.slice(offset, offset + MAX_LOGS_PER_BATCH), client); + try { - try { + // Local server implemented in Trace Lambda Layer + await axios.post('http://127.0.0.1:8585/logs', { logs: formattedLogs }, { timeout: 300 }); - // Local server implemented in Trace Lambda Layer - await axios.post('http://127.0.0.1:8585/logs', { logs: formattedLogs }, { timeout: 300 }); + } catch(err) { - } catch(err) { - - // If local server fails, go straight to Firehose - logger.error(`Failed to save ${logs.length} logs locally. Fallbacking to Firehose.`, err); - - await this.putFirehoseRecords(formattedLogs); - } + // If local server fails, go straight to Firehose + logger.error(`Failed to save ${logs.length} logs locally. Fallbacking to Firehose.`, err); + await this.putFirehoseRecords(formattedLogs); } } diff --git a/tests/firehose-instance.js b/tests/firehose-instance.js index 9569156..a84147a 100644 --- a/tests/firehose-instance.js +++ b/tests/firehose-instance.js @@ -176,6 +176,53 @@ describe('Firehose Instance', () => { assertAssumeRole(); }); + + it('Should retry and split logs until there is one per request', async () => { + + Firehose.prototype.putRecordBatch.resolves({ FailedPutCount: 1 }); + + await firehoseInstance.putRecords([sampleLog, sampleLog]); + + sinon.assert.callCount(Firehose.prototype.putRecordBatch, 6); + + const expectedLog = formatLogForFirehose(sampleLog); + + // Split by 500 + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(0), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog, expectedLog] + }); + + // Split by 100 + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(1), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog, expectedLog] + }); + + // Split by 50 + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(2), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog, expectedLog] + }); + + // Split by 10 + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(3), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog, expectedLog] + }); + + // Split by 1 + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(4), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog] + }); + sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(5), { + DeliveryStreamName: deliveryStreamName, + Records: [expectedLog] + }); + + assertAssumeRole(); + }); }); context('When Firehose rejects', () => { diff --git a/tests/log.js b/tests/log.js index 664342a..82baa0f 100644 --- a/tests/log.js +++ b/tests/log.js @@ -209,7 +209,7 @@ describe('Log', () => { sinon.assert.notCalled(FirehoseInstance.prototype.putRecords); }); - it('Should send logs in batches of at most 100 logs to extension local server', async () => { + it('Should not send logs to extension local server if they are more than 100 and go directly to firehose', async () => { const { service, userCreated, ...minimalLog } = sampleLog; @@ -222,19 +222,9 @@ describe('Log', () => { const formattedLog = formatLog(expectedLog, 'some-client'); - sinon.assert.calledTwice(axios.post); - sinon.assert.calledWithExactly(axios.post.firstCall, 'http://127.0.0.1:8585/logs', { - logs: new Array(100).fill(formattedLog) - }, { - timeout: 300 - }); - sinon.assert.calledWithExactly(axios.post.secondCall, 'http://127.0.0.1:8585/logs', { - logs: new Array(20).fill(formattedLog) - }, { - timeout: 300 - }); + sinon.assert.notCalled(axios.post); - sinon.assert.notCalled(FirehoseInstance.prototype.putRecords); + sinon.assert.calledOnceWithExactly(FirehoseInstance.prototype.putRecords, new Array(120).fill(formattedLog)); }); it('Should send logs to Firehose if extension local server fails', async () => {