diff --git a/lib/Model.js b/lib/Model.js index 01c0a4b..0d77dc3 100644 --- a/lib/Model.js +++ b/lib/Model.js @@ -1,4 +1,6 @@ var Application = require('./Application'); +var User = require('./User'); +var Context = require('./Context'); var TelepatError = require('./TelepatError'); var async = require('async'); var utils = require('../utils/utils'); @@ -20,14 +22,25 @@ function Model(_id, callback) { Model.delete = function(objects, callback) { var childrenFilter = new FilterBuilder('or'); + var appModels = {}; + async.series([ function(callback1) { - Application.datasource.dataStorage.deleteObjects(objects, function(errs) { + async.forEachOfSeries(objects, function(modelName, id, c) { + if (modelName == 'context') + Context.delete(id, function() {}); + else + appModels[id] = modelName; + c(); + }, callback1); + }, + function(callback1) { + Application.datasource.dataStorage.deleteObjects(appModels, function(errs) { if (errs && errs.length >= 1) { async.each(errs, function(error, c) { if (error.status == 404) { Application.logger.notice('Model "'+error.args[0]+'" with ID "'+error.args[1]+'" not found.'); - delete objects[error.args[1]]; + delete appModels[error.args[1]]; c(); } else { c(error); @@ -38,8 +51,8 @@ Model.delete = function(objects, callback) { }); }, function(callback1) { - async.each(Object.keys(objects), function(id, c) { - var modelName = objects[id]; + async.each(Object.keys(appModels), function(id, c) { + var modelName = appModels[id]; var filterObj = {}; filterObj[modelName+'_id'] = id; @@ -201,10 +214,23 @@ Model.create = function(deltas, callback, returnedObjectsCb) { dbItems.push(d.object); }); - if (dbItems.length > 0) - Application.datasource.dataStorage.createObjects(dbItems, callback1); - else + var appModelsObjects = []; + + async.eachSeries(dbItems, function(item, c) { + if (item.type == 'user') { + User.create(item, item.application_id, function(){}); + } + else if (item.type == 'context') + Context.create(item, function(){}); + else + appModelsObjects.push(item); + c(); + }, function() { + if (appModelsObjects.length > 0) + Application.datasource.dataStorage.createObjects(appModelsObjects, function(errs, result) {}); + callback1(); + }); } ], callback); }; @@ -217,16 +243,28 @@ Model.create = function(deltas, callback, returnedObjectsCb) { * @param cb */ Model.update = function(patches, callback) { - Application.datasource.dataStorage.updateObjects(patches, function(errs) { - if (errs && errs.length >= 1) {} - errs.forEach(function(error) { - if (error.status == 404) - Application.logger.notice(error.message); - else - Application.logger.error(error.toString()); + var appModelsPatches = []; + + async.eachSeries(patches, function(p, c) { + if (p.path.split('/')[0] == 'user') + User.update(p, function(){}); + else if (p.path.split('/')[0] == 'context') + Context.update(p, function() {}); + else + appModelsPatches.push(p); + c(); + }, function() { + Application.datasource.dataStorage.updateObjects(appModelsPatches, function(errs) { + if (errs && errs.length >= 1) {} + errs.forEach(function(error) { + if (error.status == 404) + Application.logger.notice(error.message); + else + Application.logger.error(error.toString()); + }); }); + callback(); }); - callback(); }; Model.getFilterFromChannel = function(channel) { @@ -248,17 +286,18 @@ Model.getFilterFromChannel = function(channel) { } if (channel.filter) { - searchFilters.and(); (function AddFilters(filterObject) { var filterKey = Object.keys(filterObject); - searchFilters[filterKey](); + if (filterKey == 'or') + searchFilters.or(); filterObject[filterKey].forEach(function(filters, key) { if (key == 'and' || key == 'or') AddFilters(filterObject[filterKey]); else { + console.log(filters); for(var key2 in filters) { - searchFilters.addFilter(key2, filterObject[filterKey][filters[key2]]); + searchFilters.addFilter(key2, filters[key2]); } } }); diff --git a/lib/Subscription.js b/lib/Subscription.js index f1cedd2..aca20f8 100644 --- a/lib/Subscription.js +++ b/lib/Subscription.js @@ -209,17 +209,17 @@ Subscription.getAllDevices = function(appId, callback) { devices[parsedDevice.volatile.server_name].push(parsedDevice.volatile.token); } else if(parsedDevice.persistent) { - var queueName = parsedDevice.persistent[parsedDevice.persistent.type+'_transport']; + var queueName = parsedDevice.persistent.type+'_transport'; if (!devices[queueName]) - devices[queueName] = [parsedDevice.volatile.token]; + devices[queueName] = [parsedDevice.persistent.token]; else - devices[queueName].push(parsedDevice.volatile.token); + devices[queueName].push(parsedDevice.persistent.token); } } c(); }, function() { - callback(devices); + callback(null, devices); }); }); }); diff --git a/lib/User.js b/lib/User.js index 64f3e87..6b43438 100644 --- a/lib/User.js +++ b/lib/User.js @@ -47,7 +47,7 @@ User.load = function() { */ User.create = function(props, appId, callback) { var self = this; - props.id = guid.v4(); + props.id = props.id || guid.v4(); props.application_id = appId; props.created = Math.floor((new Date()).getTime()/1000); props.modified = props.created; @@ -66,10 +66,10 @@ User.create = function(props, appId, callback) { type: 'user_metadata' }; - User(props.username, appId, function(err, result) { + User({username: props.username}, appId, function(err, result) { if (err && err.status == 404) { Application.datasource.dataStorage.createObjects([props, userMetadata], function(errs) { - if (errs.length) { + if (errs && errs.length) { errs.forEach(function(error) { Application.logger.error(error.message); }); @@ -147,7 +147,9 @@ User.delete = function(id, appId, callback) { callback1(); }, function(callback1) { - Application.datasource.dataStorage.deleteObjects([id], function(errs) { + var usrObj = {}; + usrObj[id] = 'user'; + Application.datasource.dataStorage.deleteObjects(usrObj, function(errs) { callback1(errs && errs.length > 1 ? errs[0] : null); }); }, diff --git a/lib/database/elasticsearch_adapter.js b/lib/database/elasticsearch_adapter.js index 4e6cc83..9c5c82e 100644 --- a/lib/database/elasticsearch_adapter.js +++ b/lib/database/elasticsearch_adapter.js @@ -245,17 +245,17 @@ ElasticSearchDB.prototype.searchObjects = function(options, callback) { } }); } else { - if (sort) { + if (options.sort) { reqBody.sort = {}; - if (!sort.type) { - reqBody.sort = [sort]; - } else if (sort.type == 'default') { - reqBody.sort[sort.field] = sort.order; - } else if (sort.type == 'geo') { + if (!options.sort.type) { + reqBody.sort = [options.sort]; + } else if (options.sort.type == 'default') { + reqBody.sort[options.sort.field] = options.sort.order; + } else if (options.sort.type == 'geo') { reqBody.sort._geo_distance = {}; - reqBody.sort._geo_distance[sort.field] = {lat: sort.poi.lat || 0.0, lon: sort.poi.long || 0.0}; - reqBody.sort._geo_distance.order = sort.order; + reqBody.sort._geo_distance[options.sort.field] = {lat: options.sort.poi.lat || 0.0, lon: options.sort.poi.long || 0.0}; + reqBody.sort._geo_distance.order = options.sort.order; } } @@ -264,8 +264,7 @@ ElasticSearchDB.prototype.searchObjects = function(options, callback) { type: options.modelName, body: reqBody, from: options.offset, - size: options.limit, - search_type: options.sort + size: options.limit }, function(err, results) { if (err) return callback(err); @@ -343,7 +342,15 @@ ElasticSearchDB.prototype.createObjects = function(objects, callback) { index: this.config.index, body: bulk, refresh: builtinDetected - }, callback); + }, function(err, res) { + if (res.errors) { + res.items.forEach(function(error) { + Application.logger.error('Error creating '+error.index._type+': '+error.index.error); + }); + } + + callback(err, res); + }); }; ElasticSearchDB.prototype.updateObjects = function(patches, callback) { @@ -391,7 +398,7 @@ ElasticSearchDB.prototype.updateObjects = function(patches, callback) { 'def parsed = jsonSlurper.parseText(\''+JSON.stringify(dbObjects[id])+'\');'+ 'ctx._source = parsed;'; - bulk.push({update: {_type: objectModel, _id: id}}); + bulk.push({update: {_type: objectModel, _id: id, _retry_on_conflict: 10}}); bulk.push({script: script}); c(); }, function() {