Skip to content

Commit

Permalink
updating mqtt from db
Browse files Browse the repository at this point in the history
  • Loading branch information
EricAndrechek committed Apr 10, 2024
1 parent 6eedc19 commit 052dde0
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 30 deletions.
18 changes: 11 additions & 7 deletions tracking-dashboard/backend/api/data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import stat
from flask import Blueprint, request
import json

Expand Down Expand Up @@ -69,19 +70,22 @@ def api_rock7_upload():
# if timestamp is not in data, add as transmit_time
if 'timestamp' not in decoded_data:
decoded_data['timestamp'] = data['transmit_time']
# if type is not in data, add as json
if 'type' not in decoded_data:
decoded_data['type'] = 'json'
# if 'timestamp' is None or empty, add as transmit_time
if decoded_data['timestamp'] is None or decoded_data['timestamp'] == "":
decoded_data['timestamp'] = data['transmit_time']

# set data to decoded_data
data = decoded_data
source = {}
source['callsign'] = data['ROCK']
source['ssid'] = 7
source['timestamp'] = data['transmit_time']
source['type'] = 'json'
source['data'] = data

# accept upload data
try:
data_obj.upload(data)
data_obj.upload(source)
except Exception as e:
return str(e), 400

Expand All @@ -100,7 +104,7 @@ def api_rock7_upload():
# save data
try:
status_code = data_obj.save()
if status_code == 201:
if status_code == 201 or status_code == 202:
return "New data saved", 200
elif status_code == 208:
return "Data already exists", 200
Expand Down Expand Up @@ -143,8 +147,8 @@ def api_upload():
# save data
try:
status_code = data_obj.save()
if status_code == 201:
return "New data saved", 201
if status_code == 201 or status_code == 202:
return "New data saved", status_code
elif status_code == 208:
return "Data already exists", 208
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion tracking-dashboard/backend/mqtt-syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def on_message(client, userdata, message):
return
try:
status_code = data_obj.save()
if status_code == 201:
if status_code == 201 or status_code == 202:
print("New data saved")
elif status_code == 208:
print("Data already exists")
Expand Down
32 changes: 32 additions & 0 deletions tracking-dashboard/backend/sql/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,40 @@
sys.path.append("..")
from sql.db import Session
from sql.models import *
from re import L
from sqlalchemy.exc import IntegrityError
from psycopg2.errors import UniqueViolation
from sqlalchemy import cast, func
from tasks.mqttadder import add_data

# add new datum to mqtt (position or telemetry)
def add_datum(name, data):
# add to redis queue for adding to mqtt by worker
if isinstance(data, Position):
latitude = None
longitude = None
# convert geometry to lat/lon
if data.geo is not None:
latitude = data.geo.y
longitude = data.geo.x
mqtt_data = {
"name": name,
"lat": latitude,
"lon": longitude,
"alt": data.altitude,
"cse": data.course,
"spd": data.speed,
"cmnt": data.comment,
"sym": data.symbol
}
add_data("position", mqtt_data)
elif isinstance(data, Telemetry):
mqtt_data = {
"name": name,
"telemetry": data.parsed
}
add_data("telemetry", mqtt_data)


# takes a message object and adds it to the database
# returns message id of message object and whether or not it was added
Expand Down Expand Up @@ -42,6 +73,7 @@ def add_position(position):
Session.add(position)
try:
Session.commit()
add_datum(position)
return (True, position.id)
except IntegrityError as e:
print(e)
Expand Down
24 changes: 24 additions & 0 deletions tracking-dashboard/backend/tasks/mqttadder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from redis import Redis
from rq import Queue
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import json

def add_data_task(data_type, data):
client = mqtt.Client()
client.connect("localhost", 1883, 60)
properties=Properties(PacketTypes.PUBLISH)
properties.MessageExpiryInterval=60 # retain messages for x seconds

if data_type == "position":
client.publish("POSITION/" + data["name"], json.dumps(data), retain=True, qos=0, properties=properties)
elif data_type == "telemetry":
client.publish("TELEMETRY/" + data["name"], json.dumps(data), retain=True, qos=0, properties=properties)

client.disconnect()


def add_data(data_type, data):
q = Queue(connection=Redis())
q.enqueue(add_data, data_type, data)
60 changes: 38 additions & 22 deletions tracking-dashboard/backend/utils/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def parse_data(self):
self.data["course"] = course
self.data["speed"] = speed
self.data["comment"] = comment
if "telemetry" in self.raw["data"]:
self.data["telemetry"] = self.raw["data"]["telemetry"]

self.parse()

Expand Down Expand Up @@ -209,30 +211,44 @@ def save(self, save_timestamp=False):
)
telemetry_success, telemetry_id = add_telemetry(telemetry)

# add to mqtt
name = self.data["callsign"] + "-" + str(self.data["ssid"])
add_datum(name, telemetry)

print(telemetry_success, telemetry_id)

# build Geometry POINT object for lat/lon
# format: 'POINT(-33.9034 152.73457)'
geo_point = f"POINT({self.data['lat']} {self.data['lon']})" if "lat" in self.data and self.data["lat"] is not None and "lon" in self.data and self.data["lon"] is not None else None

# add to position table
position = Position(
callsign=self.data["callsign"],
ssid=self.data["ssid"],
symbol=self.data["symbol"],
geo=geo_point,
altitude=self.data["alt"],
course=self.data["course"],
speed=self.data["speed"],
comment=self.data["comment"],
telemetry=telemetry_id if telemetry_success else None,
message=message_id,
)
position_success, position_id = add_position(position)

print(position_success, position_id)

to_return = 201
# if position data exists, add to positions table
if 'lat' in self.data and 'lon' in self.data:

# build Geometry POINT object for lat/lon
# format: 'POINT(-33.9034 152.73457)'
geo_point = f"POINT({self.data['lat']} {self.data['lon']})" if "lat" in self.data and self.data["lat"] is not None and "lon" in self.data and self.data["lon"] is not None else None

# add to position table
position = Position(
callsign=self.data["callsign"],
ssid=self.data["ssid"],
symbol=self.data["symbol"],
geo=geo_point,
altitude=self.data["alt"],
course=self.data["course"],
speed=self.data["speed"],
comment=self.data["comment"],
telemetry=telemetry_id if telemetry_success else None,
message=message_id,
)
position_success, position_id = add_position(position)

# add to mqtt
name = self.data["callsign"] + "-" + str(self.data["ssid"])
add_datum(name, position)

print(position_success, position_id)

to_return = 201
else:
# only a telemetry packet
to_return = 202
else:
to_return = 208

Expand Down

0 comments on commit 052dde0

Please sign in to comment.