From de87d033b724f3ae8c3f00229a0f6bf411223748 Mon Sep 17 00:00:00 2001 From: Nick Santos Date: Thu, 12 Feb 2015 11:54:36 -0500 Subject: [PATCH 1/3] upgrade snappy. The new version of snappy has a lot of API changes and is async-only. --- README.md | 2 +- externs/snappy.js | 11 ++-- lib/RedisConnection.js | 112 +++++++++++++++++++---------------- package.json | 6 +- test/test_CacheCluster.js | 8 +-- test/test_RedisConnection.js | 50 ++++++++-------- 6 files changed, 99 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index ba41322..47635fd 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ zcache === -[![Build Status](https://travis-ci.org/Medium/zcache.svg?branch=xiao-travis)](https://travis-ci.org/Medium/zcache) +[![Build Status](https://travis-ci.org/Medium/zcache.svg)](https://travis-ci.org/Medium/zcache) diff --git a/externs/snappy.js b/externs/snappy.js index 49fa935..7c0f074 100644 --- a/externs/snappy.js +++ b/externs/snappy.js @@ -2,16 +2,18 @@ /** * @param {string} value + * @param {function(?, ?)} callback * @return {Buffer} */ -function compressSync(value) {} +function compress(value, callback) {} /** * @param {Buffer} value * @param {function(Buffer): string} parser + * @param {function(?, ?)} callback * @return {string} */ -function decompressSync(value, parser) {} +function decompress(value, parser, callback) {} /** * @param {Buffer} value @@ -27,7 +29,6 @@ var parsers = { module.exports = { parsers: parsers, - compressSync: compressSync, - decompressSync: decompressSync + compress: compress, + decompress: decompress } - diff --git a/lib/RedisConnection.js b/lib/RedisConnection.js index b5e4101..aac722f 100644 --- a/lib/RedisConnection.js +++ b/lib/RedisConnection.js @@ -28,8 +28,8 @@ function RedisConnection(host, port, options) { this._bound_onConnect = this._onConnect.bind(this) this._bound_onError = this._onError.bind(this) this._bound_onEnd = this._onEnd.bind(this) - - // Controls if we turn on compression or not. + + // Controls if we turn on compression or not. // All cache values which are longer than the pivot are eligible for compression // Pivot and encoding prefix are hardcoded for now. Will revisit after // we know we are using snappy for sure @@ -47,42 +47,52 @@ RedisConnection.prototype.isAvailable = function () { /** @override */ RedisConnection.prototype.set = function (key, val, maxAgeMs, setWhenNotExist) { - var deferred = Q.defer() - var params = [key, this._compress(val), 'PX', maxAgeMs] - if (setWhenNotExist) params.push('NX') - this._client.set(params, this._makeNodeResolverWithTimeout(deferred, 'set', 'Redis [set] key: ' + key)) - return deferred.promise + return this._compress(val) + .thenBound(function (compressedVal) { + var params = [key, compressedVal, 'PX', maxAgeMs] + if (setWhenNotExist) params.push('NX') + + var deferred = Q.defer() + this._client.set(params, this._makeNodeResolverWithTimeout(deferred, 'set', 'Redis [set] key: ' + key)) + return deferred.promise + }, this) } /** @override */ RedisConnection.prototype.mset = function (items, maxAgeMs, setWhenNotExist) { if (!items || !items.length) return Q.resolve(undefined) - var deferred = Q.defer() - var commands = [] - var i, l - if (setWhenNotExist) { - // Use "SET" to set each key with a "NX" flag. - for (i = 0, l = items.length; i < l; i++) { - commands.push(['set', items[i].key, this._compress(items[i].value), 'PX', maxAgeMs, 'NX']) - } - } else { - // Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key - var msetCommand = ['MSET'] - commands.push(msetCommand) - for (i = 0, l = items.length; i < l; i++) { - var key = items[i].key - // Append key value arguments to the set command. - msetCommand.push(key, this._compress(items[i].value)) - // Append an expire command. - commands.push(['EXPIRE', key, Math.floor(maxAgeMs / 1000)]) + var compressedPromises = items.map(function (item) { + return this._compress(item.value) + }, this) + return Q.all(compressedPromises) + .thenBound(function (compressedValues) { + var deferred = Q.defer() + var commands = [] + + var i, l + if (setWhenNotExist) { + // Use "SET" to set each key with a "NX" flag. + for (i = 0, l = items.length; i < l; i++) { + commands.push(['set', items[i].key, compressedValues[i], 'PX', maxAgeMs, 'NX']) + } + } else { + // Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key + var msetCommand = ['MSET'] + commands.push(msetCommand) + for (i = 0, l = items.length; i < l; i++) { + var key = items[i].key + // Append key value arguments to the set command. + msetCommand.push(key, compressedValues[i]) + // Append an expire command. + commands.push(['EXPIRE', key, Math.floor(maxAgeMs / 1000)]) + } } - } - this._client.multi(commands).exec( - this._makeNodeResolverWithTimeout(deferred, 'mset', - 'Redis [mset] key.0: ' + items[0].key + ' key.length: ' + items.length)) - - return deferred.promise + this._client.multi(commands).exec( + this._makeNodeResolverWithTimeout(deferred, 'mset', + 'Redis [mset] key.0: ' + items[0].key + ' key.length: ' + items.length)) + return deferred.promise + }, this) } /** @override */ @@ -109,7 +119,7 @@ RedisConnection.prototype.mget = function (keys) { this._makeNodeResolverWithTimeout(deferred, 'mget', opDesc)) return deferred.promise - .then(function (vals) { + .thenBound(function (vals) { // This function post-processes values from Redis client to // make cache miss result consistent with the API. // @@ -119,12 +129,12 @@ RedisConnection.prototype.mget = function (keys) { if (null === vals[i]) { vals[i] = undefined } else { - //for real values determine if you need to decompress - vals[i] = self._decompress(vals[i]) + //for real values determine if you need to uncompress + vals[i] = this._uncompress(vals[i]) } } - return vals - }) + return Q.all(vals) + }, this) .then(this.getCountUpdater()) } @@ -259,23 +269,24 @@ RedisConnection.prototype._makeNodeResolverWithTimeout = function (deferred, opN * Private method controls how all cache values are encoded. * * @param {string|undefined|null} value Original cache value - * @return {string|undefined|null} Value encoded appropriately for the cache + * @return {Q.Promise.} Value encoded appropriately for the cache */ RedisConnection.prototype._compress = function (value) { if (!value || !this._compressionEnabled) { - return value + return Q.resolve(value) } if (value.length > this._snappyPivot) { try { - var compressed = snappy.compressSync(value) - return this._compressedPrefix + compressed.toString('base64') + return Q.nfcall(snappy.compress, value).thenBound(function (compressed) { + return this._compressedPrefix + compressed.toString('base64') + }, this) } catch (e) { console.warn("Compression failed: " + e.message) - return this._uncompressedPrefix + value + return Q.resolve(this._uncompressedPrefix + value) } } else { - return this._uncompressedPrefix + value + return Q.resolve(this._uncompressedPrefix + value) } } @@ -283,26 +294,25 @@ RedisConnection.prototype._compress = function (value) { * Private Method that knows how to parsed encoded cache value and decode. * * @param {string|undefined|null} value Possibly encoded value retrieved from the cache. - * @return {string|undefined|null} The original input value + * @return {Q.Promise.} The original input value */ -RedisConnection.prototype._decompress = function (value) { - if (!value) return value +RedisConnection.prototype._uncompress = function (value) { + if (!value) return Q.resolve(value) // Note: always check prefixes even if compression is disabled, as there might // be entries from prior to disabling compression if (value.indexOf(this._compressedPrefix) === 0) { try { var compressedBuf = new Buffer(value.substring(this._compressedPrefix.length), 'base64') - var orig = snappy.decompressSync(compressedBuf, snappy.parsers.string) - return orig + return Q.nfcall(snappy.uncompress, compressedBuf, {asBuffer: false}) } catch (e) { console.warn("Decompression failed: " + e.message) - return undefined - } + return Q.resolve(undefined) + } } else if (value.indexOf(this._uncompressedPrefix) === 0) { - return value.substring(this._uncompressedPrefix.length) + return Q.resolve(value.substring(this._uncompressedPrefix.length)) } else { - return value + return Q.resolve(value) } } diff --git a/package.json b/package.json index c2b97c0..21541e7 100644 --- a/package.json +++ b/package.json @@ -22,11 +22,11 @@ "hiredis": "0.1.16", "metrics": "0.1.6", "hashring": "1.0.3", - "snappy": "2.1.2" + "snappy": "3.0.7" }, "devDependencies": { - "nodeunit": "0.7.4", - "nodeunitq": "0.0.3", + "nodeunit": "0.9.0", + "nodeunitq": "0.1.1", "logg": "0.2.2", "closure-npc": "0.1.3", "sinon": "git://github.com/Medium/Sinon.JS.git#xiao-fix-clearTimeout-for-nodejs" diff --git a/test/test_CacheCluster.js b/test/test_CacheCluster.js index d92471d..423f725 100644 --- a/test/test_CacheCluster.js +++ b/test/test_CacheCluster.js @@ -341,7 +341,7 @@ builder.add(function testLatencyMeasurement(test) { .then(function() { test.equal(20, cluster.getStats('set').count()) test.ok(cluster.getStats('set').mean() > 28) - test.ok(cluster.getStats('set').mean() < 35) + test.ok(cluster.getStats('set').mean() < 38) var getPromises = [] for (var i = 0; i < 20; i++) { @@ -352,7 +352,7 @@ builder.add(function testLatencyMeasurement(test) { .then(function() { test.equal(20, cluster.getStats('get').count()) test.ok(cluster.getStats('get').mean() > 28) - test.ok(cluster.getStats('get').mean() < 35) + test.ok(cluster.getStats('get').mean() < 38) var items = [] for (var i = 0; i < 20; i++) { @@ -366,7 +366,7 @@ builder.add(function testLatencyMeasurement(test) { .then(function() { test.equal(1, cluster.getStats('mset').count()) test.ok(cluster.getStats('mset').mean() > 28) - test.ok(cluster.getStats('mset').mean() < 35) + test.ok(cluster.getStats('mset').mean() < 38) var keys = [] for (var i = 0; i < 20; i++) { @@ -377,7 +377,7 @@ builder.add(function testLatencyMeasurement(test) { .then(function() { test.equal(1, cluster.getStats('mget').count()) test.ok(cluster.getStats('mget').mean() > 28) - test.ok(cluster.getStats('mget').mean() < 35) + test.ok(cluster.getStats('mget').mean() < 38) }) }) diff --git a/test/test_RedisConnection.js b/test/test_RedisConnection.js index 694c647..40a7e44 100644 --- a/test/test_RedisConnection.js +++ b/test/test_RedisConnection.js @@ -296,7 +296,7 @@ builder.add(function testCounts(test) { Q.all([cacheInstance.del('key1'), cacheInstance.del('key2'), cacheInstance.del('key3'), cacheInstance.del('key4')]) .then(function () { - cacheInstance.mset(items, 300000) + return cacheInstance.mset(items, 300000) }) .then(function () { return cacheInstance.mget(['key1', 'key2', 'key3', 'key4']) @@ -309,7 +309,7 @@ builder.add(function testCounts(test) { }) .fail(function (e) { console.error(e) - test.fail(e.message) + test.ok(false, e.message) test.done() }) }) @@ -328,32 +328,32 @@ function runCommonTest(cacheInstancePut, cacheInstanceGet, test, compressionFlag // longVal will have a length of 1180 after the for loop is executed, making it longer than the pivot var longVal = 'A long string that should be compressed because it is greater than 750 chars' - for (i = 0; i < 4; i++) { - longVal = longVal.concat(longVal) + for (i = 0; i < 4; i++) { + longVal = longVal.concat(longVal) } var longValOn = '@snappy@wAnwPEEgbG9uZyBzdHJpbmcgdGhhdCBzaG91bGQgYmUgY29tcHJlc3NlZCBiZWNhdXNlIGl0IGlzIGdyZWF0ZXIBMChuIDc1MCBjaGFyc/5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAP5MAM5MAA==' var tinyVal = 'tiny' var tinyValOn = '@orig@tiny' var nullVal = 'null' var undefinedVal = 'undefined' - + var items = [ {key: 'longValue2', value: longVal}, {key: 'tinyValue2', value: tinyVal} ] - + var testKeys = ['longValue', 'tinyValue', 'longValue2', 'tinyValue2', 'nullValue', 'undefinedValue'] var expValsWithoutCompression = [longVal, tinyVal, longVal, tinyVal, nullVal, undefinedVal] var expValsWithCompression = [longValOn, tinyValOn, longValOn, tinyValOn, nullVal, undefinedVal] var expVals = compressionFlag ? expValsWithCompression : expValsWithoutCompression var populateSetterFuncs = function() { - return [ + return [ cacheInstancePut.set('longValue', longVal, 100000), cacheInstancePut.set('tinyValue', tinyVal, 100000), cacheInstancePut.set('nullValue', null, 100000), cacheInstancePut.set('undefinedValue', undefined, 100000), - cacheInstancePut.mset(items, 100000) + cacheInstancePut.mset(items, 100000) ] } @@ -375,28 +375,28 @@ function runCommonTest(cacheInstancePut, cacheInstanceGet, test, compressionFlag return deferred.resolve(value) }) return deferred.promise - + }).then(function (vals) { // confirm cache entries look good test.deepEqual(expVals, vals) - return Q.resolve() + return Q.resolve() }).then(function () { return cacheInstanceGet.get('longValue') }).then(function (val) { - // confirm get works - test.equal(longVal, val) + // confirm get works + test.equal(longVal, val) return cacheInstanceGet.mget(testKeys) }).then(function (vals) { // confirm mget works test.deepEqual(expValsWithoutCompression, vals) destroyRedisClient() - }) + }) .fail(function (e) { console.error(e) - test.fail(e.message) + test.ok(false, e.stack) destroyRedisClient() - }) + }) - return bigDeferred.promise + return bigDeferred.promise } // Test 1: Compression Off @@ -409,7 +409,7 @@ builder.add(function testCompressionOff(test) { runCommonTest(cacheInstance, cacheInstance, test, false) .fin(function () { cacheInstance.destroy() - }) + }) }) cacheInstance.on('destroy', function () { @@ -457,12 +457,12 @@ builder.add(function testCompressionPutOffGetOn(test) { .fin(function () { cacheInstancePut.destroy() cacheInstanceGet.destroy() - }) + }) }) var count = 0 - var destroy = function () { - if (++count === 2) test.done() + var destroy = function () { + if (++count === 2) test.done() } cacheInstancePut.on('destroy', function() { @@ -492,13 +492,13 @@ builder.add(function testCompressionPutOnGetOff(test) { runCommonTest(cacheInstancePut, cacheInstanceGet, test, true) .fin(function () { cacheInstancePut.destroy() - cacheInstanceGet.destroy() + cacheInstanceGet.destroy() }) }) - + var count = 0 - var destroy = function () { - if (++count === 2) test.done() + var destroy = function () { + if (++count === 2) test.done() } cacheInstancePut.on('destroy', function() { @@ -510,5 +510,3 @@ builder.add(function testCompressionPutOnGetOff(test) { cacheInstancePut.connect() }) - - From 9de3adc3637c88d486aa2e278c515bc10e3176e6 Mon Sep 17 00:00:00 2001 From: Nick Santos Date: Thu, 12 Feb 2015 12:16:47 -0500 Subject: [PATCH 2/3] upgrade packages so that zcache is node12-compatible --- .travis.yml | 1 + package.json | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index f3b735b..cc58c23 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,7 @@ language: node_js node_js: - "0.10" + - "0.12" services: - memcached diff --git a/package.json b/package.json index 21541e7..7b9f1f2 100644 --- a/package.json +++ b/package.json @@ -18,10 +18,10 @@ "node-memcache-parser-obvfork": "0.1.1", "generic-pool": "2.0.3", "kew": "0.5.0-alpha", - "redis": "0.8.2", - "hiredis": "0.1.16", + "redis": "0.12.1", + "hiredis": "0.2.0", "metrics": "0.1.6", - "hashring": "1.0.3", + "hashring": "3.1.0", "snappy": "3.0.7" }, "devDependencies": { From 411b24cd8954dbee5fbce45bcbea4835e109816d Mon Sep 17 00:00:00 2001 From: Nick Santos Date: Wed, 18 Feb 2015 17:13:06 -0500 Subject: [PATCH 3/3] bump package --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 7b9f1f2..1c41e31 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "zcache", "description": "AWS zone-aware multi-layer cache", - "version": "0.4.3-alpha", + "version": "0.5.0", "homepage": "https://github.com/Medium/zcache", "authors": [ "Jeremy Stanley (https://github.com/azulus)", @@ -17,7 +17,7 @@ "dependencies": { "node-memcache-parser-obvfork": "0.1.1", "generic-pool": "2.0.3", - "kew": "0.5.0-alpha", + "kew": "0.5.0", "redis": "0.12.1", "hiredis": "0.2.0", "metrics": "0.1.6",