Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #220 from bookchin/optimization/dowloads
Browse files Browse the repository at this point in the history
v1.3.3 (Download Optimizations)
  • Loading branch information
bookchin authored Jun 28, 2016
2 parents f3afd54 + 4e73fbc commit 3068347
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 39 deletions.
62 changes: 45 additions & 17 deletions lib/bridgeclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var kad = require('kad');
var EventEmitter = require('events').EventEmitter;
var UploadState = require('./uploadstate');
var stream = require('readable-stream');
var async = require('async');

/**
* Represents a client interface to a given bridge server
Expand All @@ -41,7 +42,7 @@ function BridgeClient(uri, options) {

this._options = this._checkOptions(uri, options);
this._logger = this._options.logger;
this._uploadConcurrency = this._options.concurrency;
this._transferConcurrency = this._options.concurrency;
}

/**
Expand Down Expand Up @@ -330,7 +331,7 @@ BridgeClient.prototype.storeFileInBucket = function(id, token, file, cb) {
numShards: Math.ceil(
fs.statSync(file).size / FileDemuxer.DEFAULTS.shardSize
),
concurrency: this._uploadConcurrency
concurrency: this._transferConcurrency
});

self.createFileStagingFrame(function(err, frame) {
Expand Down Expand Up @@ -570,6 +571,20 @@ BridgeClient.prototype.getFilePointer = function(bucket, token, fileID, cb) {
});
};

/**
* Create a readable datachannel stream from the supplied file pointer
* @private
* @param {Object} pointer
* @param {Function} callback
*/
BridgeClient.prototype._createInputFromPointer = function(pointer, callback) {
var dcx = new DataChannelClient(new Contact(pointer.farmer));

dcx.on('open', function() {
callback(dcx.createReadStream(pointer.token, pointer.hash));
});
};

/**
* Open a series of data channels based on the returned value of
* {@link BridgeClient#getFilePointer} to resolve all the shards and
Expand All @@ -578,28 +593,41 @@ BridgeClient.prototype.getFilePointer = function(bucket, token, fileID, cb) {
* @param {Function} callback
*/
BridgeClient.prototype.resolveFileFromPointers = function(pointers, callback) {
var opened = 0;
var size = pointers.reduce(function(a, b) {
return { size: a.size + b.size };
}, { size: 0 }).size;
var self = this;
var muxer = new FileMuxer({
shards: pointers.length,
length: size
length: pointers.reduce(function(a, b) {
return { size: a.size + b.size };
}, { size: 0 }).size
});

pointers.forEach(function(pointer) {
var dcx = new DataChannelClient(new Contact(pointer.farmer));
function _addInputToMultiplexer(pointer, onInputAdded) {
if (!pointer) {
return onInputAdded();
}

self._createInputFromPointer(pointer, function(inputStream) {
muxer.input(inputStream);
onInputAdded();
});
}

dcx.on('open', function() {
muxer.input(dcx.createReadStream(pointer.token, pointer.hash));
var queue = async.queue(_addInputToMultiplexer, 1);

opened++;
function _addPointerToInputQueue(done) {
queue.push(pointers.shift(), done);
}

if (opened === pointers.length) {
callback(null, muxer);
}
});
});
muxer.on('drain', _addPointerToInputQueue.bind(null, null));
async.times(
this._transferConcurrency,
function addInputSource(n, next) {
_addPointerToInputQueue(next);
},
function onInputsAdded() {
callback(null, muxer);
}
);
};

/**
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ module.exports = {
CLEAN_INTERVAL: 10800000,
/** @constant {Number} CONSIGN_THRESHOLD - Threshold for consign time */
CONSIGN_THRESHOLD: 86400000,
/** @constant {Number} TOKEN_EXPIRE - Reject datachannl token after time */
TOKEN_EXPIRE: 86400000,
/** @constant {Number} TUNNEL_ANNOUNCE_INTERVAL - Announce tunnel state */
TUNNEL_ANNOUNCE_INTERVAL: 900000,
/** @constant {Number} ROUTER_CLEAN_INTERVAL - Drop bad contacts */
Expand Down
11 changes: 8 additions & 3 deletions lib/datachannel/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ var assert = require('assert');
var Manager = require('../manager');
var events = require('events');
var inherits = require('util').inherits;
var ms = require('ms');
var crypto = require('crypto');
var utils = require('../utils');
var constants = require('../constants');

/**
* Creates a data channel server for sending and receiving consigned file shards
Expand All @@ -30,7 +30,7 @@ function DataChannelServer(options) {
this._server = options.server;
this._manager = options.manager;
this._log = options.logger;
this._ttl = options.ttl || ms('10m');
this._ttl = options.ttl || constants.TOKEN_EXPIRE;
this._server = new ws.Server({ server: this._server });
this._allowed = {};

Expand All @@ -49,7 +49,11 @@ DataChannelServer.prototype.accept = function(token, filehash) {
assert(typeof token === 'string', 'Invalid token supplied');
assert(typeof filehash === 'string', 'Invalid filehash supplied');

this._allowed[token] = { hash: filehash, client: null };
this._allowed[token] = {
hash: filehash,
client: null,
expires: Date.now() + this._ttl
};
};

/**
Expand Down Expand Up @@ -144,6 +148,7 @@ DataChannelServer.prototype._authorize = function(token, hash) {
assert.ok(token, 'You did not supply a token');
assert.ok(self._allowed[token], 'The supplied token is not accepted');
assert.ok(hash, 'You did not supply the data hash');
assert(self._allowed[token].expires > Date.now(), 'Token has expired');
assert(self._allowed[token].client === null, 'Channel is already active');
assert(self._allowed[token].hash === hash, 'Token not valid for hash');
};
Expand Down
9 changes: 3 additions & 6 deletions lib/filedemuxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var stream = require('readable-stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var merge = require('merge');
var utils = require('./utils');

/**
* Takes a single file read stream and outputs several output streams, used for
Expand Down Expand Up @@ -67,19 +68,15 @@ FileDemuxer.prototype._openStream = function() {
*/
FileDemuxer.prototype._handleSourceBytes = function(chunk) {
if (!this._currentOutput) {
this._currentOutput = new stream.Readable({
read: function noop() {}
});
this._currentOutput = new stream.Readable({ read: utils.noop });

this.emit('shard', this._currentOutput, this._currentShardIndex);
}

if (this._needsNewOutputStream()) {
this._closeCurrentOutput();

this._currentOutput = new stream.Readable({
read: function noop() {}
});
this._currentOutput = new stream.Readable({ read: utils.noop });

this.emit('shard', this._currentOutput, ++this._currentShardIndex);
}
Expand Down
8 changes: 2 additions & 6 deletions lib/filemuxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,18 @@ FileMuxer.prototype._checkOptions = function(options) {
* @private
*/
FileMuxer.prototype._read = function() {
var self = this;

if (this._bytesRead === this._length) {
return this.push(null);
}

if (!this._inputs[0]) {
return this.emit('error', new Error('Unexpected end of input'));
return setImmediate(this._read.bind(this));
}

var bytes = this._inputs[0].read();

if (bytes === null && this._bytesRead < this._length) {
return setImmediate(function() {
self._read();
});
return setImmediate(this._read.bind(this));
}

if (this._length < this._bytesRead + bytes.length) {
Expand Down
6 changes: 6 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,9 @@ module.exports.simpleDecrypt = function(password, str) {

return buf.toString('utf8');
};

/**
* Empty function stub
* @private
*/
module.exports.noop = function() {};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "storj",
"version": "1.3.2",
"version": "1.3.3",
"description": "implementation of the storj protocol for node.js and the browser",
"main": "index.js",
"directories": {
Expand Down
6 changes: 5 additions & 1 deletion test/datachannel/server.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ describe('DataChannelServer', function() {
manager: Manager(RAMStorageAdapter()),
logger: Logger(0)
});
dcs._allowed.token = { client: null, hash: 'test' };
dcs._allowed.token = {
client: null,
hash: 'test',
expires: Date.now() + 12000
};
var socket = new EventEmitter();
socket.close = function(code, message) {
expect(code).to.equal(400);
Expand Down
18 changes: 13 additions & 5 deletions test/filemuxer.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,20 @@ describe('FileMuxer', function() {
});
});

it('should error if read with no inputs', function(done) {
FileMuxer({ shards: 2, length: 128 }).on('error', function(err) {
expect(err.message).to.equal('Unexpected end of input');
it('should wait until next tick if no input is available', function(done) {
var pushed = false;
FileMuxer({ shards: 2, length: 128 }).on('data', function() {
done();
}).read();

}).input(ReadableStream({
read: function() {
if (pushed) {
this.push(null);
} else {
pushed = true;
this.push(Buffer('hay gurl hay'));
}
}
}));
});

it('should error if input length exceeds declared length', function(done) {
Expand Down

0 comments on commit 3068347

Please sign in to comment.