Skip to content

Commit

Permalink
Merge pull request #71 from Medium/nick-snappy
Browse files Browse the repository at this point in the history
upgrade packages so that zcache is node12-compatible
  • Loading branch information
nicks committed Feb 18, 2015
2 parents e40a7d2 + 411b24c commit 5e07f76
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 95 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: node_js

node_js:
- "0.10"
- "0.12"

services:
- memcached
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zcache
===

[![Build Status](https://travis-ci.org/Medium/zcache.svg?branch=xiao-travis)](https://travis-ci.org/Medium/zcache)
[![Build Status](https://travis-ci.org/Medium/zcache.svg)](https://travis-ci.org/Medium/zcache)
11 changes: 6 additions & 5 deletions externs/snappy.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

/**
* @param {string} value
* @param {function(?, ?)} callback
* @return {Buffer}
*/
function compressSync(value) {}
function compress(value, callback) {}

/**
* @param {Buffer} value
* @param {function(Buffer): string} parser
* @param {function(?, ?)} callback
* @return {string}
*/
function decompressSync(value, parser) {}
function decompress(value, parser, callback) {}

/**
* @param {Buffer} value
Expand All @@ -27,7 +29,6 @@ var parsers = {

module.exports = {
parsers: parsers,
compressSync: compressSync,
decompressSync: decompressSync
compress: compress,
decompress: decompress
}

112 changes: 61 additions & 51 deletions lib/RedisConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ function RedisConnection(host, port, options) {
this._bound_onConnect = this._onConnect.bind(this)
this._bound_onError = this._onError.bind(this)
this._bound_onEnd = this._onEnd.bind(this)
// Controls if we turn on compression or not.

// Controls if we turn on compression or not.
// All cache values which are longer than the pivot are eligible for compression
// Pivot and encoding prefix are hardcoded for now. Will revisit after
// we know we are using snappy for sure
Expand All @@ -47,42 +47,52 @@ RedisConnection.prototype.isAvailable = function () {

/** @override */
RedisConnection.prototype.set = function (key, val, maxAgeMs, setWhenNotExist) {
var deferred = Q.defer()
var params = [key, this._compress(val), 'PX', maxAgeMs]
if (setWhenNotExist) params.push('NX')
this._client.set(params, this._makeNodeResolverWithTimeout(deferred, 'set', 'Redis [set] key: ' + key))
return deferred.promise
return this._compress(val)
.thenBound(function (compressedVal) {
var params = [key, compressedVal, 'PX', maxAgeMs]
if (setWhenNotExist) params.push('NX')

var deferred = Q.defer()
this._client.set(params, this._makeNodeResolverWithTimeout(deferred, 'set', 'Redis [set] key: ' + key))
return deferred.promise
}, this)
}

/** @override */
RedisConnection.prototype.mset = function (items, maxAgeMs, setWhenNotExist) {
if (!items || !items.length) return Q.resolve(undefined)

var deferred = Q.defer()
var commands = []
var i, l
if (setWhenNotExist) {
// Use "SET" to set each key with a "NX" flag.
for (i = 0, l = items.length; i < l; i++) {
commands.push(['set', items[i].key, this._compress(items[i].value), 'PX', maxAgeMs, 'NX'])
}
} else {
// Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key
var msetCommand = ['MSET']
commands.push(msetCommand)
for (i = 0, l = items.length; i < l; i++) {
var key = items[i].key
// Append key value arguments to the set command.
msetCommand.push(key, this._compress(items[i].value))
// Append an expire command.
commands.push(['EXPIRE', key, Math.floor(maxAgeMs / 1000)])
var compressedPromises = items.map(function (item) {
return this._compress(item.value)
}, this)
return Q.all(compressedPromises)
.thenBound(function (compressedValues) {
var deferred = Q.defer()
var commands = []

var i, l
if (setWhenNotExist) {
// Use "SET" to set each key with a "NX" flag.
for (i = 0, l = items.length; i < l; i++) {
commands.push(['set', items[i].key, compressedValues[i], 'PX', maxAgeMs, 'NX'])
}
} else {
// Use "MSET" to set all the keys and "EXPIRE" to set TTL for each key
var msetCommand = ['MSET']
commands.push(msetCommand)
for (i = 0, l = items.length; i < l; i++) {
var key = items[i].key
// Append key value arguments to the set command.
msetCommand.push(key, compressedValues[i])
// Append an expire command.
commands.push(['EXPIRE', key, Math.floor(maxAgeMs / 1000)])
}
}
}
this._client.multi(commands).exec(
this._makeNodeResolverWithTimeout(deferred, 'mset',
'Redis [mset] key.0: ' + items[0].key + ' key.length: ' + items.length))

return deferred.promise
this._client.multi(commands).exec(
this._makeNodeResolverWithTimeout(deferred, 'mset',
'Redis [mset] key.0: ' + items[0].key + ' key.length: ' + items.length))
return deferred.promise
}, this)
}

/** @override */
Expand All @@ -109,7 +119,7 @@ RedisConnection.prototype.mget = function (keys) {
this._makeNodeResolverWithTimeout(deferred, 'mget',
opDesc))
return deferred.promise
.then(function (vals) {
.thenBound(function (vals) {
// This function post-processes values from Redis client to
// make cache miss result consistent with the API.
//
Expand All @@ -119,12 +129,12 @@ RedisConnection.prototype.mget = function (keys) {
if (null === vals[i]) {
vals[i] = undefined
} else {
//for real values determine if you need to decompress
vals[i] = self._decompress(vals[i])
//for real values determine if you need to uncompress
vals[i] = this._uncompress(vals[i])
}
}
return vals
})
return Q.all(vals)
}, this)
.then(this.getCountUpdater())
}

Expand Down Expand Up @@ -259,50 +269,50 @@ RedisConnection.prototype._makeNodeResolverWithTimeout = function (deferred, opN
* Private method controls how all cache values are encoded.
*
* @param {string|undefined|null} value Original cache value
* @return {string|undefined|null} Value encoded appropriately for the cache
* @return {Q.Promise.<string|undefined|null>} Value encoded appropriately for the cache
*/
RedisConnection.prototype._compress = function (value) {
if (!value || !this._compressionEnabled) {
return value
return Q.resolve(value)
}

if (value.length > this._snappyPivot) {
try {
var compressed = snappy.compressSync(value)
return this._compressedPrefix + compressed.toString('base64')
return Q.nfcall(snappy.compress, value).thenBound(function (compressed) {
return this._compressedPrefix + compressed.toString('base64')
}, this)
} catch (e) {
console.warn("Compression failed: " + e.message)
return this._uncompressedPrefix + value
return Q.resolve(this._uncompressedPrefix + value)
}
} else {
return this._uncompressedPrefix + value
return Q.resolve(this._uncompressedPrefix + value)
}
}

/**
* Private Method that knows how to parsed encoded cache value and decode.
*
* @param {string|undefined|null} value Possibly encoded value retrieved from the cache.
* @return {string|undefined|null} The original input value
* @return {Q.Promise.<string|undefined|null>} The original input value
*/
RedisConnection.prototype._decompress = function (value) {
if (!value) return value
RedisConnection.prototype._uncompress = function (value) {
if (!value) return Q.resolve(value)

// Note: always check prefixes even if compression is disabled, as there might
// be entries from prior to disabling compression
if (value.indexOf(this._compressedPrefix) === 0) {
try {
var compressedBuf = new Buffer(value.substring(this._compressedPrefix.length), 'base64')
var orig = snappy.decompressSync(compressedBuf, snappy.parsers.string)
return orig
return Q.nfcall(snappy.uncompress, compressedBuf, {asBuffer: false})
} catch (e) {
console.warn("Decompression failed: " + e.message)
return undefined
}
return Q.resolve(undefined)
}
} else if (value.indexOf(this._uncompressedPrefix) === 0) {
return value.substring(this._uncompressedPrefix.length)
return Q.resolve(value.substring(this._uncompressedPrefix.length))
} else {
return value
return Q.resolve(value)
}
}

Expand Down
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "zcache",
"description": "AWS zone-aware multi-layer cache",
"version": "0.4.3-alpha",
"version": "0.5.0",
"homepage": "https://github.com/Medium/zcache",
"authors": [
"Jeremy Stanley <github@azulus.com> (https://github.com/azulus)",
Expand All @@ -17,16 +17,16 @@
"dependencies": {
"node-memcache-parser-obvfork": "0.1.1",
"generic-pool": "2.0.3",
"kew": "0.5.0-alpha",
"redis": "0.8.2",
"hiredis": "0.1.16",
"kew": "0.5.0",
"redis": "0.12.1",
"hiredis": "0.2.0",
"metrics": "0.1.6",
"hashring": "1.0.3",
"snappy": "2.1.2"
"hashring": "3.1.0",
"snappy": "3.0.7"
},
"devDependencies": {
"nodeunit": "0.7.4",
"nodeunitq": "0.0.3",
"nodeunit": "0.9.0",
"nodeunitq": "0.1.1",
"logg": "0.2.2",
"closure-npc": "0.1.3",
"sinon": "git://github.com/Medium/Sinon.JS.git#xiao-fix-clearTimeout-for-nodejs"
Expand Down
8 changes: 4 additions & 4 deletions test/test_CacheCluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ builder.add(function testLatencyMeasurement(test) {
.then(function() {
test.equal(20, cluster.getStats('set').count())
test.ok(cluster.getStats('set').mean() > 28)
test.ok(cluster.getStats('set').mean() < 35)
test.ok(cluster.getStats('set').mean() < 38)

var getPromises = []
for (var i = 0; i < 20; i++) {
Expand All @@ -352,7 +352,7 @@ builder.add(function testLatencyMeasurement(test) {
.then(function() {
test.equal(20, cluster.getStats('get').count())
test.ok(cluster.getStats('get').mean() > 28)
test.ok(cluster.getStats('get').mean() < 35)
test.ok(cluster.getStats('get').mean() < 38)

var items = []
for (var i = 0; i < 20; i++) {
Expand All @@ -366,7 +366,7 @@ builder.add(function testLatencyMeasurement(test) {
.then(function() {
test.equal(1, cluster.getStats('mset').count())
test.ok(cluster.getStats('mset').mean() > 28)
test.ok(cluster.getStats('mset').mean() < 35)
test.ok(cluster.getStats('mset').mean() < 38)

var keys = []
for (var i = 0; i < 20; i++) {
Expand All @@ -377,7 +377,7 @@ builder.add(function testLatencyMeasurement(test) {
.then(function() {
test.equal(1, cluster.getStats('mget').count())
test.ok(cluster.getStats('mget').mean() > 28)
test.ok(cluster.getStats('mget').mean() < 35)
test.ok(cluster.getStats('mget').mean() < 38)
})

})
Expand Down
Loading

0 comments on commit 5e07f76

Please sign in to comment.