Skip to content

Commit

Permalink
Merge pull request #289 from tidepool-org/gniezen/tandem-multirecord
Browse files Browse the repository at this point in the history
Support for Tandem multi-record downloads
  • Loading branch information
gniezen committed Jun 3, 2016
2 parents fd3603b + 9a004b1 commit 1e21a3e
Showing 1 changed file with 171 additions and 51 deletions.
222 changes: 171 additions & 51 deletions lib/drivers/tandemDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,12 @@ module.exports = function (config) {

var listenForPacket = function (command,args,callback) {

var abortTimer = setTimeout(function () {
clearInterval(listenTimer);
debug('TIMEOUT');
callback(new Error('Timeout error'), null);
}, RETRY_TIMEOUT * 2);

var retryTimer = setTimeout(function() {
console.log('Retrying with ',command, ', ',args);
tandemCommand(command, args, function (err) {
Expand All @@ -835,6 +841,7 @@ module.exports = function (config) {
var pkt = cfg.deviceComms.nextPacket();
if (pkt.valid && (command.response.value === pkt.descriptor)) {
clearTimeout(retryTimer);
clearTimeout(abortTimer);
clearInterval(listenTimer);
callback(null, pkt);
}else{
Expand All @@ -851,7 +858,16 @@ module.exports = function (config) {
if (format) {
payload_len = struct.structlen(format);
payload = new Uint8Array(payload_len);
struct.pack(payload, 0, format, args);
switch(args.length) {
case 1 :
struct.pack(payload, 0, format, args);
break;
case 2:
struct.pack(payload, 0, format, args[0], args[1]);
break;
default:
return callback(new Error('Unsupported number of arguments'));
}
}

var commandPacket = buildPacket(command, payload_len, payload);
Expand Down Expand Up @@ -881,13 +897,12 @@ module.exports = function (config) {
};

var tandemLogRequester = function (start, end, progress, callback) {
// TODO: implement and test multi-record download commands
if (__DEBUG__) {
debug('tandemLogRequester', start, end);
var start_exec = Date.now();
var startExec = Date.now();
}
var send_seq = start;
var receive_seq = start;
var sendSeq = start;
var receiveSeq = start;
var recovering = false;
var percentage = 0;
var prevPercentage = 0;
Expand All @@ -901,23 +916,27 @@ module.exports = function (config) {
var processPacket = function (pkt) {
if (pkt.valid &&
pkt.descriptor === RESPONSES.LOG_ENTRY_TE.value &&
pkt.payload['header_log_seq_no'] >= receive_seq) {
if (receive_seq != pkt.payload['header_log_seq_no']) {
pkt.payload['header_log_seq_no'] >= receiveSeq) {
if (receiveSeq != pkt.payload['header_log_seq_no']) {
if (!recovering) {
recovering = true;
debug('recovering ', receive_seq, '(received ',pkt.payload['header_log_seq_no'], ')');
send_seq = receive_seq + 1;
debug('recovering ', receiveSeq, '(received ',pkt.payload['header_log_seq_no'], ')');
sendSeq = receiveSeq + 1;
}

tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receive_seq], function (err) {
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receiveSeq], function (err) {
if(err) {
clearInterval(sendTimer);
clearInterval(listenTimer);
callback(err,null);
}
retryRecoverTimer = setTimeout(function() {
if(recovering) {
debug('Retrying to recover..',receive_seq);
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receive_seq], function (err) {
debug('Retrying to recover..',receiveSeq);
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receiveSeq], function (err) {
if(err) {
clearInterval(sendTimer);
clearInterval(listenTimer);
callback(err,null);
}
});
Expand All @@ -927,26 +946,26 @@ module.exports = function (config) {
}
else {
if (recovering) {
debug('recovered ', receive_seq, pkt);
debug('recovered ', receiveSeq, pkt);
clearTimeout(retryRecoverTimer);
}
receive_seq = pkt.payload['header_log_seq_no'] + 1;
receiveSeq = pkt.payload['header_log_seq_no'] + 1;
recovering = false;

percentage = ((receive_seq-start)/(end-start) * 90)+10;
percentage = ((receiveSeq-start)/(end-start) * 90)+10;
if(percentage > (prevPercentage+1)) {
// only update progress to UI if there's an increase of at least 1 percent
prevPercentage = percentage;
progress(percentage);
}

if (receive_seq % 1000 === 0) {
debug('received ', receive_seq, ' of ', end);
if (receiveSeq % 1000 === 0) {
debug('received ', receiveSeq, ' of ', end);
}
if (receive_seq > end) {
if (receiveSeq > end) {
if (__DEBUG__) {
var end_exec = Date.now();
var time = end_exec - start_exec;
var endExec = Date.now();
var time = endExec - startExec;
debug('Execution time of tandemLogRequester: ' + time);
}
cfg.deviceComms.flush(); // making sure we flush the buffers
Expand All @@ -962,29 +981,105 @@ module.exports = function (config) {
}, INTERVAL_FREQ);

var sendTimer = setInterval(function () {
if (send_seq % 1000 === 0) {
debug('requesting ', send_seq);
if (sendSeq % 1000 === 0) {
debug('requesting ', sendSeq);
}
if (!recovering && !pending) {
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [send_seq], function (err) {
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [sendSeq], function (err) {
if(err) {
clearInterval(listenTimer);
clearInterval(sendTimer);
callback(err,null);
}
});
if ((send_seq < end) && !recovering) {
send_seq++;
if ((sendSeq < end) && !recovering) {
sendSeq++;
}
}
}, INTERVAL_FREQ); // if we spin too quickly on this, packets don't get sent when window doesn't have focus
};

var multiLogRequester = function (start, end, progress, callback) {
if (__DEBUG__) {
debug('multiLogRequester', start, end);
var startExec = Date.now();
}
var sendSeq = start;
var receiveSeq = start;
var percentage = 0;
var prevPercentage = 0;

var retryTimeout = function() {
debug('Retrying from record ', receiveSeq, ' onwards.', end-receiveSeq, ' record(s) left..');
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_MULTI_REQ, [receiveSeq,end-receiveSeq+1], function (err) {
if(err) {
callback(err,null);
}
});
};
var retryTimer = setTimeout(retryTimeout, RETRY_TIMEOUT);

tandemCommand(COMMANDS.LOG_ENTRY_SEQ_MULTI_REQ, [sendSeq, end-sendSeq+1], function (err) {
// second parameter is number of records to retrieve, which is the difference between
// the start and end indexes plus one
if(err) {
callback(err,null);
}
else {
var listenTimer = setInterval(function () {
if(pending) {
debug('pending');
}
while (cfg.deviceComms.hasAvailablePacket() && !pending) {
var processPacket = function (pkt) {
if (pkt.valid &&
pkt.descriptor === RESPONSES.LOG_ENTRY_TE.value &&
pkt.payload['header_log_seq_no'] >= receiveSeq) {
if (receiveSeq != pkt.payload['header_log_seq_no']) {
debug('Packet out of sequence');
}
else {
clearTimeout(retryTimer);
retryTimer = setTimeout(retryTimeout,RETRY_TIMEOUT); // reset timeout
receiveSeq = pkt.payload['header_log_seq_no'] + 1;

percentage = ((receiveSeq-start)/(end-start) * 90)+10;
if(percentage > (prevPercentage+1)) {
// only update progress to UI if there's an increase of at least 1 percent
prevPercentage = percentage;
progress(percentage);
}

if (receiveSeq % 1000 === 0) {
debug('received ', receiveSeq, ' of ', end);
}
if (receiveSeq > end) {
if (__DEBUG__) {
var endExec = Date.now();
var time = endExec - startExec;
debug('Execution time of multiLogRequester: ' + time);
}
cfg.deviceComms.flush(); // making sure we flush the buffers
clearTimeout(retryTimer);
clearInterval(listenTimer);
}
callback(null, pkt);
}
}
};
processPacket(cfg.deviceComms.nextPacket());
}
}, INTERVAL_FREQ);
}
});
};

var newestEventRequester = function (start, end, progress, callback) {
var send_seq = start;
var sendSeq = start;
var end_seq = end;
var receive_seq = start;
var receiveSeq = start;
var recovering = false;
var percentage_seq = 0;
var percentageSeq = 0;
var retryRecoverTimer;

// this contains only the log events that we consider to define
Expand Down Expand Up @@ -1017,23 +1112,27 @@ module.exports = function (config) {
var processPacket = function (pkt) {
if (pkt.valid &&
pkt.descriptor === RESPONSES.LOG_ENTRY_TE.value &&
pkt.payload['header_log_seq_no'] <= receive_seq) {
if (receive_seq != pkt.payload['header_log_seq_no']) {
pkt.payload['header_log_seq_no'] <= receiveSeq) {
if (receiveSeq != pkt.payload['header_log_seq_no']) {
if (!recovering) {
recovering = true;
debug('recovering ', receive_seq);
debug('recovering ', receiveSeq);
}

send_seq = receive_seq - 1;
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receive_seq], function (err) {
sendSeq = receiveSeq - 1;
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receiveSeq], function (err) {
if(err) {
clearInterval(sendTimer);
clearInterval(listenTimer);
callback(err,null);
}
retryRecoverTimer = setTimeout(function() {
if(recovering) {
debug('Retrying to recover..',receive_seq);
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receive_seq], function (err) {
debug('Retrying to recover..',receiveSeq);
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [receiveSeq], function (err) {
if(err) {
clearInterval(sendTimer);
clearInterval(listenTimer);
callback(err,null);
}
});
Expand All @@ -1043,15 +1142,15 @@ module.exports = function (config) {
}
else {
if (recovering) {
debug('recovered ', receive_seq, pkt);
debug('recovered ', receiveSeq, pkt);
}
receive_seq = pkt.payload['header_log_seq_no'] - 1;
receiveSeq = pkt.payload['header_log_seq_no'] - 1;
recovering = false;

percentage_seq += 1;
if(percentage_seq % 100 === 0) {
percentageSeq += 1;
if(percentageSeq % 100 === 0) {
// increase percentage every 100 records
var percentage = percentage_seq/100;
var percentage = percentageSeq/100;
progress(percentage < 5 ? percentage : 5); //up to a max of 5 percent
}

Expand All @@ -1072,7 +1171,7 @@ module.exports = function (config) {
callback(null,pkt);
}

if (receive_seq < end_seq) {
if (receiveSeq < end_seq) {
debug('We did not find any events');
clearInterval(sendTimer);
clearInterval(listenTimer);
Expand All @@ -1085,18 +1184,19 @@ module.exports = function (config) {
}, INTERVAL_FREQ);

var sendTimer = setInterval(function () {
if (send_seq % 1000 === 0) {
console.log('requesting', send_seq);
if (sendSeq % 1000 === 0) {
console.log('requesting', sendSeq);
}
if (!recovering && !pending) {
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [send_seq], function (err) {
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_REQ, [sendSeq], function (err) {
if(err) {
clearInterval(sendTimer);
clearInterval(listenTimer);
callback(err,null);
}
});
if ((send_seq > end) && !recovering) {
send_seq--;
if ((sendSeq > end) && !recovering) {
sendSeq--;
}
}
}, INTERVAL_FREQ); // if we spin too quickly on this, packets don't get sent when window doesn't have focus
Expand Down Expand Up @@ -1131,14 +1231,33 @@ module.exports = function (config) {
if (result.payload.header_log_seq_no === end_seq) {
debug('fetched all records');
data.log_records = retval;
callback(null, data);

if(data.firmware_version >= COMMANDS.LOG_ENTRY_SEQ_MULTI_STOP_DUMP.version)
tandemCommand(COMMANDS.LOG_ENTRY_SEQ_MULTI_STOP_DUMP, null, function (err) {
if(err) {
callback(err,null);
}
else{
callback(null,data);
}
});
else {
callback(null, data);
}
}
}
}

end_seq = data.end_seq;
start_seq = data.start_seq;
tandemLogRequester(start_seq, end_seq, progress, iterate);
debug('Firmware version is #',data.firmware_version);
if(data.firmware_version >= COMMANDS.LOG_ENTRY_SEQ_MULTI_REQ.version) {
// woohoo, we can use multi-record downloads!
multiLogRequester(start_seq, end_seq, progress, iterate);
}
else{
tandemLogRequester(start_seq, end_seq, progress, iterate);
}
};

var tandemFetchEventRange = function (progress, data, callback) {
Expand Down Expand Up @@ -1206,7 +1325,7 @@ module.exports = function (config) {

var tandemFetch = function (progress, data, callback) {
if (__DEBUG__) {
var start_exec = Date.now();
var startExec = Date.now();
}
debug('getting event ranges');
var entries;
Expand Down Expand Up @@ -1249,8 +1368,8 @@ module.exports = function (config) {

data.start_seq = currentIndex;
if (__DEBUG__) {
var end_exec = Date.now();
var time = end_exec - start_exec;
var endExec = Date.now();
var time = endExec - startExec;
debug('Execution time for binary search: ' + time);
}
progress(10);
Expand Down Expand Up @@ -1902,6 +2021,7 @@ module.exports = function (config) {
}
data.model_no = result.payload.model_no;
data.pump_sn = result.payload.pump_sn;
data.firmware_version = parseInt(result.payload.arm_sw_ver);
cb(null, data);
}
});
Expand Down

0 comments on commit 1e21a3e

Please sign in to comment.