diff --git a/src/tagoio_sdk/infrastructure/api_request.py b/src/tagoio_sdk/infrastructure/api_request.py index bd501b1..196bc95 100644 --- a/src/tagoio_sdk/infrastructure/api_request.py +++ b/src/tagoio_sdk/infrastructure/api_request.py @@ -84,7 +84,6 @@ def request() -> ResultHandlerResponse: data=dataBody, params=requestParams.get("params"), timeout=config.tagoSDKconfig["requestTimeout"], - stream=True, ) ) diff --git a/src/tagoio_sdk/infrastructure/api_sse.py b/src/tagoio_sdk/infrastructure/api_sse.py index fe3cb6a..e9c41c0 100644 --- a/src/tagoio_sdk/infrastructure/api_sse.py +++ b/src/tagoio_sdk/infrastructure/api_sse.py @@ -1,44 +1,42 @@ -from typing import Union +import requests +from typing import Union, Literal from urllib.parse import urlencode, urljoin from sseclient import SSEClient from tagoio_sdk.common.tagoio_module import GenericModuleParams from tagoio_sdk.regions import getConnectionURI +channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"] +channelsWithoutID = ["notification", "analysis_trigger", "ui"] +channels = channelsWithID + channelsWithoutID class OpenSSEWithID(GenericModuleParams): - def __init__(self, channel: str, resource_id: str): - super().__init__() - self.channel = channel - self.resource_id = resource_id - + channel: Literal["device_inspector", "analysis_console", "ui_dashboard"] + resources_id: str class OpenSSEWithoutID(GenericModuleParams): - def __init__(self, channel: str): - super().__init__() - self.channel = channel + channel: Literal["notification", "analysis_trigger", "ui"] OpenSSEConfig = Union[OpenSSEWithID, OpenSSEWithoutID] -channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"] -channelsWithoutID = ["notification", "analysis_trigger", "ui"] -channels = channelsWithID + channelsWithoutID - - def isChannelWithID(params: OpenSSEConfig) -> bool: - return params.channel in channelsWithID + return params.get("channel") in channelsWithID def openSSEListening(params: OpenSSEConfig) -> SSEClient: - base_url = getConnectionURI(params.region)["sse"] + base_url = getConnectionURI(params.get("region"))["sse"] url = urljoin(base_url, "/events") - query_params = {"token": params.token} + query_params = {} if isChannelWithID(params): - query_params["channel"] = f"{params.channel}.{params.resource_id}" + query_params["channel"] = f"{params.get('channel')}.{params.get('resources_id')}" else: - query_params["channel"] = params.channel + query_params["channel"] = params.get("channel") + + query_params["token"] = params.get("token") url += "?" + urlencode(query_params) - return SSEClient(url) + response = requests.get(url, stream=True, headers={"Accept": "text/event-stream"}) + + return SSEClient(response) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index 5d2f549..136cef2 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -1,9 +1,10 @@ import json import os -from typing import Callable, Optional +from typing import Any, Callable, Optional from tagoio_sdk.common.tagoio_module import TagoIOModule from tagoio_sdk.infrastructure.api_sse import openSSEListening +from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisEnvironment from tagoio_sdk.modules.Services import Services T_ANALYSIS_CONTEXT = os.environ.get("T_ANALYSIS_CONTEXT") or None @@ -37,8 +38,8 @@ def context(): self._analysis(context, data) - # TODO: Fix any - def __runLocal(self, environment: any, data: any, analysis_id: any, token: any): + def __runLocal(self, environment: list[AnalysisEnvironment], data: list[Any], analysis_id: str, token: str): + """ Run Analysis @internal""" def log(*args: any): print(*args) Services.Services({"token": token}).console.log(str(args)[1:][:-2]) @@ -60,7 +61,7 @@ def __localRuntime(self): print("¬ Error :: Analysis not found or not active.") return - if analysis.get("run_on") == "external": + if analysis.get("run_on") != "external": print("¬ Warning :: Analysis is not set to run on external") tokenEnd = self.token[-5:] @@ -80,18 +81,17 @@ def __localRuntime(self): for event in sse.events(): try: - data = json.loads(event.data) + data = json.loads(event.data).get("payload") if not data: continue - if data["analysis_id"] == analysis["id"]: - self.__runLocal( - data["environment"], - data["data"], - data["analysis_id"], - self.token, - ) + self.__runLocal( + data["environment"], + data["data"], + data["analysis_id"], + self.token, + ) except RuntimeError: print("¬ Connection was closed, trying to reconnect...") pass diff --git a/src/tagoio_sdk/regions.py b/src/tagoio_sdk/regions.py index 4039fc8..f177d0d 100644 --- a/src/tagoio_sdk/regions.py +++ b/src/tagoio_sdk/regions.py @@ -1,6 +1,6 @@ import os from contextlib import suppress -from typing import Literal, TypedDict +from typing import Literal, Optional, TypedDict class RegionDefinition(TypedDict): @@ -12,15 +12,19 @@ class RegionDefinition(TypedDict): # noRegionWarning = False regionsDefinition = { - "usa-1": {"api": "https://api.tago.io", "realtime": "wss://realtime.tago.io"}, + "usa-1": { + "api": "https://api.tago.io", + "realtime": "wss://realtime.tago.io", + "sse": "https://sse.tago.io/events" + }, "env": None, # ? process object should be on trycatch. - "sse": "http://localhost:8080/events" + } Regions = Literal["usa-1", "env"] -def getConnectionURI(region: Regions) -> RegionDefinition: +def getConnectionURI(region: Optional[Regions]) -> RegionDefinition: value = None with suppress(KeyError): value = regionsDefinition[region] @@ -29,14 +33,14 @@ def getConnectionURI(region: Regions) -> RegionDefinition: return value if region is not None and region != "env": - raise Exception("> TagoIO-SDK: Invalid region {}.".format(region)) + raise Exception(f"> TagoIO-SDK: Invalid region {region}.") try: - api = os.environ.get("TAGOIO_API") or "" - realtime = os.environ.get("TAGOIO_REALTIME") or "" - sse = os.environ.get("TAGOIO_SSE") or "" + api = os.environ.get("TAGOIO_API") + realtime = os.environ.get("TAGOIO_REALTIME") + sse = os.environ.get("TAGOIO_SSE") - if api == "" and region != "env": + if not api and region != "env": raise Exception("Invalid Env") return {"api": api, "realtime": realtime, "sse": sse}