Skip to content

Commit

Permalink
Zetta Version (1.6.0) - Upgraded spdy - Working with Node v18 (#379)
Browse files Browse the repository at this point in the history
* First at upgrading spdy@3.x and ws@3.x

* Spdy/h2 ping working using forked version of `spdy`.

* Add extra upgrade header data back to socket for spdy to parse

* Fix virtual device  subscription causing socket hang up

* Update Travis to remove 0.10, add 6,8,10

* Changed custom spdy install to github:

* Updates to fix streams closing the connection.

* Fix isssue with reconnects not passing proper connectionId to peer client.
Fix issue with stream deduping when finding intenral remote port in the spdy internals.

* Fix failing test where an embedded ws is expected to return 404.

Updates to the WS cause it to return a 400 instead of a 404 when
the base path does not match.

* PeerSocket fix issue where a closing the peer conneciton causes subscribe's cb
to fire a second time. Wrap cb in a once pattern.

PeerSocket guard against this.ws not being initialized when closing the connection

* Add TODO to skipped test that tests spdy errors on the agent

* Minor cleanup and comments

* Upgrade dependancies for colors, uuid

* Upgraded deps

* HTTP should return a JSON error message when a device does not exist.

* Cleanup domain in test to ensure it does not catch unrelated errors.

* Add --exit to mocha to be compatible with old style in 1.x

Our tests are scheduling async operations and calling done. We
need to clean this up but with the old version of mocha it was
exiting the test suite after all tests were done. This enforces
the old behavior.

* Handle spdy Connection errors in the PeerSocket

* Drop travis support for 0.12

* Remove use of ES6 default parameter.

* Drop support for node v4 in .travis.

* Add osx to travis

* Peer z2z transport always uses spdy/3.1 to support old zetta hubs.

This ensures zetta hubs running pre spdy-upgrade will be compatible
at a network level. To support `h2` we need to negotiate transport
and ALPN cannot help us.

* Upgrade zetta-cluster to fix security vulnerabilities.

* Upgrade spdy to 4.x

* Publish: 1.6.0

---------

Co-authored-by: Adam Magaluk <AdamMagaluk@google.com>
  • Loading branch information
AdamMagaluk and Adam Magaluk authored Aug 15, 2024
1 parent 1f32c10 commit f34dca1
Show file tree
Hide file tree
Showing 18 changed files with 3,038 additions and 143 deletions.
8 changes: 6 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
language: node_js

os:
- linux
- osx

node_js:
- "0.12"
- "4"
- "6"
- "8"
- "10"

sudo: false
21 changes: 17 additions & 4 deletions lib/api_resources/servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ var streams = require('zetta-streams');
var ObjectStream = streams.ObjectStream;
var ActionError = require('zetta-device').ActionError;

// Returns the JSON error for when a device does not exist.
const DeviceDoesNotExistError = function(env, deviceNotFound) {
return {
class: ['error'],
properties: {
message: `Device ${deviceNotFound} does not exist.`
},
links: [
{ rel: ['self'], href: env.helpers.url.current() }
]
};
};

var ServerResource = module.exports = function(server) {
this.server = server;
this.httpScout = this.server.httpScout;
Expand Down Expand Up @@ -316,7 +329,7 @@ ServerResource.prototype.destroyDevice = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down Expand Up @@ -358,7 +371,7 @@ ServerResource.prototype.showDevice = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand All @@ -383,7 +396,7 @@ ServerResource.prototype.updateDevice = function(env, next) {
var device = this.server.runtime._jsDevices[env.route.params.deviceId];

if (!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down Expand Up @@ -444,7 +457,7 @@ ServerResource.prototype.deviceAction = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down
84 changes: 52 additions & 32 deletions lib/http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

// external http(s) server
var httpOptions = {
windowSize: 1024 * 1024
connection: {
windowSize: 1024 * 1024,
autoSpdy31: false
},
spdy: {
plain: true,
ssl: false
}
};

var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
var usingSSL = false;
Object.keys(options).forEach(function(k) {
Expand All @@ -66,16 +74,29 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {
});

// If any tls options were specified, use ssl and not plain
httpOptions.plain = (usingSSL) ? false : true;
httpOptions.ssl = (usingSSL) ? true : false;
this.server = spdy.createServer(httpOptions);
httpOptions.spdy.plain = (usingSSL) ? false : true;
httpOptions.spdy.ssl = (usingSSL) ? true : false;

var spdyServerOpts = {
connection: {
windowSize: 1024 * 1024,
autoSpdy31: false
},
spdy: {
plain: true,
ssl: false
}
};

// Outside http server
this.server = spdy.createServer(httpOptions);

// internal server for z2z, allways ssl: false, plain: true
this.spdyServer = spdy.createServer({
windowSize: 1024 * 1024,
plain: true,
ssl: false
});
// TODO: remove this as it is unneeded now.
this.spdyServer = spdy.createServer(spdyServerOpts);
this.spdyServer.on('ping', function(socket) {
socket.emit('spdyPing');
})

var ValidWSUrls = [
/^\/events$/, // /events
Expand All @@ -93,7 +114,6 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

this.wss = new WebSocketServer({ noServer: true });
this.server.on('upgrade', function(request, socket, headers) {

var sendError = function(code) {
// Check any custom websocket paths from extentions
var finish = function() {
Expand Down Expand Up @@ -125,7 +145,7 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

// Handle Peer Request
self.wss.handleUpgrade(request, socket, headers, function(ws) {
self.setupPeerSocket(ws);
self.setupPeerSocket(ws, request);
});
});
} else if (match(request)) {
Expand All @@ -137,15 +157,15 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {
}

self.wss.handleUpgrade(request, socket, headers, function(ws) {
if (ws.upgradeReq.url === '/peer-management') {
if (request.url === '/peer-management') {
var query = [
{ name: self.zetta.id, topic: '_peer/connect' },
{ name: self.zetta.id, topic: '_peer/disconnect' }];

var client = new EventSocket(ws, query);
self.eventBroker.client(client);
} else {
self.setupEventSocket(ws);
self.setupEventSocket(ws, request);
}
});
});
Expand Down Expand Up @@ -255,15 +275,15 @@ function getCurrentProtocol(req) {
return protocol;
}

ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, request, host, p) {
ws._env = { helpers: {}};
ws._loader = { path: p };

ws._env.uri = function() {
var protocol = getCurrentProtocol(ws.upgradeReq);
var protocol = getCurrentProtocol(request);

if (!host) {
var address = ws.upgradeReq.connection.address();
var address = request.connection.address();
host = address.address;
if (address.port) {
if (!(protocol === 'https' && address.port === 443) &&
Expand All @@ -272,7 +292,7 @@ ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
}
}
}
return (protocol + '://' + path.join(host, ws.upgradeReq.url)).replace(/\\/g, '/');
return (protocol + '://' + path.join(host, request.url)).replace(/\\/g, '/');
};

ws._env.helpers.url = {};
Expand All @@ -284,24 +304,24 @@ ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
};
};

ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
ZettaHttpServer.prototype.setupPeerSocket = function(ws, request) {
var self = this;
var name = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname)[1];
var name = /^\/peers\/(.+)$/.exec(url.parse(request.url, true).pathname)[1];
name = decodeURI(name);
self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

// Include ._env and ._loader on websocket, allows argo formatters to work used in virtual_device build actions.
var host = ws.upgradeReq.headers['host']
self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);
var host = request.headers['host']
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + name);

if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
// peer already connected or connecting
ws.close(4000, 'peer already connected');
} else if (self.peers[name]) {
// peer has been disconnected but has connected before.
self.peers[name].init(ws);
self.peers[name].init(ws, request);
} else {
var peer = new PeerSocket(ws, name, self.peerRegistry, self.peerOptions);
var peer = new PeerSocket(ws, request, name, self.peerRegistry, self.peerOptions);
self.peers[name] = peer;

// Events coming from the peers pubsub using push streams
Expand All @@ -327,13 +347,13 @@ ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
}
};

ZettaHttpServer.prototype.setupEventSocket = function(ws) {
ZettaHttpServer.prototype.setupEventSocket = function(ws, request) {
var self = this;
var host = ws.upgradeReq.headers['host'];
var host = request.headers['host'];

if (/^\/events/.exec(ws.upgradeReq.url)) {
self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
var parsed = url.parse(ws.upgradeReq.url, true);
if (/^\/events/.exec(request.url)) {
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + self.zetta._name);
var parsed = url.parse(request.url, true);
var query = parsed.query;

if(!query.topic) {
Expand Down Expand Up @@ -386,18 +406,18 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) {

self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
} else {
var match = /^\/servers\/(.+)\/events/.exec(ws.upgradeReq.url);
var match = /^\/servers\/(.+)\/events/.exec(request.url);
if(!match) {
ws.close(1001); // go away status code
return;
}

var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
var query = querystring.parse(url.parse(request.url).query);
query.serverId = match[1]; // set serverId on query

self.wireUpWebSocketForEvent(ws, host, '/servers/' + query.serverId);
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + query.serverId);

var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
var query = querystring.parse(url.parse(request.url).query);
query.name = decodeURI(match[1]);

if (query.topic) {
Expand Down
11 changes: 3 additions & 8 deletions lib/peer_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ var spdy = require('spdy');
var Logger = require('./logger');
var WebSocket = require('./web_socket');

// monkey patch spdy connection to get access to ping event
var originalPingHandler = spdy.Connection.prototype._handlePing;
spdy.Connection.prototype._handlePing = function() {
this.socket.emit('spdyPing', this);
originalPingHandler.apply(this, arguments);
};

function calculatePeerUrl(url, name){
var wsUrl = url.replace(/^http/, 'ws');
var peerPath = '/peers/' + name;
Expand Down Expand Up @@ -130,10 +123,12 @@ PeerClient.prototype._createSocket = function() {
self.checkServerReq();
self.emit('connecting');
self.server.emit('connection', socket);
socket.on('spdyPing', function(connection) {

socket.on('spdyPing', function() {
// reset ping timer on a spdy ping from the peer
self._resetPingTimeout();
});

self.log.emit('log', 'peer-client', 'WebSocket to peer established (' + self.url + ')');
});

Expand Down
Loading

0 comments on commit f34dca1

Please sign in to comment.