Skip to content

Commit

Permalink
Merge pull request #68 from hyperledger/direct-round-trips
Browse files Browse the repository at this point in the history
Direct round trips
  • Loading branch information
peterbroadhurst authored Aug 10, 2022
2 parents 0e7fb94 + aa8f412 commit f5e321d
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions src/routers/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

import { Request, Router } from 'express';
import { promises as fs } from 'fs';
import { createReadStream } from 'fs';
import https from 'https';
import path from 'path';
import { v4 as uuidV4 } from 'uuid';
import * as blobsHandler from '../handlers/blobs';
import * as eventsHandler from '../handlers/events';
import { queueEvent } from '../handlers/events';
import * as messagesHandler from '../handlers/messages';
import { ca, cert, certBundle, key, peerID } from '../lib/cert';
import { config, persistDestinations, persistPeers } from '../lib/config';
import { IStatus } from '../lib/interfaces';
import { IBlobReceivedEvent, IFile, IMessageDeliveredEvent, IMessageReceivedEvent, IStatus } from '../lib/interfaces';
import RequestError from '../lib/request-error';
import * as utils from '../lib/utils';

Expand Down Expand Up @@ -152,46 +154,38 @@ router.post('/messages', async (req, res, next) => {
}
let senderDestination: string | undefined = undefined;
if (typeof req.body.sender === 'string') {
const segments = req.body.sender.split('/');
if (segments[0] !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
let senderID: string;
({ peerID: senderID, destination: senderDestination } = extractRecipientAndDestination(req.body.sender));
if (senderID !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${senderDestination}`, 400);
}
if (segments.length > 1) {
if (!config.destinations?.includes(segments[1])) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${segments[1]}`, 400);
}
senderDestination = segments[1];
if (senderDestination !== undefined && !config.destinations?.includes(senderDestination)) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${senderDestination}`, 400);
}
}
let recipientID: string;
let recipientDestination: string | undefined = undefined;
if (typeof req.body.recipient === 'string') {
const segments = req.body.recipient.split('/');
recipientID = segments[0];
if (segments.length > 1) {
recipientDestination = segments[1];
}
({ peerID: recipientID, destination: recipientDestination } = extractRecipientAndDestination(req.body.recipient));
} else {
throw new RequestError('Missing recipient', 400);
}
let recipientEndpoint: string;
let requestId: string = req.body.requestId ?? uuidV4();
if (recipientID === peerID) {
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
if (recipientDestination !== undefined && !config.destinations?.includes(recipientDestination)) {
throw new RequestError(`Unknown recipient destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400);
}
dispatchInternalMessage(req.body.sender, req.body.recipient, req.body.message, requestId);
} else {
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
if (recipientPeer === undefined) {
throw new RequestError(`Unknown recipient ${recipientID}`, 400);
}
recipientEndpoint = recipientPeer.endpoint;
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400);
}
messagesHandler.sendMessage(req.body.message, recipientID, recipientPeer.endpoint, requestId, senderDestination, recipientDestination);
}
let requestId = uuidV4();
if (typeof req.body.requestId === 'string') {
requestId = req.body.requestId;
}
messagesHandler.sendMessage(req.body.message, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
res.send({ requestId });
} catch (err) {
next(err);
Expand Down Expand Up @@ -256,31 +250,25 @@ router.post('/transfers', async (req, res, next) => {
await blobsHandler.retrieveMetadata(req.body.path);
let senderDestination: string | undefined = undefined;
if (typeof req.body.sender === 'string') {
const segments = req.body.sender.split('/');
if (segments[0] !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
let senderID: string;
({ peerID: senderID, destination: senderDestination } = extractRecipientAndDestination(req.body.sender));
if (senderID !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${senderID}`, 400);
}
if (segments.length > 1) {
if (!config.destinations?.includes(segments[1])) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${segments[1]}`, 400);
}
senderDestination = segments[1];
if (senderDestination !== undefined && !config.destinations?.includes(senderDestination)) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${senderDestination}`, 400);
}
}
let recipientID: string;
let recipientDestination: string | undefined = undefined;
if (typeof req.body.recipient === 'string') {
const segments = req.body.recipient.split('/');
recipientID = segments[0];
if (segments.length > 1) {
recipientDestination = segments[1];
}
({ peerID: recipientID, destination: recipientDestination } = extractRecipientAndDestination(req.body.recipient));
} else {
throw new RequestError('Missing recipient', 400);
}
let recipientEndpoint: string;
let requestId: string = req.body.requestId ?? uuidV4();
if (recipientID === peerID) {
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
dispatchInternalBlob(req.body.sender, req.body.recipient, req.body.path);
} else {
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
if (recipientPeer === undefined) {
Expand All @@ -289,15 +277,58 @@ router.post('/transfers', async (req, res, next) => {
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|')} recieved=${recipientDestination}`, 400);
}
recipientEndpoint = recipientPeer.endpoint;
}
let requestId = uuidV4();
if (typeof req.body.requestId === 'string') {
requestId = req.body.requestId;
blobsHandler.sendBlob(req.body.path, recipientID, recipientPeer.endpoint, requestId, senderDestination, recipientDestination);
}
blobsHandler.sendBlob(req.body.path, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
res.send({ requestId });
} catch (err) {
next(err);
}
});

const extractRecipientAndDestination = (value: string) => {
const segments = value.split('/');
return {
peerID: segments[0],
destination: segments.length > 1 ? segments[1] : undefined
};
};

const dispatchInternalMessage = async (sender: string, recipient: string, message: string, requestId: string) => {
await queueEvent({
id: uuidV4(),
type: 'message-received',
sender,
recipient,
message
} as IMessageReceivedEvent);
await queueEvent({
id: uuidV4(),
type: 'message-delivered',
sender,
recipient,
message,
requestId
} as IMessageDeliveredEvent);
};

const dispatchInternalBlob = async (sender: string, recipient: string, filePath: string) => {
const originBlobPath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
const readableStream = createReadStream(originBlobPath);
const file: IFile = {
key: '',
name: '',
readableStream
};
const destinationBlobPath = path.join(utils.constants.RECEIVED_BLOBS_SUBDIRECTORY, sender, filePath);
const metadata = await blobsHandler.storeBlob(file, destinationBlobPath);
await queueEvent({
id: uuidV4(),
type: 'blob-received',
sender,
recipient,
path: destinationBlobPath,
hash: metadata.hash,
size: metadata.size,
lastUpdate: metadata.lastUpdate
} as IBlobReceivedEvent);
};

0 comments on commit f5e321d

Please sign in to comment.