Skip to content

Commit

Permalink
Add gets to websockets (#57)
Browse files Browse the repository at this point in the history
* Add gets to websockets

* 7.0.1
  • Loading branch information
markwylde authored Jun 2, 2021
1 parent 17cf67c commit 9b3e032
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 84 deletions.
67 changes: 67 additions & 0 deletions lib/actions/http/getAll.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const writeResponse = require('write-response');

const orderByFields = require('../../../utils/orderByFields');

const {
COMMAND,
GET,
STATUS,
DATA,
COLLECTION_ID,
QUERY,
DOCUMENTS,
FIELDS,
ORDER,
LIMIT
} = require('../../constants');

function askOnAllNodes (state, data) {
return Promise.all(
state.nodes.map(node => node.connection.send(data))
);
}

async function handleGetAll (state, request, response, { collectionId, url }) {
const limit = url.searchParams.get('limit') && JSON.parse(url.searchParams.get('limit'));
const orders = url.searchParams.get('order') && JSON.parse(url.searchParams.get('order'));

const responses = await askOnAllNodes(state, {
[COMMAND]: GET,
[DATA]: {
[COLLECTION_ID]: collectionId,
[QUERY]: url.searchParams.get('query') && JSON.parse(url.searchParams.get('query')),
[FIELDS]: url.searchParams.get('fields') && JSON.parse(url.searchParams.get('fields')),
[ORDER]: orders || undefined,
[LIMIT]: limit || undefined
}
});

if (responses.find(response => response[STATUS] >= 500)) {
writeResponse(500, responses[0][DATA], response);
return;
}

if (!responses.find(response => response[STATUS] === 200)) {
writeResponse(200, [], response);
return;
}

let results = responses
.map(response => response[DOCUMENTS])
.flat()
.filter(item => !!item);

if (limit) {
results = results.slice(0, limit);
}

if (orders) {
orders.forEach(order => {
orderByFields(results, order);
});
}

writeResponse(200, results, response);
}

module.exports = handleGetAll;
56 changes: 56 additions & 0 deletions lib/actions/http/getOne.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const writeResponse = require('write-response');

const {
COMMAND,
GET,
STATUS,
DATA,
COLLECTION_ID,
RESOURCE_ID,
QUERY,
LIMIT,
DOCUMENTS,
FIELDS
} = require('../../constants');

function askOnAllNodes (state, data) {
return Promise.all(
state.nodes.map(node => node.connection.send(data))
);
}

async function handleGetOne (state, request, response, { collectionId, resourceId, url }) {
const responses = await askOnAllNodes(state, {
[COMMAND]: GET,
[DATA]: {
[COLLECTION_ID]: collectionId,
[QUERY]: url.searchParams.get('query') && JSON.parse(url.searchParams.get('query')),
[RESOURCE_ID]: resourceId,
[LIMIT]: 1,
[FIELDS]: url.searchParams.get('fields') && JSON.parse(url.searchParams.get('fields'))
}
});

if (responses.find(response => response[STATUS] >= 500)) {
writeResponse(500, responses[0][DATA], response);
return;
}

if (!responses.find(response => response[STATUS] === 200)) {
writeResponse(404, {}, response);
return;
}

const results = responses
.map(response => response[DOCUMENTS] || [])
.flat();

if (results.length === 0) {
writeResponse(404, {}, response);
return;
}

writeResponse(200, results[0], response);
}

module.exports = handleGetOne;
53 changes: 53 additions & 0 deletions lib/actions/ws/getAll.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const {
COMMAND,
DATA,
GET,
STATUS,
DOCUMENTS,
LIMIT,
ORDER
} = require('../../constants');

const orderByFields = require('../../../utils/orderByFields');

function askOnAllNodes (state, data) {
return Promise.all(
state.nodes.map(node => node.connection.send(data))
);
}

async function handleGetAll (acceptId, state, data, socket) {
const responses = await askOnAllNodes(state, {
[COMMAND]: GET,
[DATA]: data
});

if (responses.find(response => response[STATUS] >= 500)) {
socket.send(JSON.stringify(['A', acceptId, responses]));
return;
}

if (!responses.find(response => response[STATUS] === 200)) {
socket.send(JSON.stringify(['A', acceptId, []]));
return;
}

let results = responses
.map(response => response[DOCUMENTS])
.flat()
.filter(item => !!item);

if (data[LIMIT]) {
results = results.slice(0, data[LIMIT]);
}

if (data[ORDER]) {
data[ORDER].forEach(order => {
orderByFields(results, order);
});
}

socket.send(JSON.stringify(['A', acceptId, results]));
}

module.exports = handleGetAll;
85 changes: 3 additions & 82 deletions lib/httpHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ const uuid = require('uuid').v4;
const packageJson = require('../package.json');

const validateAlphaNumericDashDot = require('../utils/validateAlphaNumericDashDot');
const orderByFields = require('../utils/orderByFields');

const handlePost = require('./actions/http/post');
const handleGetAll = require('./actions/http/getAll');
const handleGetOne = require('./actions/http/getOne');

function handleInvalidRequestBody (error) {
if (error.message.includes('Unexpected token')) {
Expand All @@ -22,20 +23,15 @@ const {
STATUS,
DATA,
DOCUMENT,
DOCUMENTS,
LOCK,
UNLOCK,
COUNT,
GET,
PUT,
PATCH,
DELETE,
COLLECTION_ID,
RESOURCE_ID,
QUERY,
FIELDS,
LIMIT,
ORDER
QUERY
} = require('./constants');

function askOnAllNodes (state, data) {
Expand All @@ -55,81 +51,6 @@ function accumulateChanges (responses) {
}, 0);
}

async function handleGetOne (state, request, response, { collectionId, resourceId, url }) {
const responses = await askOnAllNodes(state, {
[COMMAND]: GET,
[DATA]: {
[COLLECTION_ID]: collectionId,
[RESOURCE_ID]: resourceId,
[FIELDS]: url.searchParams.get('fields') && JSON.parse(url.searchParams.get('fields'))
}
});

if (responses.find(response => response[STATUS] >= 500)) {
writeResponse(500, responses[0][DATA], response);
return;
}

if (!responses.find(response => response[STATUS] === 200)) {
writeResponse(404, {}, response);
return;
}

const results = responses
.map(response => response[DOCUMENTS] || [])
.flat();

if (results.length === 0) {
writeResponse(404, {}, response);
return;
}

writeResponse(200, results[0], response);
}

async function handleGetAll (state, request, response, { collectionId, url }) {
const limit = url.searchParams.get('limit') && JSON.parse(url.searchParams.get('limit'));
const orders = url.searchParams.get('order') && JSON.parse(url.searchParams.get('order'));

const responses = await askOnAllNodes(state, {
[COMMAND]: GET,
[DATA]: {
[COLLECTION_ID]: collectionId,
[QUERY]: url.searchParams.get('query') && JSON.parse(url.searchParams.get('query')),
[FIELDS]: url.searchParams.get('fields') && JSON.parse(url.searchParams.get('fields')),
[ORDER]: orders || undefined,
[LIMIT]: limit || undefined
}
});

if (responses.find(response => response[STATUS] >= 500)) {
writeResponse(500, responses[0][DATA], response);
return;
}

if (!responses.find(response => response[STATUS] === 200)) {
writeResponse(200, [], response);
return;
}

let results = responses
.map(response => response[DOCUMENTS])
.flat()
.filter(item => !!item);

if (limit) {
results = results.slice(0, limit);
}

if (orders) {
orders.forEach(order => {
orderByFields(results, order);
});
}

writeResponse(200, results, response);
}

async function handleCount (state, request, response, { collectionId, url }) {
const responses = await askOnAllNodes(state, {
[COMMAND]: COUNT,
Expand Down
7 changes: 7 additions & 0 deletions lib/wsHandler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const WebSocket = require('ws');

const handlePost = require('./actions/ws/post');
const handleGetAll = require('./actions/ws/getAll');

const {
COMMAND,
Expand Down Expand Up @@ -91,6 +92,12 @@ function wsHandler (server, state, options) {
return;
}

if (command === 'GET') {
handleGetAll(acceptId, state, data, socket);

return;
}

socket.send(JSON.stringify(['R', acceptId, 'COMMAND_NOT_FOUND']));
});

Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "canhazdb-server",
"version": "7.0.0",
"version": "7.0.1",
"description": "A shaded and clustered database communicated over http rest.",
"main": "./lib/index.js",
"bin": {
Expand Down
43 changes: 43 additions & 0 deletions test/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const createTestCluster = require('./helpers/createTestCluster');
const {
STATUS,
DOCUMENT,
QUERY,
COLLECTION_ID
} = require('../lib/constants');

Expand All @@ -19,6 +20,48 @@ const tls = {
requestCert: true
};

test('get: getAll some data', async t => {
t.plan(4);

const cluster = await createTestCluster(3, tls);
const node = cluster.getRandomNodeUrl();

const insertResponses = await Promise.all([
httpRequest(`${node.url}/tests`, {
method: 'POST',
data: { a: 1 }
}),

httpRequest(`${node.url}/tests`, {
method: 'POST',
data: { a: 2 }
})
]);

const ws = new WebSocket(node.wsUrl, tls);
ws.on('open', function open () {
ws.send(JSON.stringify([1, 'GET', {
[COLLECTION_ID]: 'tests',
[QUERY]: {
a: 1
}
}]));
});

ws.on('message', async function incoming (rawData) {
const [type, acceptId, data] = JSON.parse(rawData);

console.log(rawData);

t.equal(type, 'A', 'should have correct type');
t.equal(acceptId, 1, 'should have correct acceptId');
t.equal(data[0].id, insertResponses[0].data.id, 'had correct document id');
t.equal(data[0].a, 1, 'should return document field');

cluster.closeAll();
});
});

test('post: post some data', async t => {
t.plan(6);

Expand Down

0 comments on commit 9b3e032

Please sign in to comment.