From 27c1da0f892bcc28203f6e2c9cb42dd629af0c67 Mon Sep 17 00:00:00 2001 From: Sten Arthur Laane <21343173+StenAL@users.noreply.github.com> Date: Tue, 30 Apr 2024 18:11:30 +0300 Subject: [PATCH] Fix memory leaks in cloudwatch-integration (#214) * Stop memory leaking every stream name seen in cloudwatch-integration The _postingEvents variable saved every single seen streamName as a key and these entries were never getting deleted. Since cloudwatch-integration stores _postingEvents as a top level variable, it gets shared between all instances of WinstonCloudWatch and leaks memory. To fix this, delete entries for streams that are not posting events. This is safe to do because the only place where _postingEvents entries get read checks whether the value is falsy. If a key doesn't exist in an object, accessing it returns `undefined` which is falsy and the code works the same as before. * Stop memory leaking CloudWatch sequence tokens in cloudwatch-integration The _nextToken variable saved every sequence token for every stream seen. Since cloudwatch-integration stores _nextToken as a top level variable, it gets shared between all instances of WinstonCloudWatch and leaks memory. To fix this, add a clearSequenceToken method and call it from kthxbye to delete the entry for the WinstonCloudWatch instance that is done sending logs. --- index.js | 5 +++++ lib/cloudwatch-integration.js | 16 +++++++++------ test/cloudwatch-integration.js | 36 ++++++++++++++++++++++++++++++---- test/index.js | 3 ++- 4 files changed, 49 insertions(+), 11 deletions(-) diff --git a/index.js b/index.js index 9dbce65..cfc82eb 100644 --- a/index.js +++ b/index.js @@ -135,6 +135,11 @@ WinstonCloudWatch.prototype.kthxbye = function(callback) { this.submit((function(error) { debug('submit done', error); + var groupName = typeof this.logGroupName === 'function' ? + this.logGroupName() : this.logGroupName; + var streamName = typeof this.logStreamName === 'function' ? + this.logStreamName() : this.logStreamName; + cloudWatchIntegration.clearSequenceToken(groupName, streamName); if (error) return callback(error); if (isEmpty(this.logEvents)) return callback(); if (Date.now() > this.flushTimeout) return callback(new Error('Timeout reached while waiting for logs to submit')); diff --git a/lib/cloudwatch-integration.js b/lib/cloudwatch-integration.js index 76b4161..7286f3a 100644 --- a/lib/cloudwatch-integration.js +++ b/lib/cloudwatch-integration.js @@ -12,7 +12,8 @@ var find = require('lodash.find'), debug = require('./utils').debug; var lib = { - _postingEvents: {} + _postingEvents: {}, + _nextToken: {} }; lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, options, cb) { @@ -27,7 +28,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op lib._postingEvents[streamName] = true; safeUpload(function(err) { - lib._postingEvents[streamName] = false; + delete lib._postingEvents[streamName]; return cb(err); }); @@ -93,7 +94,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op lib._nextToken[previousKeyMapKey(groupName, streamName)] = data.nextSequenceToken; } - lib._postingEvents[streamName] = false; + delete lib._postingEvents[streamName]; cb() } }); @@ -106,7 +107,7 @@ lib.submitWithAnotherToken = function(aws, groupName, streamName, payload, reten lib.getToken(aws, groupName, streamName, retentionInDays, options, function(err, token) { payload.sequenceToken = token; aws.putLogEvents(payload, function(err) { - lib._postingEvents[streamName] = false; + delete lib._postingEvents[streamName]; cb(err) }); }) @@ -118,7 +119,7 @@ function retrySubmit(aws, payload, times, cb) { if (err && times > 0) { retrySubmit(aws, payload, times - 1, cb) } else { - lib._postingEvents[payload.logStreamName] = false; + delete lib._postingEvents[payload.logStreamName]; cb(err) } }) @@ -151,7 +152,6 @@ lib.getToken = function(aws, groupName, streamName, retentionInDays, options, cb } }); }; -lib._nextToken = {}; function previousKeyMapKey(group, stream) { return group + ':' + stream; @@ -229,4 +229,8 @@ lib.ignoreInProgress = function ignoreInProgress(cb) { }; }; +lib.clearSequenceToken = function clearSequenceToken(group, stream) { + delete lib._nextToken[previousKeyMapKey(group, stream)]; +} + module.exports = lib; diff --git a/test/cloudwatch-integration.js b/test/cloudwatch-integration.js index de6065c..3da04bb 100644 --- a/test/cloudwatch-integration.js +++ b/test/cloudwatch-integration.js @@ -37,7 +37,7 @@ describe('cloudwatch-integration', function() { }, function() { // The second upload call should get ignored aws.putLogEvents.calledOnce.should.equal(true); - lib._postingEvents['stream'] = false; // reset + delete lib._postingEvents['stream']; // reset done() }); }); @@ -54,7 +54,7 @@ describe('cloudwatch-integration', function() { }, function() { // The second upload call should get ignored lib.getToken.calledOnce.should.equal(true); - lib._postingEvents['stream'] = false; // reset + delete lib._postingEvents['stream']; // reset done() }); }); @@ -72,8 +72,8 @@ describe('cloudwatch-integration', function() { lib.getToken.calledTwice.should.equal(true); - lib._postingEvents['stream1'] = false; // reset - lib._postingEvents['stream2'] = false; // reset + delete lib._postingEvents['stream1']; // reset + delete lib._postingEvents['stream2']; // reset }); it('truncates very large messages and alerts the error handler', function(done) { @@ -524,4 +524,32 @@ describe('cloudwatch-integration', function() { }); + describe('clearSequenceToken', function() { + var aws = {}; + + beforeEach(function() { + sinon.stub(lib, 'getToken').yields(null, 'token'); + }); + + it('clears sequence token set by upload', function(done) { + var nextSequenceToken = 'abc123'; + var group = 'group'; + var stream = 'stream'; + aws.putLogEvents = sinon.stub().yields(null, { nextSequenceToken: nextSequenceToken }); + + lib.upload(aws, group, stream, Array(20), 0, {}, function() { + lib._nextToken.should.deepEqual({ 'group:stream': nextSequenceToken }); + lib.clearSequenceToken(group, stream); + lib._nextToken.should.deepEqual({}); + done(); + }); + }); + + afterEach(function() { + lib.getToken.restore(); + }); + }) + + + }); diff --git a/test/index.js b/test/index.js index fc7ba48..f3f59f0 100644 --- a/test/index.js +++ b/test/index.js @@ -20,7 +20,8 @@ var stubbedWinston = { upload: sinon.spy(function(aws, groupName, streamName, logEvents, retention, options, cb) { this.lastLoggedEvents = logEvents.splice(0, 20); cb(); - }) + }), + clearSequenceToken: sinon.stub() }; var clock = sinon.useFakeTimers();