From 052dde043e5c06b42343fa6ee6aef16fc3f5d1d1 Mon Sep 17 00:00:00 2001 From: Eric Andrechek Date: Wed, 10 Apr 2024 17:36:31 -0400 Subject: [PATCH] updating mqtt from db --- tracking-dashboard/backend/api/data.py | 18 +++--- tracking-dashboard/backend/mqtt-syncer.py | 2 +- tracking-dashboard/backend/sql/helpers.py | 32 ++++++++++ tracking-dashboard/backend/tasks/mqttadder.py | 24 ++++++++ tracking-dashboard/backend/utils/api.py | 60 ++++++++++++------- 5 files changed, 106 insertions(+), 30 deletions(-) create mode 100644 tracking-dashboard/backend/tasks/mqttadder.py diff --git a/tracking-dashboard/backend/api/data.py b/tracking-dashboard/backend/api/data.py index faf825e..84946ac 100644 --- a/tracking-dashboard/backend/api/data.py +++ b/tracking-dashboard/backend/api/data.py @@ -1,3 +1,4 @@ +import stat from flask import Blueprint, request import json @@ -69,19 +70,22 @@ def api_rock7_upload(): # if timestamp is not in data, add as transmit_time if 'timestamp' not in decoded_data: decoded_data['timestamp'] = data['transmit_time'] - # if type is not in data, add as json - if 'type' not in decoded_data: - decoded_data['type'] = 'json' # if 'timestamp' is None or empty, add as transmit_time if decoded_data['timestamp'] is None or decoded_data['timestamp'] == "": decoded_data['timestamp'] = data['transmit_time'] # set data to decoded_data data = decoded_data + source = {} + source['callsign'] = data['ROCK'] + source['ssid'] = 7 + source['timestamp'] = data['transmit_time'] + source['type'] = 'json' + source['data'] = data # accept upload data try: - data_obj.upload(data) + data_obj.upload(source) except Exception as e: return str(e), 400 @@ -100,7 +104,7 @@ def api_rock7_upload(): # save data try: status_code = data_obj.save() - if status_code == 201: + if status_code == 201 or status_code == 202: return "New data saved", 200 elif status_code == 208: return "Data already exists", 200 @@ -143,8 +147,8 @@ def api_upload(): # save data try: status_code = data_obj.save() - if status_code == 201: - return "New data saved", 201 + if status_code == 201 or status_code == 202: + return "New data saved", status_code elif status_code == 208: return "Data already exists", 208 except Exception as e: diff --git a/tracking-dashboard/backend/mqtt-syncer.py b/tracking-dashboard/backend/mqtt-syncer.py index 33abbad..ace04ae 100644 --- a/tracking-dashboard/backend/mqtt-syncer.py +++ b/tracking-dashboard/backend/mqtt-syncer.py @@ -175,7 +175,7 @@ def on_message(client, userdata, message): return try: status_code = data_obj.save() - if status_code == 201: + if status_code == 201 or status_code == 202: print("New data saved") elif status_code == 208: print("Data already exists") diff --git a/tracking-dashboard/backend/sql/helpers.py b/tracking-dashboard/backend/sql/helpers.py index 4ee6d9a..c6376d5 100644 --- a/tracking-dashboard/backend/sql/helpers.py +++ b/tracking-dashboard/backend/sql/helpers.py @@ -6,9 +6,40 @@ sys.path.append("..") from sql.db import Session from sql.models import * +from re import L from sqlalchemy.exc import IntegrityError from psycopg2.errors import UniqueViolation from sqlalchemy import cast, func +from tasks.mqttadder import add_data + +# add new datum to mqtt (position or telemetry) +def add_datum(name, data): + # add to redis queue for adding to mqtt by worker + if isinstance(data, Position): + latitude = None + longitude = None + # convert geometry to lat/lon + if data.geo is not None: + latitude = data.geo.y + longitude = data.geo.x + mqtt_data = { + "name": name, + "lat": latitude, + "lon": longitude, + "alt": data.altitude, + "cse": data.course, + "spd": data.speed, + "cmnt": data.comment, + "sym": data.symbol + } + add_data("position", mqtt_data) + elif isinstance(data, Telemetry): + mqtt_data = { + "name": name, + "telemetry": data.parsed + } + add_data("telemetry", mqtt_data) + # takes a message object and adds it to the database # returns message id of message object and whether or not it was added @@ -42,6 +73,7 @@ def add_position(position): Session.add(position) try: Session.commit() + add_datum(position) return (True, position.id) except IntegrityError as e: print(e) diff --git a/tracking-dashboard/backend/tasks/mqttadder.py b/tracking-dashboard/backend/tasks/mqttadder.py new file mode 100644 index 0000000..4cfbac1 --- /dev/null +++ b/tracking-dashboard/backend/tasks/mqttadder.py @@ -0,0 +1,24 @@ +from redis import Redis +from rq import Queue +import paho.mqtt.client as mqtt +from paho.mqtt.packettypes import PacketTypes +from paho.mqtt.properties import Properties +import json + +def add_data_task(data_type, data): + client = mqtt.Client() + client.connect("localhost", 1883, 60) + properties=Properties(PacketTypes.PUBLISH) + properties.MessageExpiryInterval=60 # retain messages for x seconds + + if data_type == "position": + client.publish("POSITION/" + data["name"], json.dumps(data), retain=True, qos=0, properties=properties) + elif data_type == "telemetry": + client.publish("TELEMETRY/" + data["name"], json.dumps(data), retain=True, qos=0, properties=properties) + + client.disconnect() + + +def add_data(data_type, data): + q = Queue(connection=Redis()) + q.enqueue(add_data, data_type, data) diff --git a/tracking-dashboard/backend/utils/api.py b/tracking-dashboard/backend/utils/api.py index f0ec261..e7ad532 100644 --- a/tracking-dashboard/backend/utils/api.py +++ b/tracking-dashboard/backend/utils/api.py @@ -161,6 +161,8 @@ def parse_data(self): self.data["course"] = course self.data["speed"] = speed self.data["comment"] = comment + if "telemetry" in self.raw["data"]: + self.data["telemetry"] = self.raw["data"]["telemetry"] self.parse() @@ -209,30 +211,44 @@ def save(self, save_timestamp=False): ) telemetry_success, telemetry_id = add_telemetry(telemetry) + # add to mqtt + name = self.data["callsign"] + "-" + str(self.data["ssid"]) + add_datum(name, telemetry) + print(telemetry_success, telemetry_id) - # build Geometry POINT object for lat/lon - # format: 'POINT(-33.9034 152.73457)' - geo_point = f"POINT({self.data['lat']} {self.data['lon']})" if "lat" in self.data and self.data["lat"] is not None and "lon" in self.data and self.data["lon"] is not None else None - - # add to position table - position = Position( - callsign=self.data["callsign"], - ssid=self.data["ssid"], - symbol=self.data["symbol"], - geo=geo_point, - altitude=self.data["alt"], - course=self.data["course"], - speed=self.data["speed"], - comment=self.data["comment"], - telemetry=telemetry_id if telemetry_success else None, - message=message_id, - ) - position_success, position_id = add_position(position) - - print(position_success, position_id) - - to_return = 201 + # if position data exists, add to positions table + if 'lat' in self.data and 'lon' in self.data: + + # build Geometry POINT object for lat/lon + # format: 'POINT(-33.9034 152.73457)' + geo_point = f"POINT({self.data['lat']} {self.data['lon']})" if "lat" in self.data and self.data["lat"] is not None and "lon" in self.data and self.data["lon"] is not None else None + + # add to position table + position = Position( + callsign=self.data["callsign"], + ssid=self.data["ssid"], + symbol=self.data["symbol"], + geo=geo_point, + altitude=self.data["alt"], + course=self.data["course"], + speed=self.data["speed"], + comment=self.data["comment"], + telemetry=telemetry_id if telemetry_success else None, + message=message_id, + ) + position_success, position_id = add_position(position) + + # add to mqtt + name = self.data["callsign"] + "-" + str(self.data["ssid"]) + add_datum(name, position) + + print(position_success, position_id) + + to_return = 201 + else: + # only a telemetry packet + to_return = 202 else: to_return = 208