From d7dd6f841bd0e773cba6c67a2a8abc32c99578c3 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 11:36:46 -0400 Subject: [PATCH 01/10] more efficient download streaming --- lib/bridgeclient.js | 51 ++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index e56ec53a..ba418134 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -41,7 +41,7 @@ function BridgeClient(uri, options) { this._options = this._checkOptions(uri, options); this._logger = this._options.logger; - this._uploadConcurrency = this._options.concurrency; + this._transferConcurrency = this._options.concurrency; } /** @@ -330,7 +330,7 @@ BridgeClient.prototype.storeFileInBucket = function(id, token, file, cb) { numShards: Math.ceil( fs.statSync(file).size / FileDemuxer.DEFAULTS.shardSize ), - concurrency: this._uploadConcurrency + concurrency: this._transferConcurrency }); self.createFileStagingFrame(function(err, frame) { @@ -570,6 +570,22 @@ BridgeClient.prototype.getFilePointer = function(bucket, token, fileID, cb) { }); }; +/** + * Create a readable datachannel stream from the supplied file pointer + * @private + * @param {Object} pointer + * @param {Function} callback + */ +BridgeClient.prototype._createInputFromPointer = function(pointer, callback) { + var dcx = new DataChannelClient(new Contact(pointer.farmer)); + + this._logger.info('Opening data channel stream from %j', pointer.farmer); + + dcx.on('open', function() { + callback(dcx.createReadStream(pointer.token, pointer.hash)); + }); +}; + /** * Open a series of data channels based on the returned value of * {@link BridgeClient#getFilePointer} to resolve all the shards and @@ -578,28 +594,29 @@ BridgeClient.prototype.getFilePointer = function(bucket, token, fileID, cb) { * @param {Function} callback */ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { - var opened = 0; - var size = pointers.reduce(function(a, b) { - return { size: a.size + b.size }; - }, { size: 0 }).size; + var self = this; var muxer = new FileMuxer({ shards: pointers.length, - length: size + length: pointers.reduce(function(a, b) { + return { size: a.size + b.size }; + }, { size: 0 }).size }); - pointers.forEach(function(pointer) { - var dcx = new DataChannelClient(new Contact(pointer.farmer)); + function _addInputToMultiplexer() { + var nextPointer = pointers.shift(); - dcx.on('open', function() { - muxer.input(dcx.createReadStream(pointer.token, pointer.hash)); + if (nextPointer) { + self._createInputFromPointer(nextPointer, muxer.input.bind(muxer)); + } + } - opened++; + muxer.on('drain', _addInputToMultiplexer); - if (opened === pointers.length) { - callback(null, muxer); - } - }); - }); + for (var i = 0; i < this._transferConcurrency; i++) { + _addInputToMultiplexer(); + } + + callback(null, muxer); }; /** From 460b236942fc0b34315f0d4a1e7fe887082b7c41 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 11:37:22 -0400 Subject: [PATCH 02/10] bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 81aae733..2b8ebed8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "storj", - "version": "1.3.2", + "version": "1.3.3", "description": "implementation of the storj protocol for node.js and the browser", "main": "index.js", "directories": { From 490a4923c93dd5026fa2241b891c36f36aae83d2 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 11:59:58 -0400 Subject: [PATCH 03/10] fix unexpected end of input on muxer --- lib/bridgeclient.js | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index ba418134..bf9d06c1 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -20,6 +20,7 @@ var kad = require('kad'); var EventEmitter = require('events').EventEmitter; var UploadState = require('./uploadstate'); var stream = require('readable-stream'); +var async = require('async'); /** * Represents a client interface to a given bridge server @@ -602,21 +603,30 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { }, { size: 0 }).size }); - function _addInputToMultiplexer() { + function _addInputToMultiplexer(onInputAdded) { var nextPointer = pointers.shift(); if (nextPointer) { - self._createInputFromPointer(nextPointer, muxer.input.bind(muxer)); + return self._createInputFromPointer(nextPointer, function(inputStream) { + muxer.input(inputStream); + onInputAdded(); + }); } - } - - muxer.on('drain', _addInputToMultiplexer); - for (var i = 0; i < this._transferConcurrency; i++) { - _addInputToMultiplexer(); + onInputAdded(); } - callback(null, muxer); + muxer.on('drain', _addInputToMultiplexer.bind(null, function noop() {})); + + async.times( + this._transferConcurrency, + function addInputSource(n, next) { + _addInputToMultiplexer(next); + }, + function onInputsAdded() { + callback(null, muxer); + } + ); }; /** From 7604021220181b9792a106b0f063e5a51b2f3aa0 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 12:33:20 -0400 Subject: [PATCH 04/10] add noop function to utils for sharing --- lib/bridgeclient.js | 2 +- lib/filedemuxer.js | 9 +++------ lib/utils.js | 6 ++++++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index bf9d06c1..2a7cdf7a 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -616,7 +616,7 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { onInputAdded(); } - muxer.on('drain', _addInputToMultiplexer.bind(null, function noop() {})); + muxer.on('drain', _addInputToMultiplexer.bind(null, utils.noop)); async.times( this._transferConcurrency, diff --git a/lib/filedemuxer.js b/lib/filedemuxer.js index 0d738277..7ef40a54 100644 --- a/lib/filedemuxer.js +++ b/lib/filedemuxer.js @@ -6,6 +6,7 @@ var stream = require('readable-stream'); var fs = require('fs'); var EventEmitter = require('events').EventEmitter; var merge = require('merge'); +var utils = require('./utils'); /** * Takes a single file read stream and outputs several output streams, used for @@ -67,9 +68,7 @@ FileDemuxer.prototype._openStream = function() { */ FileDemuxer.prototype._handleSourceBytes = function(chunk) { if (!this._currentOutput) { - this._currentOutput = new stream.Readable({ - read: function noop() {} - }); + this._currentOutput = new stream.Readable({ read: utils.noop }); this.emit('shard', this._currentOutput, this._currentShardIndex); } @@ -77,9 +76,7 @@ FileDemuxer.prototype._handleSourceBytes = function(chunk) { if (this._needsNewOutputStream()) { this._closeCurrentOutput(); - this._currentOutput = new stream.Readable({ - read: function noop() {} - }); + this._currentOutput = new stream.Readable({ read: utils.noop }); this.emit('shard', this._currentOutput, ++this._currentShardIndex); } diff --git a/lib/utils.js b/lib/utils.js index 5c345b49..15fd37d4 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -199,3 +199,9 @@ module.exports.simpleDecrypt = function(password, str) { return buf.toString('utf8'); }; + +/** + * Empty function stub + * @private + */ +module.exports.noop = function() {}; From 17b39a55f2446141ba79ef0cbafffb86be8deaab Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 13:16:09 -0400 Subject: [PATCH 05/10] honor token expire time and set default to 24h --- lib/constants.js | 2 ++ lib/datachannel/server.js | 10 ++++++++-- test/datachannel/server.unit.js | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/constants.js b/lib/constants.js index 530162b5..f5a08943 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -19,6 +19,8 @@ module.exports = { CLEAN_INTERVAL: 10800000, /** @constant {Number} CONSIGN_THRESHOLD - Threshold for consign time */ CONSIGN_THRESHOLD: 86400000, + /** @constant {Number} TOKEN_EXPIRE - Reject datachannl token after time */ + TOKEN_EXPIRE: 86400000, /** @constant {Number} TUNNEL_ANNOUNCE_INTERVAL - Announce tunnel state */ TUNNEL_ANNOUNCE_INTERVAL: 900000, /** @constant {Number} ROUTER_CLEAN_INTERVAL - Drop bad contacts */ diff --git a/lib/datachannel/server.js b/lib/datachannel/server.js index 71993837..aa2919f8 100644 --- a/lib/datachannel/server.js +++ b/lib/datachannel/server.js @@ -9,6 +9,7 @@ var inherits = require('util').inherits; var ms = require('ms'); var crypto = require('crypto'); var utils = require('../utils'); +var constants = require('../constants'); /** * Creates a data channel server for sending and receiving consigned file shards @@ -30,7 +31,7 @@ function DataChannelServer(options) { this._server = options.server; this._manager = options.manager; this._log = options.logger; - this._ttl = options.ttl || ms('10m'); + this._ttl = options.ttl || constants.TOKEN_EXPIRE; this._server = new ws.Server({ server: this._server }); this._allowed = {}; @@ -49,7 +50,11 @@ DataChannelServer.prototype.accept = function(token, filehash) { assert(typeof token === 'string', 'Invalid token supplied'); assert(typeof filehash === 'string', 'Invalid filehash supplied'); - this._allowed[token] = { hash: filehash, client: null }; + this._allowed[token] = { + hash: filehash, + client: null, + expires: Date.now() + this._ttl + }; }; /** @@ -144,6 +149,7 @@ DataChannelServer.prototype._authorize = function(token, hash) { assert.ok(token, 'You did not supply a token'); assert.ok(self._allowed[token], 'The supplied token is not accepted'); assert.ok(hash, 'You did not supply the data hash'); + assert(self._allowed[token].expires > Date.now(), 'Token has expired'); assert(self._allowed[token].client === null, 'Channel is already active'); assert(self._allowed[token].hash === hash, 'Token not valid for hash'); }; diff --git a/test/datachannel/server.unit.js b/test/datachannel/server.unit.js index b43e1cee..94d62d88 100644 --- a/test/datachannel/server.unit.js +++ b/test/datachannel/server.unit.js @@ -121,7 +121,11 @@ describe('DataChannelServer', function() { manager: Manager(RAMStorageAdapter()), logger: Logger(0) }); - dcs._allowed.token = { client: null, hash: 'test' }; + dcs._allowed.token = { + client: null, + hash: 'test', + expires: Date.now() + 12000 + }; var socket = new EventEmitter(); socket.close = function(code, message) { expect(code).to.equal(400); From b7ebc8f3d28ee145ceaae4a4668a8d72ad1f6ee3 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 15:08:41 -0400 Subject: [PATCH 06/10] more graceful handling of input stream to muxer --- lib/bridgeclient.js | 2 -- lib/datachannel/server.js | 1 - lib/filemuxer.js | 6 ++---- test/filemuxer.unit.js | 18 +++++++++++++----- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index 2a7cdf7a..373084ac 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -580,8 +580,6 @@ BridgeClient.prototype.getFilePointer = function(bucket, token, fileID, cb) { BridgeClient.prototype._createInputFromPointer = function(pointer, callback) { var dcx = new DataChannelClient(new Contact(pointer.farmer)); - this._logger.info('Opening data channel stream from %j', pointer.farmer); - dcx.on('open', function() { callback(dcx.createReadStream(pointer.token, pointer.hash)); }); diff --git a/lib/datachannel/server.js b/lib/datachannel/server.js index aa2919f8..a552e530 100644 --- a/lib/datachannel/server.js +++ b/lib/datachannel/server.js @@ -6,7 +6,6 @@ var assert = require('assert'); var Manager = require('../manager'); var events = require('events'); var inherits = require('util').inherits; -var ms = require('ms'); var crypto = require('crypto'); var utils = require('../utils'); var constants = require('../constants'); diff --git a/lib/filemuxer.js b/lib/filemuxer.js index acee2099..e5250b3c 100644 --- a/lib/filemuxer.js +++ b/lib/filemuxer.js @@ -63,15 +63,13 @@ FileMuxer.prototype._read = function() { } if (!this._inputs[0]) { - return this.emit('error', new Error('Unexpected end of input')); + return setImmediate(this._read.bind(this)); } var bytes = this._inputs[0].read(); if (bytes === null && this._bytesRead < this._length) { - return setImmediate(function() { - self._read(); - }); + return setImmediate(this._read.bind(this)); } if (this._length < this._bytesRead + bytes.length) { diff --git a/test/filemuxer.unit.js b/test/filemuxer.unit.js index 90642bda..459317d7 100644 --- a/test/filemuxer.unit.js +++ b/test/filemuxer.unit.js @@ -112,12 +112,20 @@ describe('FileMuxer', function() { }); }); - it('should error if read with no inputs', function(done) { - FileMuxer({ shards: 2, length: 128 }).on('error', function(err) { - expect(err.message).to.equal('Unexpected end of input'); + it('should wait until next tick if no input is available', function(done) { + var pushed = false; + FileMuxer({ shards: 2, length: 128 }).on('data', function(data) { done(); - }).read(); - + }).input(ReadableStream({ + read: function() { + if (pushed) { + this.push(null); + } else { + pushed = true; + this.push(Buffer('hay gurl hay')); + } + } + })); }); it('should error if input length exceeds declared length', function(done) { From a79a1efe6a59db6486ef8237fa70994366bde3a6 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 15:12:21 -0400 Subject: [PATCH 07/10] jshint --- lib/filemuxer.js | 2 -- test/filemuxer.unit.js | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/filemuxer.js b/lib/filemuxer.js index e5250b3c..0153b54f 100644 --- a/lib/filemuxer.js +++ b/lib/filemuxer.js @@ -56,8 +56,6 @@ FileMuxer.prototype._checkOptions = function(options) { * @private */ FileMuxer.prototype._read = function() { - var self = this; - if (this._bytesRead === this._length) { return this.push(null); } diff --git a/test/filemuxer.unit.js b/test/filemuxer.unit.js index 459317d7..3951531c 100644 --- a/test/filemuxer.unit.js +++ b/test/filemuxer.unit.js @@ -114,7 +114,7 @@ describe('FileMuxer', function() { it('should wait until next tick if no input is available', function(done) { var pushed = false; - FileMuxer({ shards: 2, length: 128 }).on('data', function(data) { + FileMuxer({ shards: 2, length: 128 }).on('data', function() { done(); }).input(ReadableStream({ read: function() { From 669dcd039f2d9af3cbeefcbd945ab1bcbe2cc4d6 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 16:33:22 -0400 Subject: [PATCH 08/10] ensure ordering of muxer input --- lib/bridgeclient.js | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index 373084ac..5eacd1da 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -601,25 +601,29 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { }, { size: 0 }).size }); - function _addInputToMultiplexer(onInputAdded) { - var nextPointer = pointers.shift(); - - if (nextPointer) { - return self._createInputFromPointer(nextPointer, function(inputStream) { - muxer.input(inputStream); - onInputAdded(); - }); + var queue = async.queue(_addInputToMultiplexer, 1); + + function _addInputToMultiplexer(pointer, onInputAdded) { + if (!pointer) { + return onInputAdded(); } - onInputAdded(); + self._createInputFromPointer(pointer, function(inputStream) { + muxer.input(inputStream); + onInputAdded(); + }); + } + + function _addPointerToInputQueue(done) { + queue.push(pointers.shift(), done); } - muxer.on('drain', _addInputToMultiplexer.bind(null, utils.noop)); + muxer.on('drain', _addPointerToInputQueue); async.times( this._transferConcurrency, function addInputSource(n, next) { - _addInputToMultiplexer(next); + _addPointerToInputQueue(next); }, function onInputsAdded() { callback(null, muxer); From 088b7efcb555c9c71703ba57c99d0a0502adbb69 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 16:34:27 -0400 Subject: [PATCH 09/10] jshint --- lib/bridgeclient.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index 5eacd1da..e129d187 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -601,8 +601,6 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { }, { size: 0 }).size }); - var queue = async.queue(_addInputToMultiplexer, 1); - function _addInputToMultiplexer(pointer, onInputAdded) { if (!pointer) { return onInputAdded(); @@ -614,12 +612,13 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { }); } + var queue = async.queue(_addInputToMultiplexer, 1); + function _addPointerToInputQueue(done) { queue.push(pointers.shift(), done); } muxer.on('drain', _addPointerToInputQueue); - async.times( this._transferConcurrency, function addInputSource(n, next) { From 4e73fbc9f651fafe54141d53869dffd04c937d80 Mon Sep 17 00:00:00 2001 From: Gordon Hall Date: Tue, 28 Jun 2016 16:37:06 -0400 Subject: [PATCH 10/10] ensure correct callback type --- lib/bridgeclient.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/bridgeclient.js b/lib/bridgeclient.js index e129d187..26f55c05 100644 --- a/lib/bridgeclient.js +++ b/lib/bridgeclient.js @@ -618,7 +618,7 @@ BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) { queue.push(pointers.shift(), done); } - muxer.on('drain', _addPointerToInputQueue); + muxer.on('drain', _addPointerToInputQueue.bind(null, null)); async.times( this._transferConcurrency, function addInputSource(n, next) {