Skip to content

Commit

Permalink
Include to use Server-Sent Events (SSE) instead of WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuscardosodeveloper committed May 8, 2024
1 parent 93283b9 commit 7cba22c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 42 deletions.
1 change: 0 additions & 1 deletion src/tagoio_sdk/infrastructure/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def request() -> ResultHandlerResponse:
data=dataBody,
params=requestParams.get("params"),
timeout=config.tagoSDKconfig["requestTimeout"],
stream=True,
)
)

Expand Down
38 changes: 18 additions & 20 deletions src/tagoio_sdk/infrastructure/api_sse.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,42 @@
from typing import Union
import requests
from typing import Union, Literal
from urllib.parse import urlencode, urljoin
from sseclient import SSEClient
from tagoio_sdk.common.tagoio_module import GenericModuleParams
from tagoio_sdk.regions import getConnectionURI

channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"]
channelsWithoutID = ["notification", "analysis_trigger", "ui"]
channels = channelsWithID + channelsWithoutID

class OpenSSEWithID(GenericModuleParams):
def __init__(self, channel: str, resource_id: str):
super().__init__()
self.channel = channel
self.resource_id = resource_id

channel: Literal["device_inspector", "analysis_console", "ui_dashboard"]
resources_id: str

class OpenSSEWithoutID(GenericModuleParams):
def __init__(self, channel: str):
super().__init__()
self.channel = channel
channel: Literal["notification", "analysis_trigger", "ui"]


OpenSSEConfig = Union[OpenSSEWithID, OpenSSEWithoutID]

channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"]
channelsWithoutID = ["notification", "analysis_trigger", "ui"]
channels = channelsWithID + channelsWithoutID


def isChannelWithID(params: OpenSSEConfig) -> bool:
return params.channel in channelsWithID
return params.get("channel") in channelsWithID


def openSSEListening(params: OpenSSEConfig) -> SSEClient:
base_url = getConnectionURI(params.region)["sse"]
base_url = getConnectionURI(params.get("region"))["sse"]
url = urljoin(base_url, "/events")

query_params = {"token": params.token}
query_params = {}
if isChannelWithID(params):
query_params["channel"] = f"{params.channel}.{params.resource_id}"
query_params["channel"] = f"{params.get('channel')}.{params.get('resources_id')}"
else:
query_params["channel"] = params.channel
query_params["channel"] = params.get("channel")

query_params["token"] = params.get("token")

url += "?" + urlencode(query_params)

return SSEClient(url)
response = requests.get(url, stream=True, headers={"Accept": "text/event-stream"})

return SSEClient(response)
24 changes: 12 additions & 12 deletions src/tagoio_sdk/modules/Analysis/Analysis.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import os
from typing import Callable, Optional
from typing import Any, Callable, Optional

from tagoio_sdk.common.tagoio_module import TagoIOModule
from tagoio_sdk.infrastructure.api_sse import openSSEListening
from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisEnvironment
from tagoio_sdk.modules.Services import Services

T_ANALYSIS_CONTEXT = os.environ.get("T_ANALYSIS_CONTEXT") or None
Expand Down Expand Up @@ -37,8 +38,8 @@ def context():

self._analysis(context, data)

# TODO: Fix any
def __runLocal(self, environment: any, data: any, analysis_id: any, token: any):
def __runLocal(self, environment: list[AnalysisEnvironment], data: list[Any], analysis_id: str, token: str):
""" Run Analysis @internal"""
def log(*args: any):
print(*args)
Services.Services({"token": token}).console.log(str(args)[1:][:-2])
Expand All @@ -60,7 +61,7 @@ def __localRuntime(self):
print("¬ Error :: Analysis not found or not active.")
return

if analysis.get("run_on") == "external":
if analysis.get("run_on") != "external":
print("¬ Warning :: Analysis is not set to run on external")

tokenEnd = self.token[-5:]
Expand All @@ -80,18 +81,17 @@ def __localRuntime(self):

for event in sse.events():
try:
data = json.loads(event.data)
data = json.loads(event.data).get("payload")

if not data:
continue

if data["analysis_id"] == analysis["id"]:
self.__runLocal(
data["environment"],
data["data"],
data["analysis_id"],
self.token,
)
self.__runLocal(
data["environment"],
data["data"],
data["analysis_id"],
self.token,
)
except RuntimeError:
print("¬ Connection was closed, trying to reconnect...")
pass
Expand Down
22 changes: 13 additions & 9 deletions src/tagoio_sdk/regions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from contextlib import suppress
from typing import Literal, TypedDict
from typing import Literal, Optional, TypedDict


class RegionDefinition(TypedDict):
Expand All @@ -12,15 +12,19 @@ class RegionDefinition(TypedDict):
# noRegionWarning = False

regionsDefinition = {
"usa-1": {"api": "https://api.tago.io", "realtime": "wss://realtime.tago.io"},
"usa-1": {
"api": "https://api.tago.io",
"realtime": "wss://realtime.tago.io",
"sse": "https://sse.tago.io/events"
},
"env": None, # ? process object should be on trycatch.
"sse": "http://localhost:8080/events"

}

Regions = Literal["usa-1", "env"]


def getConnectionURI(region: Regions) -> RegionDefinition:
def getConnectionURI(region: Optional[Regions]) -> RegionDefinition:
value = None
with suppress(KeyError):
value = regionsDefinition[region]
Expand All @@ -29,14 +33,14 @@ def getConnectionURI(region: Regions) -> RegionDefinition:
return value

if region is not None and region != "env":
raise Exception("> TagoIO-SDK: Invalid region {}.".format(region))
raise Exception(f"> TagoIO-SDK: Invalid region {region}.")

try:
api = os.environ.get("TAGOIO_API") or ""
realtime = os.environ.get("TAGOIO_REALTIME") or ""
sse = os.environ.get("TAGOIO_SSE") or ""
api = os.environ.get("TAGOIO_API")
realtime = os.environ.get("TAGOIO_REALTIME")
sse = os.environ.get("TAGOIO_SSE")

if api == "" and region != "env":
if not api and region != "env":
raise Exception("Invalid Env")

return {"api": api, "realtime": realtime, "sse": sse}
Expand Down

0 comments on commit 7cba22c

Please sign in to comment.