Skip to content

Commit

Permalink
Merge branch 'improvement/KR-268-telepat-indexed-lists' into improvem…
Browse files Browse the repository at this point in the history
…ent/KR-248-worker-rewrite

Conflicts:
	lib/Subscription.js
	lib/TelepatError.js
	utils/utils.js
  • Loading branch information
Razvan Botea committed Apr 25, 2016
2 parents 9ff442d + 758d91e commit 4c0596d
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 29 deletions.
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module.exports.ElasticSearch = require('./lib/database/elasticsearch_adapter');

module.exports.TelepatLogger = require('./lib/logger/logger');

module.exports.TelepatIndexedList = require('./lib/TelepatIndexedLists');

fs.readdirSync(__dirname+'/lib/message_queue').forEach(function(filename) {
var filenameParts = filename.split('_');

Expand Down
30 changes: 2 additions & 28 deletions lib/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ Subscription.findDeviceByUdid = function(appId, udid, callback) {
* @param callback
*/
Subscription.getAllDevices = function(appId, callback) {
scanRedisKeysPattern('blg:'+appId+':devices:[^udid]*', function(err, results) {
utils.scanRedisKeysPattern('blg:'+appId+':devices:[^udid]*', Application.redisClient, function(err, results) {
if (err) return callback(err);

Application.redisClient.mget(results, function(err, results) {
Expand Down Expand Up @@ -249,7 +249,7 @@ Subscription.updateDevice = function(appId, device, props, callback) {
*/
Subscription.getSubscriptionKeysWithFilters = function(channel, callback) {
var filterChannels = [];
scanRedisKeysPattern(channel.get()+':filter:*[^:count_cache:LOCK]', function(err, results) {
utils.scanRedisKeysPattern(channel.get()+':filter:*[^:count_cache:LOCK]', Application.redisClient, function(err, results) {
if (err) return callback(err);

for(var k in results) {
Expand All @@ -265,30 +265,4 @@ Subscription.getSubscriptionKeysWithFilters = function(channel, callback) {
});
};

var scanRedisKeysPattern = function(pattern, callback) {
var redisScanCursor = -1;
var results = [];
var getDeviceIds = function(callback1) {
Application.redisClient.scan([redisScanCursor == -1 ? 0 : redisScanCursor,
'MATCH', pattern, 'COUNT', 100000], function(err, partialResults) {
if (err) return callback1(err);

redisScanCursor = partialResults[0];
results = results.concat(partialResults[1]);

callback1();
});
};

async.during(
function(callback1) {
callback1(null, redisScanCursor != 0);
},
getDeviceIds,
function(err) {
callback(err, results);
}
);
};

module.exports = Subscription;
5 changes: 5 additions & 0 deletions lib/TelepatError.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ TelepatError.errors = {
code: '048',
message: 'Failed to parse query filter: %s',
status: 400
},
TilNotFound: {
code: '049',
message: 'TelepatIndexedList with name "%s" does not exist',
status: 404
}
}

Expand Down
126 changes: 126 additions & 0 deletions lib/TelepatIndexedLists.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
var Application = require('./Application');
var TelepatError = require('./TelepatError');
var utils = require('../utils/utils');
var async = require('async');

var TelepatIndexedList = {
/**
* @callback appendCb
* @param {Error|null} [err]
*/

/**
* @callback removeCb
* @param {Error|null} [err]
* @param {Boolean} [removed]
*/

/**
* @callback getCb
* @param {Error|null} [err]
* @param {Object[]} [results]
*/

/**
*
* @param {string} listName Name of the list
* @param {string} indexedProperty The property that's being indexed by
* @param {Object} object The key of this object is the member that will be inserted with its value
* @param {appendCb} callback
*/
append: function(listName, indexedProperty, object , callback) {
var redisKey = 'blg:til:'+listName+':'+indexedProperty;
var memeberName = Object.keys(object)[0];
var memeberValue = object[memeberName];

Application.redisCacheClient.sadd([redisKey, memeberName, memeberValue], function(err) {
if (err) return callback(err);
else callback();
});
},

/**
*
* @param {string} listName Name of the list
* @param {string} indexedProperty The property that's being indexed by
* @param {string[]} members Array of memembers to check for
* @param {getCb} callback
*/
get: function(listName, indexedProperty, members, callback) {
var redisKey = 'blg:til:'+listName+':'+indexedProperty;
var tranzaction = Application.redisCacheClient.multi();

tranzaction.exists([redisKey]);

members.forEach(function(member) {
tranzaction.sismember([redisKey, member]);
});

tranzaction.exec(function(err, results) {
if (err) return callback(err);

var memebershipResults = {};

if (!results[0])
return callback(new TelepatError(TelepatError.errors.TilNotFound, [listName]));

results.splice(0, 1);

async.forEachOf(results, function(result, index, c) {
memebershipResults[members[index]] = new Boolean(result);
c();
}, function() {
callback(null, memebershipResults);
});
});
},

/**
*
* @param {string} listName Name of the list to remove
* @param {removeCb} callback
*/
removeList: function(listName, callback) {
var keyPattern = 'blg:til:'+listName+':*';
var self = this;

utils.scanRedisKeysPattern(keyPattern, Application.redisCacheClient, function(err, results) {
if (err) return callback(err);

if (!results.length)
return callback(new TelepatError(TelepatError.errors.TilNotFound, [listName]));

Application.redisCacheClient.del(results, function(err, removed) {
if (err) return callback(err);

callback(null, new Boolean(removed));
});
})
},

/**
*
* @param {string} listName Name of the list
* @param {string} indexedProperty The property that's being indexed by
* @param {string[]} members Array of memembers to remove
* @param {removeCb} callback
*/
removeMember: function(listName, indexedProperty, memeber, callback) {
var redisKey = 'blg:til:'+listName+':'+indexedProperty;
var tranzaction = Application.redisCacheClient.multi();

tranzaction.exists([redisKey]);
tranzaction.srem([redisKey, memeber]);

tranzaction.exec(function(err, replies) {
if (err) return callback(err);

if (!replies[0])
return callback(new TelepatError(TelepatError.errors.TilNotFound, [listName]));

callback(null, new Boolean(replies[1]));
});
}
};

module.exports = TelepatIndexedList;
33 changes: 32 additions & 1 deletion utils/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var lz4Module = require('lz4');
var stream = require('stream');
var async = require('async');

/**
* Transform the object that is sent in the request body in the subscribe endpoint so its compatible with
Expand Down Expand Up @@ -96,6 +97,9 @@ function testObject(object, query) {
var mainOperator = Object.keys(query)[0];
var result = null;

if (mainOperator !== 'and' && mainOperator !== 'or')
return false;

for(var operand in query[mainOperator]) {
var operator2 = Object.keys(query[mainOperator][operand])[0];

Expand Down Expand Up @@ -275,11 +279,38 @@ var lz4 = (function() {
};
})();

var scanRedisKeysPattern = function(pattern, redisInstance, callback) {
var redisScanCursor = -1;
var results = [];

var scanAndGet = function(callback1) {
redisInstance.scan([redisScanCursor == -1 ? 0 : redisScanCursor,
'MATCH', pattern, 'COUNT', 100000], function(err, partialResults) {
if (err) return callback1(err);

redisScanCursor = partialResults[0];
results = results.concat(partialResults[1]);

callback1();
});
};

async.during(
function(callback1) {
callback1(null, redisScanCursor != 0);
},
scanAndGet,
function(err) {
callback(err, results);
}
);
};
//console.log(JSON.stringify(getQueryKey(JSON.parse('{"or":[{"and":[{"is":{"gender":"male","age":23}},{"range":{"experience":{"gte":1,"lte":6}}}]},{"and":[{"like":{"image_url":"png","website":"png"}}]}]}'))));
//console.log(parseQueryObject(JSON.parse('{"or":[{"and":[{"is":{"gender":"male","age":23}},{"range":{"experience":{"gte":1,"lte":6}}}]},{"and":[{"like":{"image_url":"png","website":"png"}}]}]}')));

module.exports = {
parseQueryObject: parseQueryObject,
testObject: testObject,
lz4: lz4
lz4: lz4,
scanRedisKeysPattern: scanRedisKeysPattern
};

0 comments on commit 4c0596d

Please sign in to comment.