-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
183 lines (145 loc) · 4.13 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import AWS from 'aws-sdk';
import _ from 'lodash';
import config from 'config';
import moment from 'moment';
import mongodb from 'mongodb';
import uuid from './lib/util/uuid.js';
// Number of location documents sent to MongoDB per bulk operation.
const sliceSize = 1000;

// AWS credentials are read from the project configuration.
const { AWSAccessKeyId, AWSSecretKey } = config.aws.credentials;
const AWSConfig = {
  accessKeyId: AWSAccessKeyId,
  secretAccessKey: AWSSecretKey,
  region: 'us-east-1',
};

// Apply credentials and region globally before any service client is created.
AWS.config.update(AWSConfig);

// Shared S3 client, pinned to a fixed API version.
let s3 = new AWS.S3({ apiVersion: '2006-03-01' });

// MongoDB client handle; assigned by the startup routine once connected.
let mongo;
// Pause (in ms) before polling again when nothing is ready or the queue cannot be read.
const IDLE_PAUSE_MS = 300000;

/**
 * Build the S3 request parameters that address the uploaded file for a queue record.
 * The key is "<user id as hex>/<upload time as UTC unix seconds>.json".
 *
 * @param {object} fileRecord - Document from the `location_files` collection.
 *   Presumably `user_id` is a Buffer/Binary (it is rendered with `toString('hex')`) — confirm.
 * @returns {{Bucket: string, Key: string}} A fresh parameter object for `s3.getObject` / `s3.deleteObject`.
 */
function locationFileParams(fileRecord) {
  const userHex = fileRecord.user_id.toString('hex');
  const uploadedAt = moment(fileRecord.upload_time).utc().unix();
  return {
    Bucket: config.aws.s3.locations.bucket_name,
    Key: `${userHex}/${uploadedAt}.json`,
  };
}

/**
 * Convert raw uploaded location entries into documents for the `locations` collection.
 * Entries missing (or having a falsy) `latitudeE7`, `longitudeE7` or `timestampMs` are skipped.
 *
 * @param {Array<object>|undefined} locations - Raw entries from the uploaded JSON (`parsed.locations`).
 * @param {*} userId - The owning user's id, stored as-is on every document.
 * @returns {Array<object>} Documents ready to be upserted.
 */
function buildLocationDocuments(locations, userId) {
  const userHex = userId.toString('hex');
  const documents = [];
  // _.each tolerates a missing/undefined `locations` value, unlike for...of.
  _.each(locations, (location) => {
    if (!location || !location.latitudeE7 || !location.longitudeE7 || !location.timestampMs) {
      return;
    }
    const datetime = moment(Number.parseInt(location.timestampMs, 10));
    documents.push({
      // valueOf() keeps the identifier as the epoch-millisecond number, which is what
      // string concatenation with a moment object produced before; existing records
      // are matched on this value, so the format must not change.
      identifier: `uploaded:::${userHex}:::${datetime.valueOf()}`,
      estimated: false,
      datetime: moment(datetime).utc().toDate(),
      geo_format: 'lat_lng',
      // E7 values are scaled by 10,000,000; stored as [longitude, latitude].
      geolocation: [location.longitudeE7 / 10000000, location.latitudeE7 / 10000000],
      uploaded: true,
      updated: moment().utc().toDate(),
      user_id: userId,
    });
  });
  return documents;
}

/**
 * Upsert documents into the `locations` collection in unordered bulk batches of `sliceSize`.
 * A failed batch is logged and the remaining batches are still attempted (best effort).
 *
 * @param {Array<object>} documents - Output of `buildLocationDocuments`.
 * @param {*} userId - User id used, together with the identifier, to match existing documents.
 * @returns {Promise<void>}
 */
async function upsertLocations(documents, userId) {
  const collection = mongo.db('live').collection('locations');
  for (let start = 0; start < documents.length; start += sliceSize) {
    const bulk = collection.initializeUnorderedBulkOp();
    for (const document of documents.slice(start, start + sliceSize)) {
      bulk
        .find({
          identifier: document.identifier,
          user_id: userId,
        })
        .upsert()
        .updateOne({
          $set: document,
          $setOnInsert: {
            _id: uuid(uuid()),
            created: document.updated,
          },
        });
    }
    try {
      await bulk.execute();
    } catch (err) {
      // NOTE(review): the batch error is only logged, and the caller still deletes
      // the source file afterwards — confirm this best-effort behaviour is intended.
      console.log(err);
    }
  }
}

/**
 * Process one queued location file, then schedule the next run.
 *
 * Steps: pick a `location_files` record with status 'ready', mark it 'running',
 * download and parse its JSON from S3, upsert the contained locations, delete the
 * S3 object and the queue record, and immediately look for the next file.
 * When no file is ready, polling resumes after IDLE_PAUSE_MS.
 *
 * On failure the record is set back to 'ready' with `queue_time` pushed 6 hours
 * later. If the failure happened before a record was obtained (e.g. the database
 * is unreachable) there is nothing to requeue, so the worker simply pauses and
 * retries instead of throwing from the error handler.
 *
 * NOTE(review): records are picked with `queue_time` descending, so a record pushed
 * 6 hours into the future sorts first again — confirm the intended ordering.
 * NOTE(review): the find-then-update claim is not atomic; two workers can pick the
 * same record.
 *
 * @returns {Promise<void>} Settles once this run has finished and the next one is scheduled.
 */
async function dequeue() {
  let nextFile;
  try {
    const files = mongo.db('live').collection('location_files');
    nextFile = await files.findOne({
      status: 'ready',
    }, {
      sort: {
        queue_time: -1,
      },
    });

    if (nextFile == null) {
      console.log('No files to run, pausing for 5 minutes');
      setTimeout(dequeue, IDLE_PAUSE_MS);
      return;
    }

    console.log('Running the next location file for user ' + nextFile.user_id.toString('hex'));
    await files.updateOne({
      _id: nextFile._id,
    }, {
      $set: {
        status: 'running',
      },
    });

    const file = await s3.getObject(locationFileParams(nextFile)).promise();
    const parsed = JSON.parse(file.Body);

    console.log('Starting to parse locations');
    const parsedLocations = buildLocationDocuments(parsed.locations, nextFile.user_id);
    await upsertLocations(parsedLocations, nextFile.user_id);
    console.log('Finished uploading locations');

    await s3.deleteObject(locationFileParams(nextFile)).promise();
    // deleteOne is the supported name; the removeOne alias is gone from current drivers.
    await files.deleteOne({
      _id: nextFile._id,
    });

    // Intentionally not awaited: the next run handles its own errors.
    dequeue();
  } catch (err) {
    console.log('Error in location parsing');
    console.log(err);

    if (nextFile == null) {
      // The queue lookup itself failed; there is no record to put back.
      setTimeout(dequeue, IDLE_PAUSE_MS);
      return;
    }

    try {
      await mongo.db('live').collection('location_files').updateOne({
        _id: nextFile._id,
      }, {
        $set: {
          status: 'ready',
          queue_time: moment(nextFile.queue_time).add(6, 'hours').toDate(),
        },
      });
    } catch (requeueErr) {
      // Could not put the record back; pause rather than spin against a failing database.
      console.log('Failed to requeue location file');
      console.log(requeueErr);
      setTimeout(dequeue, IDLE_PAUSE_MS);
      return;
    }

    dequeue();
  }
}
// Startup: connect to MongoDB, then kick off the polling loop.
(async () => {
  const client = await mongodb.MongoClient.connect(config.mongodb.address, config.mongodb.options);
  mongo = client;

  // Stagger the first poll by a random delay (under 10 seconds) so that several
  // workers booting together do not all grab the same job at once.
  const startupDelayMs = Math.random() * 10000;
  setTimeout(dequeue, startupDelayMs);
})();