Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #60 from gordonwritescode/master
Browse files Browse the repository at this point in the history
Bug Bash (v0.6.2)
  • Loading branch information
gordonwritescode committed Apr 27, 2016
2 parents 40d1571 + 95ecccc commit 74233f7
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 80 deletions.
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ module.exports = {
AUDIT_BYTES: 32,
/** @constant {Number} CLEAN_INTERVAL - Interval for reaping stale shards */
CLEAN_INTERVAL: 3600000,
/** @constant {Number} TUNNEL_ANNOUNCE_INTERVAL - Announce tunnel state */
TUNNEL_ANNOUNCE_INTERVAL: 900000,
/** @constant {Number} OPCODE_TUNRPC_PREFIX - Opcode for tunnel rpc message */
OPCODE_TUNRPC_PREFIX: 0x0c,
/** @constant {Number} OPCODE_TUNDCX_PREFIX - Opcode for tunnel datachannel */
Expand Down
13 changes: 5 additions & 8 deletions lib/datachannel/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ DataChannelServer.prototype._handleConnection = function(socket) {

socket.on('error', function(err) {
self._log.error('data channel connection error: %s', err.message);
socket.send(JSON.stringify({ code: 500, message: err.message }));
socket.close(500, err.message);
});

socket.on('message', function(data) {
Expand All @@ -112,24 +112,21 @@ DataChannelServer.prototype._handleConnection = function(socket) {
try {
data = JSON.parse(data);
} catch (err) {
return socket.send(JSON.stringify({
code: 400,
message: 'Failed to parse message'
})).terminate();
return socket.close(400, 'Failed to parse message');
}

token = data.token;

try {
self._authorize(token, data.hash);
} catch (err) {
return socket.send(JSON.stringify({
code: 400, message: err.message
})).terminate();
return socket.close(401, err.message);
}

self._allowed[token].client = socket;

socket.removeAllListeners('message');

switch (data.operation) {
case 'PUSH':
return self._handleConsignStream(socket, token);
Expand Down
77 changes: 50 additions & 27 deletions lib/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ Network.prototype.join = function(callback) {
next();
}, function() {
self._listenForTunnelers();
self._setupTunnelClient();
callback(null, self);

if (self._transport._portMapped) {
return callback(null, self);
}

self._setupTunnelClient(callback);
});
};

Expand Down Expand Up @@ -238,6 +242,10 @@ Network.prototype._signMessage = function(message, callback) {
* @param {Function} callback
*/
Network.prototype._verifyMessage = function(message, contact, callback) {
if (!utils.isValidContact(contact, !!process.env.STORJ_ALLOW_LOOPBACK)) {
return; // NB: Just drop the message altogether
}

if (!utils.isCompatibleVersion(contact.protocol)) {
return callback(new Error('Protocol version is incompatible'));
}
Expand All @@ -253,10 +261,6 @@ Network.prototype._verifyMessage = function(message, contact, callback) {
var addr = bitcore.Address.fromPublicKeyHash(Buffer(contact.nodeID, 'hex'));
var signobj = this._createSignatureObject(signature);

if (!signobj) {
return callback(new Error('Invalid signature supplied'));
}

this._verifySignature({
message: message,
nonce: nonce,
Expand All @@ -272,6 +276,10 @@ Network.prototype._verifyMessage = function(message, contact, callback) {
* @private
*/
Network.prototype._verifySignature = function(options, callback) {
if (!options.signobj) {
return callback(new Error('Invalid signature supplied'));
}

var signedmsg = Message(options.message.id + options.nonce);
var ecdsa = new bitcore.crypto.ECDSA();

Expand Down Expand Up @@ -320,15 +328,24 @@ Network.prototype._handleTransportError = function(error) {
*/
Network.prototype._listenForTunnelers = function() {
var self = this;
var tunserver = self._transport._tunserver;
var prefix = Buffer([constants.OPCODE_TUNNELER_PREFIX], 'hex');
var available = Buffer([constants.OPCODE_DEG_LOW], 'hex');
var unavailable = Buffer([constants.OPCODE_DEG_NULL], 'hex');

if (this._options.tunnels) {
function announce() {
self._pubsub.publish(
Buffer.concat([prefix, available]).toString('hex'),
Buffer.concat([
prefix,
tunserver.hasTunnelAvailable() ? available : unavailable
]).toString('hex'),
self._contact
);
setTimeout(announce, constants.TUNNEL_ANNOUNCE_INTERVAL);
}

if (this._options.tunnels) {
announce();
}

this._transport._tunserver.on('locked', function() {
Expand Down Expand Up @@ -365,26 +382,28 @@ Network.prototype._listenForTunnelers = function() {
/**
* Determines if tunnel is needed
* @private
* @param {Function} callback
*/
Network.prototype._setupTunnelClient = function() {
Network.prototype._setupTunnelClient = function(callback) {
var self = this;
var neighbor = this._options.seeds.length ?
this._createContact(this._options.seeds[0]) :
null;

if (!neighbor) {
return this._logger.error('could not find a neighbor to query for probe');
return callback(new Error('could not find a neighbor to query for probe'));
}

this._logger.info('requesting probe from nearest neighbor');
this._requestProbe(neighbor, function(err, result) {
if (err || result.error) {
return self._findTunnel(neighbor);
return self._findTunnel(neighbor, callback);
}

self._logger.info(
'you are publicly reachable, skipping tunnel establishment'
);
callback(null);
});
};

Expand All @@ -404,36 +423,39 @@ Network.prototype._requestProbe = function(neighbor, callback) {
/**
* Finds a potential tunneler
* @private
* @param {Function} callback
*/
Network.prototype._findTunnel = function(neighbor) {
Network.prototype._findTunnel = function(neighbor, callback) {
var self = this;
var message = new kad.Message({
method: 'FIND_TUNNEL',
params: { contact: this._contact }
});

if (!neighbor) {
return this._logger.error('could not find a neighbor to query for tunnels');
return callback(
new Error('could not find a neighbor to query for tunnels')
);
}

this._logger.info('requesting tunnelers from nearest neighbor');
this._transport.send(neighbor, message, function(err, resp) {
if (err) {
return self._logger.error(
'failed to find tunnels, reason: %s',
err.message
return callback(
new Error('failed to find tunnels, reason: ' + err.message)
);
}

self._establishTunnel(resp.result.tunnels);
self._establishTunnel(resp.result.tunnels, callback);
});
};

/**
* Creates a tunnel to a public node
* @private
* @param {Function} callback
*/
Network.prototype._establishTunnel = function(tunnels) {
Network.prototype._establishTunnel = function(tunnels, callback) {
var self = this;
var tunnel = null;
var alias = null;
Expand All @@ -442,9 +464,9 @@ Network.prototype._establishTunnel = function(tunnels) {
return tunnel && alias;
}

function openTunnel(callback) {
function openTunnel(done) {
if (!tunnels.length) {
callback(new Error('No tunnelers were returned'));
done(new Error('No tunnelers were returned'));
}

var tun = new Contact(tunnels[0]);
Expand All @@ -456,21 +478,20 @@ Network.prototype._establishTunnel = function(tunnels) {
tunnels.unshift();
self._transport.send(tun, msg, function(err, resp) {
if (err) {
return callback();
return done();
}

tunnel = resp.result.tunnel;
alias = resp.result.alias;

callback();
done();
});
}

async.until(established, openTunnel, function(err) {
if (err) {
return self._logger.error(
'failed to establish tunnel, reason: %s',
err.message
return callback(
new Error('failed to establish tunnel, reason: ' + err.message)
);
}

Expand All @@ -481,6 +502,8 @@ Network.prototype._establishTunnel = function(tunnels) {
self._logger.info('tunnel successfully established: %j', alias);
self._contact.address = alias.address;
self._contact.port = alias.port;

callback();
});

tunclient.on('close', function onTunnelClosed(code, message) {
Expand All @@ -489,15 +512,15 @@ Network.prototype._establishTunnel = function(tunnels) {
code,
message
);
self._establishTunnel(tunnels);
self._establishTunnel(tunnels, callback);
});

tunclient.on('error', function onTunnelError(err) {
self._logger.warn(
'tunnel connection lost, reason: %s',
err.message
);
self._establishTunnel(tunnels);
self._establishTunnel(tunnels, callback);
});

tunclient.open();
Expand Down
49 changes: 31 additions & 18 deletions lib/network/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var Contract = require('../contract');
var StorageItem = require('../storage/item');
var stream = require('stream');
var kad = require('kad');
var async = require('async');

/**
* Defines the Storj protocol methods and mounts on a {@link Network} instance
Expand Down Expand Up @@ -267,35 +268,47 @@ Protocol.prototype._handleFindTunnel = function(params, callback) {
});
}

this._askSeedForTunnels(callback);
this._askNeighborsForTunnels(callback);
};

/**
* Sends a FIND_TUNNEL to our seed on behalf of requester
* @private
*/
Protocol.prototype._askSeedForTunnels = function(callback) {
if (!this._network._options.seeds[0]) {
this._logger.info('there are no known tunnels to provide');
return callback(new Error('No known tunnels to provide'));
}
Protocol.prototype._askNeighborsForTunnels = function(callback) {
var self = this;
var nearestNeighbors = this._network._router.getNearestContacts(
this._network._contact.nodeID,
3,
this._network._contact.nodeID
);

this._logger.info('asking original seed for known tunnels');
this._logger.info('asking nearest neighbors for known tunnels');

this._network._transport.send(
this._network._createContact(this._network._options.seeds[0]),
kad.Message({
function askNeighbor(neighbor, done) {
self._network._transport.send(neighbor, kad.Message({
method: 'FIND_TUNNEL',
params: { contact: this._network._contact }
}),
function(err, response) {
if (err) {
return callback(err);
params: { contact: self._network._contact }
}), function(err, response) {
if (err || !Array.isArray(response.result.tunnels)) {
return done();
}

callback(null, { tunnels: response.result.tunnels });
}
);
response.result.tunnels.forEach(function(tun) {
if (self._network._tunnelers.getSize() < kad.constants.K) {
self._network._tunnelers.addContact(
self._network._transport._createContact(tun)
);
}

done();
});
});
}

async.each(nearestNeighbors, askNeighbor, function() {
callback(null, { tunnels: self._network._tunnelers.getContactList() });
});
};

/**
Expand Down
11 changes: 11 additions & 0 deletions lib/network/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var kad = require('kad');
var natupnp = require('nat-upnp').createClient();
var ip = require('ip');
var TunnelServer = require('../tunnel/server');
var merge = require('merge');

/**
* Custom HTTP transport adapter
Expand All @@ -25,6 +26,8 @@ function Transport(contact, options) {
return new Transport(contact, options);
}

options = merge(Object.create(Transport.DEFAULTS), options);

this._maxTunnels = options.tunnels;
this._tunport = options.tunport || 0;
this._noforward = options.noforward;
Expand All @@ -34,6 +37,13 @@ function Transport(contact, options) {
this._bindTunnelServer();
}

Transport.DEFAULTS = {
maxTunnels: 3,
tunport: 0,
noforward: false,
gateways: { min: 0, max: 0 }
};

inherits(Transport, kad.transports.HTTP);

/**
Expand All @@ -53,6 +63,7 @@ Transport.prototype._open = function(callback) {
'you are not bound to a public address, trying traversal strategies...'
);
self._forwardPort(function(err, ip) {
self._portMapped = !err;
kad.transports.HTTP.prototype._open.call(self, callback);
self._contact.address = ip || self._contact.address;
});
Expand Down
3 changes: 3 additions & 0 deletions lib/tunnel/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ TunnelClient.prototype.close = function() {

var states = [WebSocketClient.CONNECTING, WebSocketClient.OPEN];

this._muxer.removeAllListeners();
this._demuxer.removeAllListeners();

if (states.indexOf(this._tunnel.readyState) !== -1) {
this._tunnel.close();
}
Expand Down
Loading

0 comments on commit 74233f7

Please sign in to comment.