Skip to content

Commit

Permalink
Merge pull request #1329 from RunOnFlux/development
Browse files Browse the repository at this point in the history
v5.12.1
  • Loading branch information
TheTrunk authored Jun 12, 2024
2 parents d785e2a + 23902f9 commit 1b6ee17
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 34 deletions.
7 changes: 2 additions & 5 deletions ZelBack/src/services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -6588,13 +6588,11 @@ async function storeAppRunningMessage(message) {

const validTill = message.broadcastedAt + (65 * 60 * 1000); // 3900 seconds
if (validTill < Date.now()) {
log.warn(`Rejecting old/not valid Fluxapprunning message, message:${JSON.stringify(message)}`);
// reject old message
return false;
}

const randomDelay = Math.floor((Math.random() * 1280)) + 240;
await serviceHelper.delay(randomDelay);

const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);

Expand All @@ -6616,8 +6614,7 @@ async function storeAppRunningMessage(message) {
// eslint-disable-next-line no-await-in-loop
const result = await dbHelper.findOneInDatabase(database, globalAppsLocations, queryFind, projection);
if (result && result.broadcastedAt && result.broadcastedAt >= newAppRunningMessage.broadcastedAt) {
// found a message that was already stored/bad message
log.warn(`Old Fluxapprunning message, more recent available, appName:${newAppRunningMessage.name} ip: ${newAppRunningMessage.ip}`);
// found a message that was already stored/probably from duplicated message processsed
messageNotOk = true;
break;
}
Expand Down
19 changes: 14 additions & 5 deletions ZelBack/src/services/fluxCommunication.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,21 @@ function handleIncomingConnection(websocket, req) {
// and add him to blocklist
try {
// check if message comes from IP belonging to the public Key
const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient.
const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level
const nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port);
let zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient.
let nodeFound = zl.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port);
if (!nodeFound) {
log.warn(`Invalid message received from incoming peer ${peer.ip}:${peer.port} which is not an originating node of ${pubKey}.`);
ws.close(4004, 'invalid message, disconnect'); // close as of policy violation
// check if message comes from IP belonging to the public Key
zl = await fluxCommunicationUtils.deterministicFluxList(); // this itself is sufficient.
const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level
nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port);
if (!nodeFound) {
log.warn(`Invalid message received from incoming peer ${peer.ip}:${peer.port} which is not an originating node of ${pubKey}.`);
ws.close(4004, 'invalid message, disconnect'); // close as of policy violation
} else {
blockedPubKeysCache.set(pubKey, pubKey); // blocks ALL the nodes corresponding to the pubKey
log.warn(`closing incoming connection, adding peers ${pubKey}:${peer.port} to the blockedList. Originated from ${peer.ip}.`);
ws.close(4005, 'invalid message, blocked'); // close as of policy violation?
}
} else {
blockedPubKeysCache.set(pubKey, pubKey); // blocks ALL the nodes corresponding to the pubKey
log.warn(`closing incoming connection, adding peers ${pubKey}:${peer.port} to the blockedList. Originated from ${peer.ip}.`);
Expand Down
12 changes: 3 additions & 9 deletions ZelBack/src/services/fluxCommunicationMessagesSender.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,13 @@ async function respondWithAppMessage(msgObj, ws) {
// check if we have it database of permanent appMessages
// eslint-disable-next-line global-require
const appsService = require('./appsService');
const message = msgObj.data;
const appsMessages = [];
if (!message || typeof message !== 'object' || typeof message.type !== 'string' || typeof message.version !== 'number'
|| typeof message.broadcastedAt !== 'number') {
if (!msgObj.data) {
throw new Error('Invalid Flux App Request message');
}

const message = msgObj.data;

if (message.version !== 1 && message.version !== 2) {
throw new Error(`Invalid Flux App Request message, version ${message.version} not supported`);
}
Expand All @@ -336,12 +336,6 @@ async function respondWithAppMessage(msgObj, ws) {
}
}

const validTill = message.broadcastedAt + (65 * 60 * 1000); // 3900 seconds
if (validTill < Date.now()) {
// reject old message
return;
}

// eslint-disable-next-line no-restricted-syntax
for (const hash of appsMessages) {
if (myMessageCache.has(hash)) {
Expand Down
26 changes: 19 additions & 7 deletions ZelBack/src/services/fluxCommunicationUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,36 @@ async function verifyFluxBroadcast(data, obtainedFluxNodesList, currentTimeStamp
if (!node) {
// node that broadcasted the message has to be on list
// pubkey of the broadcast has to be on the list
const zl = await deterministicFluxList(pubKey);
let zl = await deterministicFluxList(pubKey);
if (dataObj.data && dataObj.data.type === 'fluxapprunning') {
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxapprunning message, ip: ${dataObj.data.ip} pubkey: ${pubKey} nodelistSize: ${zl.length}`); // most of invalids are caused because our deterministic list is cached for couple of minutes
return false;
zl = await deterministicFluxList();
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxapprunning message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`); // most of invalids are caused because our deterministic list is cached for couple of minutes
return false;
}
}
} else if (dataObj.data && dataObj.data.type === 'fluxipchanged') {
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxipchanged message, oldIP: ${dataObj.data.oldIP} pubkey: ${pubKey} nodelistSize: ${zl.length}`);
return false;
zl = await deterministicFluxList();
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxipchanged message, oldIP: ${dataObj.data.oldIP} pubkey: ${pubKey}`);
return false;
}
}
} else if (dataObj.data && dataObj.data.type === 'fluxappremoved') {
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxappremoved message, ip: ${dataObj.data.ip} pubkey: ${pubKey} nodelistSize: ${zl.length}`);
return false;
zl = await deterministicFluxList();
node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key
if (!node) {
log.warn(`Invalid fluxappremoved message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`);
return false;
}
}
} else {
node = zl.find((key) => key.pubkey === pubKey);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "flux",
"version": "5.12.0",
"version": "5.12.1",
"description": "Flux, Your Gateway to a Decentralized World",
"repository": {
"type": "git",
Expand Down
7 changes: 0 additions & 7 deletions tests/unit/fluxCommunicationMessagesSender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 'test1',
broadcastedAt: Date.now(),
version: 3,
},
};
Expand All @@ -528,7 +527,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 312313,
broadcastedAt: Date.now(),
version: 1,
},
};
Expand All @@ -547,7 +545,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hashes: 312313,
broadcastedAt: Date.now(),
version: 2,
},
};
Expand All @@ -566,7 +563,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 'test1',
broadcastedAt: Date.now(),
version: 1,
},
};
Expand All @@ -590,7 +586,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 'test1',
broadcastedAt: Date.now(),
version: 1,
},
};
Expand All @@ -616,7 +611,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 'test1',
broadcastedAt: Date.now(),
version: 1,
},
};
Expand Down Expand Up @@ -644,7 +638,6 @@ describe('fluxCommunicationMessagesSender tests', () => {
data: {
type: 'fluxapprequest',
hash: 'test1',
broadcastedAt: Date.now(),
version: 1,
},
};
Expand Down

0 comments on commit 1b6ee17

Please sign in to comment.