Skip to content

Commit

Permalink
Fix memory leaks in cloudwatch-integration (#214)
Browse files Browse the repository at this point in the history
* Stop memory leaking every stream name seen in cloudwatch-integration

The _postingEvents variable saved every single seen streamName as a key
and these entries were never getting deleted. Since
cloudwatch-integration stores _postingEvents as a top level variable, it
gets shared between all instances of WinstonCloudWatch and leaks memory.

To fix this, delete entries for streams that are not posting events.
This is safe to do because the only place where _postingEvents entries
get read checks whether the value is falsy. If a key doesn't exist in an
object, accessing it returns `undefined` which is falsy and the code
works the same as before.

* Stop memory leaking CloudWatch sequence tokens in cloudwatch-integration

The _nextToken variable saved every sequence token for every stream seen.
Since cloudwatch-integration stores _nextToken as a top level variable,
it gets shared between all instances of WinstonCloudWatch and leaks
memory.

To fix this, add a clearSequenceToken method and call it from kthxbye to
delete the entry for the WinstonCloudWatch instance that is done sending
logs.
  • Loading branch information
StenAL authored Apr 30, 2024
1 parent af59811 commit 27c1da0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ WinstonCloudWatch.prototype.kthxbye = function(callback) {

this.submit((function(error) {
debug('submit done', error);
var groupName = typeof this.logGroupName === 'function' ?
this.logGroupName() : this.logGroupName;
var streamName = typeof this.logStreamName === 'function' ?
this.logStreamName() : this.logStreamName;
cloudWatchIntegration.clearSequenceToken(groupName, streamName);
if (error) return callback(error);
if (isEmpty(this.logEvents)) return callback();
if (Date.now() > this.flushTimeout) return callback(new Error('Timeout reached while waiting for logs to submit'));
Expand Down
16 changes: 10 additions & 6 deletions lib/cloudwatch-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ var find = require('lodash.find'),
debug = require('./utils').debug;

var lib = {
_postingEvents: {}
_postingEvents: {},
_nextToken: {}
};

lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, options, cb) {
Expand All @@ -27,7 +28,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op

lib._postingEvents[streamName] = true;
safeUpload(function(err) {
lib._postingEvents[streamName] = false;
delete lib._postingEvents[streamName];
return cb(err);
});

Expand Down Expand Up @@ -93,7 +94,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, op
lib._nextToken[previousKeyMapKey(groupName, streamName)] = data.nextSequenceToken;
}

lib._postingEvents[streamName] = false;
delete lib._postingEvents[streamName];
cb()
}
});
Expand All @@ -106,7 +107,7 @@ lib.submitWithAnotherToken = function(aws, groupName, streamName, payload, reten
lib.getToken(aws, groupName, streamName, retentionInDays, options, function(err, token) {
payload.sequenceToken = token;
aws.putLogEvents(payload, function(err) {
lib._postingEvents[streamName] = false;
delete lib._postingEvents[streamName];
cb(err)
});
})
Expand All @@ -118,7 +119,7 @@ function retrySubmit(aws, payload, times, cb) {
if (err && times > 0) {
retrySubmit(aws, payload, times - 1, cb)
} else {
lib._postingEvents[payload.logStreamName] = false;
delete lib._postingEvents[payload.logStreamName];
cb(err)
}
})
Expand Down Expand Up @@ -151,7 +152,6 @@ lib.getToken = function(aws, groupName, streamName, retentionInDays, options, cb
}
});
};
lib._nextToken = {};

function previousKeyMapKey(group, stream) {
return group + ':' + stream;
Expand Down Expand Up @@ -229,4 +229,8 @@ lib.ignoreInProgress = function ignoreInProgress(cb) {
};
};

lib.clearSequenceToken = function clearSequenceToken(group, stream) {
delete lib._nextToken[previousKeyMapKey(group, stream)];
}

module.exports = lib;
36 changes: 32 additions & 4 deletions test/cloudwatch-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('cloudwatch-integration', function() {
}, function() {
// The second upload call should get ignored
aws.putLogEvents.calledOnce.should.equal(true);
lib._postingEvents['stream'] = false; // reset
delete lib._postingEvents['stream']; // reset
done()
});
});
Expand All @@ -54,7 +54,7 @@ describe('cloudwatch-integration', function() {
}, function() {
// The second upload call should get ignored
lib.getToken.calledOnce.should.equal(true);
lib._postingEvents['stream'] = false; // reset
delete lib._postingEvents['stream']; // reset
done()
});
});
Expand All @@ -72,8 +72,8 @@ describe('cloudwatch-integration', function() {

lib.getToken.calledTwice.should.equal(true);

lib._postingEvents['stream1'] = false; // reset
lib._postingEvents['stream2'] = false; // reset
delete lib._postingEvents['stream1']; // reset
delete lib._postingEvents['stream2']; // reset
});

it('truncates very large messages and alerts the error handler', function(done) {
Expand Down Expand Up @@ -524,4 +524,32 @@ describe('cloudwatch-integration', function() {

});

describe('clearSequenceToken', function() {
var aws = {};

beforeEach(function() {
sinon.stub(lib, 'getToken').yields(null, 'token');
});

it('clears sequence token set by upload', function(done) {
var nextSequenceToken = 'abc123';
var group = 'group';
var stream = 'stream';
aws.putLogEvents = sinon.stub().yields(null, { nextSequenceToken: nextSequenceToken });

lib.upload(aws, group, stream, Array(20), 0, {}, function() {
lib._nextToken.should.deepEqual({ 'group:stream': nextSequenceToken });
lib.clearSequenceToken(group, stream);
lib._nextToken.should.deepEqual({});
done();
});
});

afterEach(function() {
lib.getToken.restore();
});
})



});
3 changes: 2 additions & 1 deletion test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var stubbedWinston = {
upload: sinon.spy(function(aws, groupName, streamName, logEvents, retention, options, cb) {
this.lastLoggedEvents = logEvents.splice(0, 20);
cb();
})
}),
clearSequenceToken: sinon.stub()
};
var clock = sinon.useFakeTimers();

Expand Down

0 comments on commit 27c1da0

Please sign in to comment.