Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #348 from bookchin/master
Browse files Browse the repository at this point in the history
v3.0.4
  • Loading branch information
bookchin authored Aug 19, 2016
2 parents 6b20da8 + 3a8d92e commit 1a19c43
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 181 deletions.
6 changes: 6 additions & 0 deletions INSTALL.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### Prerequisites

* Node.js LTS (v4.x.x)
* Python 2.7
* Git 2.x.x

### Installing on GNU/Linux & Mac OSX

Install Node.js and it's package manager NPM using Node Version Manager:
Expand Down
99 changes: 69 additions & 30 deletions lib/bridge-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ BridgeClient.prototype.destroyFileStagingFrameById = function(id, callback) {
BridgeClient.prototype.addShardToFileStagingFrame = function(f, s, opt, cb) {
var self = this;
var retries = 0;
var pendingReq = null;

if (typeof arguments[2] === 'function') {
cb = opt;
Expand All @@ -331,7 +332,7 @@ BridgeClient.prototype.addShardToFileStagingFrame = function(f, s, opt, cb) {
function _addShard() {
self._logger.info('Adding shard metadata for %s to frame', s.hash);

return self._request('PUT', '/frames/' + f, s, function(err, result) {
pendingReq = self._request('PUT', '/frames/' + f, s, function(err, result) {
if (err) {
if (opt.retry > retries) {
retries++;
Expand All @@ -345,7 +346,14 @@ BridgeClient.prototype.addShardToFileStagingFrame = function(f, s, opt, cb) {
});
}

return _addShard();
_addShard();

return {
cancel: function() {
opt.retry = 0;
pendingReq.abort();
}
};
};

/**
Expand Down Expand Up @@ -471,14 +479,31 @@ BridgeClient.prototype._handleShardTmpFileFinish = function(state, meta, done) {

self._logger.info('Hash for this shard is: %s', hash);

function _handleError(err) {
self._logger.warn('Failed to upload shard...');
state.cleanup();
return state.callback(err);
}

function _teardownAuditListeners() {
auditGenerator.removeAllListeners();
}

shardFile.on('error', _handleError);
state.on('killed', _teardownAuditListeners);

shardFile.pipe(auditGenerator).on('finish', function() {
if (state.killed) {
return done();
}

var challenges = auditGenerator.getPrivateRecord().challenges;
var tree = auditGenerator.getPublicRecord();

self._logger.info('Audit generation for shard done.');
self._logger.info('Waiting on a storage offer from the network...');

self.addShardToFileStagingFrame(meta.frame.id, {
var addShardToFrame = self.addShardToFileStagingFrame(meta.frame.id, {
hash: hash,
size: meta.size,
index: meta.index,
Expand All @@ -491,13 +516,14 @@ BridgeClient.prototype._handleShardTmpFileFinish = function(state, meta, done) {
}

if (err) {
self._logger.warn('Failed to upload shard...');
state.cleanup();
return state.callback(err);
return _handleError(err);
}

self._startTransfer(pointer, state, meta, done);
});

state.removeListener('killed', _teardownAuditListeners);
state.on('killed', addShardToFrame.cancel);
});
};

Expand All @@ -511,16 +537,24 @@ BridgeClient.prototype._handleShardTmpFileFinish = function(state, meta, done) {
*/
BridgeClient.prototype._startTransfer = function(pointer, state, meta, done) {
var self = this;
var emitter = new EventEmitter();
var transferStatus = self._transferShard(emitter, meta.tmpName, pointer);
var transferStatus = self._transferShard(
new EventEmitter(),
meta.tmpName,
pointer,
state
);

state.on('killed', function() {
transferStatus.removeAllListeners();
});

self._logger.info('Contract negotiated with: %j', pointer.farmer);

transferStatus.on('retry', function() {
if (meta.transferRetries < self._options.transferRetries) {
meta.transferRetries++;
self._logger.info('Retrying shard transfer, pointer: %j', pointer);
self._transferShard(emitter, meta.tmpName, pointer);
self._transferShard(transferStatus, meta.tmpName, pointer, state);
} else {
self._logger.info(
'Shard transfer failed %s times, getting another contract...',
Expand Down Expand Up @@ -549,50 +583,49 @@ BridgeClient.prototype._startTransfer = function(pointer, state, meta, done) {
* @param {Function} done - Task completion callback
*/
BridgeClient.prototype._shardTransferComplete = function(state, frame, done) {
var self = this;

state.completed++;

self._logger.info(
this._logger.info(
'Shard transfer completed! %s remaining...',
state.numShards - state.completed
);

done();

if (state.completed === state.numShards) {
state.cleanup();
if (state.completed !== state.numShards) {
return;
}

// NB: use the original filename if called from cli
var origFileName = path.basename(state.file).split('.crypt')[0];
// NB: use the original filename if called from cli
var origFileName = path.basename(state.file).split('.crypt')[0];

self._logger.info('Transfer finished, creating entry...');
self._request('POST', '/buckets/' + state.bucketId + '/files', {
frame: frame.id,
mimetype: mime.lookup(origFileName),
filename: origFileName
}, state.callback);
}
state.cleanup();
this._logger.info('Transfer finished, creating entry...');
this._request('POST', '/buckets/' + state.bucketId + '/files', {
frame: frame.id,
mimetype: mime.lookup(origFileName),
filename: origFileName
}, state.callback);
};

/**
* Transfers a shard to a specified farmer
* @param {events.EventEmitter} emitter - For getting status events
* @param {String} tmpName - Path to shard file
* @param {Object} pointer - Farmer Contact information
* @param {Function} callback
* @param {UploadState} state - The upload state machine
*/
BridgeClient.prototype._transferShard = function(emitter, name, pointer) {
BridgeClient.prototype._transferShard = function(evt, name, pointer, state) {
var self = this;
var shardFile = fs.createReadStream(name);
var client = new DataChannelClient(Contact(pointer.farmer));

function _onErr(err) {
self._logger.warn('Failed to transfer shard, reason: %s', err.message);
client.removeAllListeners();
emitter.emit('retry', name, pointer);
evt.emit('retry', name, pointer);
}

state.on('killed', client.removeAllListeners.bind('client'));
client.on('error', _onErr).on('open', function() {
self._logger.info('Data channel opened, transferring shard...');

Expand All @@ -602,11 +635,17 @@ BridgeClient.prototype._transferShard = function(emitter, name, pointer) {
);

shardFile.pipe(datachannel).on('error', _onErr).on('finish', function() {
emitter.emit('finish');
evt.emit('finish');
});

state.on('killed', function() {
shardFile.unpipe(datachannel);
datachannel.removeAllListeners();
evt.emit('finish');
});
});

return emitter;
return evt;
};

/**
Expand Down Expand Up @@ -913,7 +952,7 @@ BridgeClient.prototype._request = function(method, path, params, callback) {

this._authenticate(opts);

request(opts, function(err, res, body) {
return request(opts, function(err, res, body) {
if (err) {
return callback(err);
}
Expand Down
15 changes: 15 additions & 0 deletions lib/bridge-client/upload-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ var fs = require('fs');
var async = require('async');
var utils = require('../utils');
var merge = require('merge');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;

/**
* Internal state machine used by {@link BridgeClient}
Expand All @@ -18,6 +20,7 @@ var merge = require('merge');
* @param {Function} options.onComplete - Reference to callback after complete
*/
function UploadState(options) {
/* jshint maxstatements: 14 */
if (!(this instanceof UploadState)) {
return new UploadState(options);
}
Expand All @@ -33,8 +36,18 @@ function UploadState(options) {
this.concurrency = options.concurrency;
this.queue = async.queue(options.worker, this.concurrency);
this.killed = false;

EventEmitter.call(this);
this.setMaxListeners(0);
}

inherits(UploadState, EventEmitter);

/**
* Triggered when the upload queue has been killed
* @event UploadState#killed
*/

UploadState.DEFAULTS = {
concurrency: 6
};
Expand All @@ -52,6 +65,8 @@ UploadState.prototype.cleanup = function() {
});

this.queue.kill();
this.emit('killed');
this.removeAllListeners();
};

module.exports = UploadState;
14 changes: 7 additions & 7 deletions lib/contract/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ Contract.CRITERIA = {
*/
Contract.MATRIX = {
size: function(size) {
if (size > 0 && size <= (8 * 1024 * 1024)) {
if (size > 0 && size <= (32 * 1024 * 1024)) {
return constants.OPCODE_DEG_LOW;
}

if (size > (8 * 1024 * 1024) && size <= (16 * 1024 * 1024)) {
if (size > (32 * 1024 * 1024) && size <= (512 * 1024 * 1024)) {
return constants.OPCODE_DEG_MED;
}

if (size > (16 * 1024 * 1024) && size <= (32 * 1024 * 1024)) {
if (size > (512 * 1024 * 1024) && size <= (4096 * 1024 * 1024)) {
return constants.OPCODE_DEG_HIGH;
}

return null;
return constants.OPCODE_DEG_HIGH;
},
duration: function(duration) {
if (duration > 0 && duration <= ms('30d')) {
Expand All @@ -118,7 +118,7 @@ Contract.MATRIX = {
return constants.OPCODE_DEG_HIGH;
}

return null;
return constants.OPCODE_DEG_HIGH;
},
availability: function(availability) {
if (availability >= 0.5 && availability <= 0.7) {
Expand All @@ -133,7 +133,7 @@ Contract.MATRIX = {
return constants.OPCODE_DEG_HIGH;
}

return null;
return constants.OPCODE_DEG_HIGH;
},
speed: function(speed) {
if (speed > 0 && speed <= 6) {
Expand All @@ -148,7 +148,7 @@ Contract.MATRIX = {
return constants.OPCODE_DEG_HIGH;
}

return null;
return constants.OPCODE_DEG_HIGH;
}
};

Expand Down
2 changes: 1 addition & 1 deletion lib/crypto-tools/decrypt-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var DataCipherKeyIv = require('../crypto-tools/cipher-key-iv');
* Represents a duplex stream capable of taking encrypted data as input and
* producing output decrypted by a {@link DataCipherKeyIv}
* @constructor
* @license AGPL-3.0
* @license LGPL-3.0
* @param {DataCipherKeyIv} keyiv - Object to use for derivation function
* @emits DecryptStream#data
* @emits DecryptStream#end
Expand Down
2 changes: 1 addition & 1 deletion lib/crypto-tools/encrypt-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var DataCipherKeyIv = require('../crypto-tools/cipher-key-iv');
* Represents a duplex stream capable of taking cleartext data as input and
* producing output encrypted with {@link DataCipherKeyIv}
* @constructor
* @license AGPL-3.0
* @license LGPL-3.0
* @param {DataCipherKeyIv} keyiv - Object to use for derivation function
* @emits EncryptStream#data
* @emits EncryptStream#end
Expand Down
Loading

0 comments on commit 1a19c43

Please sign in to comment.