Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/SSE/device-inspector/analysis-console/APPCLI-3 #12

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"commander": "^11.1.0",
"dotenv": "^16.3.1",
"envfile": "^6.18.0",
"eventsource": "^2.0.2",
"kleur": "^4.1.5",
"lodash": "^4.17.21",
"luxon": "^3.4.3",
Expand All @@ -57,6 +58,7 @@
},
"devDependencies": {
"@types/async": "^3.2.23",
"@types/eventsource": "^1.1.15",
"@types/jest": "29.5.8",
"@types/lodash": "^4.14.201",
"@types/luxon": "^3.3.4",
Expand Down
69 changes: 29 additions & 40 deletions src/commands/analysis/analysis-console.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
import { connect } from "socket.io-client";

import EventSource from "eventsource";
import { Account } from "@tago-io/sdk";
import { AnalysisInfo } from "@tago-io/sdk/lib/types";

import { getEnvironmentConfig, IEnvironment } from "../../lib/config-file";
import { errorHandler, highlightMSG, infoMSG, successMSG } from "../../lib/messages";
import { searchName } from "../../lib/search-name";
import { pickAnalysisFromConfig } from "../../prompt/pick-analysis-from-config";

/**
* Creates a WebSocket connection to the TagoIO Realtime API.
* @param profileToken The user's profile token.
* @returns The WebSocket instance.
* Creates a new SSE connection to the TagoIO Realtime API.
* @param profileToken - The user's profile token.
* @param analysisID - The ID of the analysis script to connect to.
* @returns An EventSource instance connected to the TagoIO Realtime API.
*/
function apiSocket(profileToken: string) {
const socket = connect("wss://realtime.tago.io", {
reconnectionDelay: 10_000,
reconnection: true,
transports: ["websocket"],
query: {
token: profileToken,
},
});
function apiSSE(profileToken: string, analysisID: string) {
const sse = new EventSource(`https://sse.tago.io/events?channel=analysis_console.${analysisID}&token=${profileToken}`);

return socket;
return sse;
}

/**
Expand All @@ -43,34 +37,29 @@ async function getScriptObj(scriptName: string | void, analysisList: IEnvironmen
}
return scriptObj;
}

/**
* Sets up a socket connection to TagoIO and attaches to an analysis script.
* @param socket - The socket connection to TagoIO.
* @param scriptId - The ID of the analysis script to attach to.
* @param analysis_info - Information about the analysis script.
* Sets up the SSE connection and event listeners for device live inspection.
* @param sse - The SSE connection to TagoIO.
* @param deviceIdOrToken - The ID or token of the device to inspect.
* @param deviceInfo - Information about the device being inspected.
*/
function setupSocket(socket: ReturnType<typeof apiSocket>, scriptId: string, analysis_info: any) {
socket.on("connect", () => {
infoMSG("Connected to TagoIO, Getting analysis information...");
socket.emit("attach", "analysis", scriptId);
socket.emit("attach", {
resourceName: "analysis",
resourceID: scriptId,
});
});

socket.on("disconnect", () => console.info("Disconnected from TagoIO.\n\n"));
function setupSSE(sse: ReturnType<typeof apiSSE>, scriptId: string, analysis_info: AnalysisInfo) {
sse.onmessage = (event) => {
const scope = JSON.parse(event.data).payload;
console.log(`\x1b[35m${new Date(scope.timestamp).toISOString()} \x1b[0m ${scope.message}`);
};

socket.on("error", (e: Error) => {
sse.onerror = (error) => {
errorHandler("Connection error");
console.error(e);
});

socket.on("ready", () => successMSG(`Analysis [${highlightMSG(analysis_info.name)}] found succesfully. ${highlightMSG("Waiting for logs...")}`));
console.error(error);
};

socket.on("analysis::console", (scope: any) => {
console.log(`\x1b[35m${new Date(scope.timestamp).toISOString()} \x1b[0m ${scope.message}`);
});
sse.onopen = () => {
infoMSG("Connected to TagoIO, Getting analysis information...");
successMSG(`Analysis [${highlightMSG(analysis_info.name)}] found successfully.`);
successMSG(`Waiting for logs...`);
};
}

/**
Expand Down Expand Up @@ -99,8 +88,8 @@ async function connectAnalysisConsole(scriptName: string | void, options: { envi
return;
}

const socket = apiSocket(config.profileToken);
setupSocket(socket, scriptObj.id, analysis_info);
const sse = apiSSE(config.profileToken, analysis_info.id);
setupSSE(sse, scriptObj.id, analysis_info);
}

export { connectAnalysisConsole };
77 changes: 31 additions & 46 deletions src/commands/devices/device-live-inspector.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,29 @@
import { io } from "socket.io-client";

import EventSource from "eventsource";
import { Account, Device } from "@tago-io/sdk";
import { DeviceInfo } from "@tago-io/sdk/lib/types";

import { getEnvironmentConfig } from "../../lib/config-file";
import { errorHandler, highlightMSG, infoMSG, successMSG } from "../../lib/messages";
import { errorHandler, highlightMSG, successMSG } from "../../lib/messages";
import { pickDeviceIDFromTagoIO } from "../../prompt/pick-device-id-from-tagoio";

/**
* Creates a new socket connection to the TagoIO Realtime API.
* Creates a new SSE connection to the TagoIO Realtime API.
* @param profileToken - The user's profile token.
* @returns A socket instance connected to the TagoIO Realtime API.
* @param deviceID - The ID of the device to inspect.
* @returns An EventSource instance connected to the TagoIO Realtime API.
*/
function apiSocket(profileToken: string) {
const socket = io("wss://realtime.tago.io", {
reconnectionDelay: 10_000,
reconnection: true,
transports: ["websocket"],
query: {
token: profileToken,
},
});

return socket;
function apiSSE(profileToken: string, deviceID: string) {
const sse = new EventSource(`https://sse.tago.io/events?channel=device_inspector.${deviceID}&token=${profileToken}`);

return sse;
}

interface ScopeContent {
connection_id: string;
content: string;
device_id: string;
timestamp: string;
title: string;
content: string;
}

/**
Expand All @@ -47,43 +42,33 @@ function displayMessage(scope: ScopeContent) {
}

/**
* Sets up the socket connection and event listeners for device live inspection.
* @param socket - The socket connection to TagoIO.
* Sets up the SSE connection and event listeners for device live inspection.
* @param sse - The SSE connection to TagoIO.
* @param deviceIdOrToken - The ID or token of the device to inspect.
* @param deviceInfo - Information about the device being inspected.
*/
function setupSocket(socket: ReturnType<typeof apiSocket>, deviceIdOrToken: string, deviceInfo: DeviceInfo) {
socket.on("connect", () => {
successMSG("Connected to TagoIO, Getting device information...");
socket.emit("attach", "device", deviceIdOrToken);
socket.emit("attach", {
resourceName: "device",
resourceID: deviceIdOrToken,
});
});

socket.on("disconnect", () => infoMSG("Disconnected from TagoIO.\n\n"));

socket.on("error", (e: Error) => errorHandler(`Connection error: ${e}`));

socket.on("ready", () => {
const deviceName = deviceInfo?.name || deviceIdOrToken;
successMSG(`Device [${highlightMSG(deviceName)}] found successfully.`);
successMSG(`Waiting for logs...`);
});

/**
* Event listener for device inspection messages.
*/
socket.on("device::inspection", (scope: ScopeContent | ScopeContent[]) => {
function setupSSE(sse: ReturnType<typeof apiSSE>, deviceIdOrToken: string, deviceInfo: DeviceInfo) {
sse.onmessage = (event) => {
const scope = JSON.parse(event.data).payload as ScopeContent;
if (Array.isArray(scope)) {
for (const item of scope) {
displayMessage(item);
}
} else {
displayMessage(scope);
}
});
};

sse.onerror = (error) => {
errorHandler("Connection error");
console.error(error);
};

sse.onopen = () => {
const deviceName = deviceInfo?.name || deviceIdOrToken;
successMSG(`Device [${highlightMSG(deviceName)}] found successfully.`);
successMSG(`Waiting for logs...`);
};
}

interface IOptions {
Expand Down Expand Up @@ -125,8 +110,8 @@ async function inspectorConnection(deviceIdOrToken: string, options: IOptions) {
deviceIdOrToken = deviceInfo.id;
}

const socket = apiSocket(config.profileToken);
setupSocket(socket, deviceIdOrToken, deviceInfo);
const sse = apiSSE(config.profileToken, deviceInfo.id);
setupSSE(sse, deviceIdOrToken, deviceInfo);
}

export { inspectorConnection };
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"compilerOptions": {
"target": "ES2021",
"module": "CommonJS",
"lib": ["ES2022"],
"lib": ["ES2022", "DOM"],
"allowJs": true,
"outDir": "./build",
"noImplicitAny": true,
Expand Down
Loading