Skip to content

Commit

Permalink
Skipped layer for more than 100 logs and improved firehose requests to be concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
jormaechea committed Feb 9, 2024
1 parent 92746ea commit b51109a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 45 deletions.
34 changes: 17 additions & 17 deletions lib/firehose-instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,34 @@ module.exports = class FirehoseInstance {
*/
this.recordsToPut = logs;

return this.put();
}
const initialBatchSize = this.batchSize[1];

/**
* @private
*/
async put(retry = 1) {
const promises = [];

const recordsLength = this.recordsToPut.length;
for(let offset = 0; offset < logs.length; offset += initialBatchSize)
promises.push(this.putBatch(logs.slice(offset, offset + initialBatchSize)));

if(!recordsLength)
return; // no hay más records para enviar (se enviaron ok o fallaron el máximo de veces posible)
await Promise.allSettled(promises);
}

const batchRecords = this.recordsToPut.slice(0, this.batchSize[retry]);
async putBatch(batch, attemptNumber = 1) {

const response = await this.putRecordBatch(batchRecords);
const response = await this.putRecordBatch(batch);

if(response?.FailedPutCount) {

if(this.batchSize[retry + 1])
return this.put(retry + 1);
const nextBatchSize = this.batchSize[attemptNumber + 1];

logger.error(`Error creating #${batchRecords.length} Trace logs - retry #${retry}/5 - ${response.Message || 'retry limit reached'}}`);
}
if(!nextBatchSize)
return logger.error('Failed to create a log');

this.recordsToPut = this.recordsToPut.slice(this.batchSize[retry]); // elimina el batch, funcionó o falló el máximo de veces, para intentear enviar lo que queda en this.recordsToPut
const promises = [];

return this.put(); // no se envía retry porque funciono o no, pero ese bloque ya no se envía más
for(let offset = 0; offset < batch.length; offset += nextBatchSize)
promises.push(this.putBatch(batch.slice(offset, offset + nextBatchSize), attemptNumber + 1));

await Promise.allSettled(promises);
}
}

/**
Expand Down
26 changes: 11 additions & 15 deletions lib/log.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const LogTracker = require('./log-tracker');

const { getEnv, getTracePrivateFields, hideFieldsFromLog } = require('./helpers/utils');

const MAX_LOGS_PER_LOCAL_BATCH = 100;

module.exports = class Log {

/**
Expand Down Expand Up @@ -88,7 +90,7 @@ module.exports = class Log {
if(!Array.isArray(logs))
logs = [logs];

if(process.env.JANIS_TRACE_EXTENSION_ENABLED)
if(process.env.JANIS_TRACE_EXTENSION_ENABLED && logs.length < MAX_LOGS_PER_LOCAL_BATCH)
return this.addLogsLocally(logs, client);

return this.sendToTrace(logs, client);
Expand Down Expand Up @@ -117,25 +119,19 @@ module.exports = class Log {

static async addLogsLocally(logs, client) {

const MAX_LOGS_PER_BATCH = 100;

for(let offset = 0; offset < logs.length; offset += MAX_LOGS_PER_BATCH) {
const formattedLogs = this.getValidatedLogs(logs, client);

const formattedLogs = this.getValidatedLogs(logs.slice(offset, offset + MAX_LOGS_PER_BATCH), client);
try {

try {
// Local server implemented in Trace Lambda Layer
await axios.post('http://127.0.0.1:8585/logs', { logs: formattedLogs }, { timeout: 300 });

// Local server implemented in Trace Lambda Layer
await axios.post('http://127.0.0.1:8585/logs', { logs: formattedLogs }, { timeout: 300 });
} catch(err) {

} catch(err) {

// If local server fails, go straight to Firehose
logger.error(`Failed to save ${logs.length} logs locally. Fallbacking to Firehose.`, err);

await this.putFirehoseRecords(formattedLogs);
}
// If local server fails, go straight to Firehose
logger.error(`Failed to save ${logs.length} logs locally. Fallbacking to Firehose.`, err);

await this.putFirehoseRecords(formattedLogs);
}

}
Expand Down
47 changes: 47 additions & 0 deletions tests/firehose-instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,53 @@ describe('Firehose Instance', () => {

assertAssumeRole();
});

it('Should retry and split logs until there is one per request', async () => {

Firehose.prototype.putRecordBatch.resolves({ FailedPutCount: 1 });

await firehoseInstance.putRecords([sampleLog, sampleLog]);

sinon.assert.callCount(Firehose.prototype.putRecordBatch, 6);

const expectedLog = formatLogForFirehose(sampleLog);

// Split by 500
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(0), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog, expectedLog]
});

// Split by 100
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(1), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog, expectedLog]
});

// Split by 50
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(2), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog, expectedLog]
});

// Split by 10
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(3), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog, expectedLog]
});

// Split by 1
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(4), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog]
});
sinon.assert.calledWithExactly(Firehose.prototype.putRecordBatch.getCall(5), {
DeliveryStreamName: deliveryStreamName,
Records: [expectedLog]
});

assertAssumeRole();
});
});

context('When Firehose rejects', () => {
Expand Down
16 changes: 3 additions & 13 deletions tests/log.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ describe('Log', () => {
sinon.assert.notCalled(FirehoseInstance.prototype.putRecords);
});

it('Should send logs in batches of at most 100 logs to extension local server', async () => {
it('Should not send logs to extension local server if they are more than 100 and go directly to firehose', async () => {

const { service, userCreated, ...minimalLog } = sampleLog;

Expand All @@ -222,19 +222,9 @@ describe('Log', () => {

const formattedLog = formatLog(expectedLog, 'some-client');

sinon.assert.calledTwice(axios.post);
sinon.assert.calledWithExactly(axios.post.firstCall, 'http://127.0.0.1:8585/logs', {
logs: new Array(100).fill(formattedLog)
}, {
timeout: 300
});
sinon.assert.calledWithExactly(axios.post.secondCall, 'http://127.0.0.1:8585/logs', {
logs: new Array(20).fill(formattedLog)
}, {
timeout: 300
});
sinon.assert.notCalled(axios.post);

sinon.assert.notCalled(FirehoseInstance.prototype.putRecords);
sinon.assert.calledOnceWithExactly(FirehoseInstance.prototype.putRecords, new Array(120).fill(formattedLog));
});

it('Should send logs to Firehose if extension local server fails', async () => {
Expand Down

0 comments on commit b51109a

Please sign in to comment.