Skip to content

Commit

Permalink
Merge branch 'improvement/KR-248-worker-rewrite' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Razvan Botea committed Apr 28, 2016
2 parents 1d3591e + f18ad89 commit 1f76a77
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 38 deletions.
75 changes: 57 additions & 18 deletions lib/Model.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
var Application = require('./Application');
var User = require('./User');
var Context = require('./Context');
var TelepatError = require('./TelepatError');
var async = require('async');
var utils = require('../utils/utils');
Expand All @@ -20,14 +22,25 @@ function Model(_id, callback) {
Model.delete = function(objects, callback) {
var childrenFilter = new FilterBuilder('or');

var appModels = {};

async.series([
function(callback1) {
Application.datasource.dataStorage.deleteObjects(objects, function(errs) {
async.forEachOfSeries(objects, function(modelName, id, c) {
if (modelName == 'context')
Context.delete(id, function() {});
else
appModels[id] = modelName;
c();
}, callback1);
},
function(callback1) {
Application.datasource.dataStorage.deleteObjects(appModels, function(errs) {
if (errs && errs.length >= 1) {
async.each(errs, function(error, c) {
if (error.status == 404) {
Application.logger.notice('Model "'+error.args[0]+'" with ID "'+error.args[1]+'" not found.');
delete objects[error.args[1]];
delete appModels[error.args[1]];
c();
} else {
c(error);
Expand All @@ -38,8 +51,8 @@ Model.delete = function(objects, callback) {
});
},
function(callback1) {
async.each(Object.keys(objects), function(id, c) {
var modelName = objects[id];
async.each(Object.keys(appModels), function(id, c) {
var modelName = appModels[id];
var filterObj = {};

filterObj[modelName+'_id'] = id;
Expand Down Expand Up @@ -201,10 +214,23 @@ Model.create = function(deltas, callback, returnedObjectsCb) {
dbItems.push(d.object);
});

if (dbItems.length > 0)
Application.datasource.dataStorage.createObjects(dbItems, callback1);
else
var appModelsObjects = [];

async.eachSeries(dbItems, function(item, c) {
if (item.type == 'user') {
User.create(item, item.application_id, function(){});
}
else if (item.type == 'context')
Context.create(item, function(){});
else
appModelsObjects.push(item);
c();
}, function() {
if (appModelsObjects.length > 0)
Application.datasource.dataStorage.createObjects(appModelsObjects, function(errs, result) {});

callback1();
});
}
], callback);
};
Expand All @@ -217,16 +243,28 @@ Model.create = function(deltas, callback, returnedObjectsCb) {
* @param cb
*/
Model.update = function(patches, callback) {
Application.datasource.dataStorage.updateObjects(patches, function(errs) {
if (errs && errs.length >= 1) {}
errs.forEach(function(error) {
if (error.status == 404)
Application.logger.notice(error.message);
else
Application.logger.error(error.toString());
var appModelsPatches = [];

async.eachSeries(patches, function(p, c) {
if (p.path.split('/')[0] == 'user')
User.update(p, function(){});
else if (p.path.split('/')[0] == 'context')
Context.update(p, function() {});
else
appModelsPatches.push(p);
c();
}, function() {
Application.datasource.dataStorage.updateObjects(appModelsPatches, function(errs) {
if (errs && errs.length >= 1) {}
errs.forEach(function(error) {
if (error.status == 404)
Application.logger.notice(error.message);
else
Application.logger.error(error.toString());
});
});
callback();
});
callback();
};

Model.getFilterFromChannel = function(channel) {
Expand All @@ -248,17 +286,18 @@ Model.getFilterFromChannel = function(channel) {
}

if (channel.filter) {
searchFilters.and();
(function AddFilters(filterObject) {
var filterKey = Object.keys(filterObject);
searchFilters[filterKey]();
if (filterKey == 'or')
searchFilters.or();

filterObject[filterKey].forEach(function(filters, key) {
if (key == 'and' || key == 'or')
AddFilters(filterObject[filterKey]);
else {
console.log(filters);
for(var key2 in filters) {
searchFilters.addFilter(key2, filterObject[filterKey][filters[key2]]);
searchFilters.addFilter(key2, filters[key2]);
}
}
});
Expand Down
8 changes: 4 additions & 4 deletions lib/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,17 @@ Subscription.getAllDevices = function(appId, callback) {
devices[parsedDevice.volatile.server_name].push(parsedDevice.volatile.token);

} else if(parsedDevice.persistent) {
var queueName = parsedDevice.persistent[parsedDevice.persistent.type+'_transport'];
var queueName = parsedDevice.persistent.type+'_transport';

if (!devices[queueName])
devices[queueName] = [parsedDevice.volatile.token];
devices[queueName] = [parsedDevice.persistent.token];
else
devices[queueName].push(parsedDevice.volatile.token);
devices[queueName].push(parsedDevice.persistent.token);
}
}
c();
}, function() {
callback(devices);
callback(null, devices);
});
});
});
Expand Down
10 changes: 6 additions & 4 deletions lib/User.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ User.load = function() {
*/
User.create = function(props, appId, callback) {
var self = this;
props.id = guid.v4();
props.id = props.id || guid.v4();
props.application_id = appId;
props.created = Math.floor((new Date()).getTime()/1000);
props.modified = props.created;
Expand All @@ -66,10 +66,10 @@ User.create = function(props, appId, callback) {
type: 'user_metadata'
};

User(props.username, appId, function(err, result) {
User({username: props.username}, appId, function(err, result) {
if (err && err.status == 404) {
Application.datasource.dataStorage.createObjects([props, userMetadata], function(errs) {
if (errs.length) {
if (errs && errs.length) {
errs.forEach(function(error) {
Application.logger.error(error.message);
});
Expand Down Expand Up @@ -147,7 +147,9 @@ User.delete = function(id, appId, callback) {
callback1();
},
function(callback1) {
Application.datasource.dataStorage.deleteObjects([id], function(errs) {
var usrObj = {};
usrObj[id] = 'user';
Application.datasource.dataStorage.deleteObjects(usrObj, function(errs) {
callback1(errs && errs.length > 1 ? errs[0] : null);
});
},
Expand Down
31 changes: 19 additions & 12 deletions lib/database/elasticsearch_adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,17 @@ ElasticSearchDB.prototype.searchObjects = function(options, callback) {
}
});
} else {
if (sort) {
if (options.sort) {
reqBody.sort = {};

if (!sort.type) {
reqBody.sort = [sort];
} else if (sort.type == 'default') {
reqBody.sort[sort.field] = sort.order;
} else if (sort.type == 'geo') {
if (!options.sort.type) {
reqBody.sort = [options.sort];
} else if (options.sort.type == 'default') {
reqBody.sort[options.sort.field] = options.sort.order;
} else if (options.sort.type == 'geo') {
reqBody.sort._geo_distance = {};
reqBody.sort._geo_distance[sort.field] = {lat: sort.poi.lat || 0.0, lon: sort.poi.long || 0.0};
reqBody.sort._geo_distance.order = sort.order;
reqBody.sort._geo_distance[options.sort.field] = {lat: options.sort.poi.lat || 0.0, lon: options.sort.poi.long || 0.0};
reqBody.sort._geo_distance.order = options.sort.order;
}
}

Expand All @@ -264,8 +264,7 @@ ElasticSearchDB.prototype.searchObjects = function(options, callback) {
type: options.modelName,
body: reqBody,
from: options.offset,
size: options.limit,
search_type: options.sort
size: options.limit
}, function(err, results) {
if (err) return callback(err);

Expand Down Expand Up @@ -343,7 +342,15 @@ ElasticSearchDB.prototype.createObjects = function(objects, callback) {
index: this.config.index,
body: bulk,
refresh: builtinDetected
}, callback);
}, function(err, res) {
if (res.errors) {
res.items.forEach(function(error) {
Application.logger.error('Error creating '+error.index._type+': '+error.index.error);
});
}

callback(err, res);
});
};

ElasticSearchDB.prototype.updateObjects = function(patches, callback) {
Expand Down Expand Up @@ -391,7 +398,7 @@ ElasticSearchDB.prototype.updateObjects = function(patches, callback) {
'def parsed = jsonSlurper.parseText(\''+JSON.stringify(dbObjects[id])+'\');'+
'ctx._source = parsed;';

bulk.push({update: {_type: objectModel, _id: id}});
bulk.push({update: {_type: objectModel, _id: id, _retry_on_conflict: 10}});
bulk.push({script: script});
c();
}, function() {
Expand Down

0 comments on commit 1f76a77

Please sign in to comment.