Skip to content

Commit

Permalink
created dataclasses for new packets
Browse files Browse the repository at this point in the history
  • Loading branch information
TiaSinghania committed Dec 19, 2024
1 parent ed35e5a commit 5efb025
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 34 deletions.
132 changes: 105 additions & 27 deletions rb_ws/src/buggy/buggy/serial/host_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,88 @@ def update(self, b):

MAX_PAYLOAD_LEN = 100

MSG_TYPE_DEBUG = b'DB'
MSG_TYPE_ODOMETRY = b'OD'

# firmware --> software
MSG_TYPE_NAND_DEBUG = b'ND'
MSG_TYPE_NAND_UKF = b'NU'
MSG_TYPE_NAND_GPS = b'NG'
MSG_TYPE_RADIO = b'RA'
MSG_TYPE_SC_DEBUG = b'SD'
MSG_TYPE_SC_SENSORS = b'SS'
MSG_TYPE_ROUNDTRIP_TIMESTAMP = b'RT'

# software --> firmware
MSG_TYPE_STEERING = b'ST'
MSG_TYPE_BRAKE = b'BR'
MSG_TYPE_ALARM = b'AL'
MSG_TYPE_BNYATEL = b'BT'
MSG_TYPE_SOFTWARE_TIMESTAMP = b'TM'



@dataclass
class NANDDebug:
timestamp: int
rc_steering_angle: float
stepper_steering_angle: float
accelerometer: float
encoder_angle: float
operator_ready: bool
brake_status: bool
auton_steer: bool
rfm69_timeout: bool
tx12_state: bool
stepper_alarm: bool
rc_uplink_quality: int

@dataclass
class NANDUKF:
timestamp: int
easting: float
northing: float
theta: float
omega: float
velocity: float

@dataclass
class Odometry:
x: float
y: float
radio_seqnum: int
class NANDGPS:
gps_seqnum: int
easting: float
northing: float
gps_fix: int
# this is a 2D accuracy value
accuracy: int
timestamp: int

@dataclass
class Radio:
gps_seqnum: int
nand_x_gps: float
nand_y_gps: float
nand_gps_fix: float

@dataclass
class SCDebug:
timestamp: int
rc_steering_angle: float
stepper_steering_angle: float
# encoder_angle: float
operator_ready: bool
brake_status: bool
auton_steer: bool
tx12_state: bool
stepper_alarm: bool
rc_uplink_quality: int


@dataclass
class SCSensors:
steering: float
# velocity: float
timestamp: int

@dataclass
class BnyaTelemetry:
x: float
y: float
velocity: float # In METERS / SECOND
steering: float # In DEGREES
heading: float # In RADIANS
heading_rate: float # In RADIANS / SECOND
class RoundtripTimestamp:
returned_time: int


class IncompletePacket(Exception):
pass
Expand Down Expand Up @@ -115,9 +175,11 @@ def send_steering(self, angle: float):
def send_alarm(self, status: int):
self.send_packet_raw(MSG_TYPE_ALARM, struct.pack('<B', status))

def send_timestamp(self, time: float):
self.send_packet_raw(MSG_TYPE_SOFTWARE_TIMESTAMP, struct.pack('<d', time))

def read_packet_raw(self):
self.rx_buffer += self.port.read_all() #type:ignore

try:
return self._try_parse_buffer()
except IncompletePacket:
Expand Down Expand Up @@ -184,21 +246,37 @@ def read_packet(self):
return None

msg_type, payload = packet
if msg_type == MSG_TYPE_ODOMETRY:
# Odometry message
if msg_type == MSG_TYPE_NAND_DEBUG:
x, y, radio_seqnum, gps_seqnum = struct.unpack('<Idddd??????B', payload)
return NANDDebug(x, y, radio_seqnum, gps_seqnum)

elif msg_type == MSG_TYPE_NAND_UKF:
x, y, radio_seqnum, gps_seqnum = struct.unpack('<ddII', payload)
return Odometry(x, y, radio_seqnum, gps_seqnum)
elif msg_type == MSG_TYPE_DEBUG:
# Debug message
debug = struct.unpack('<fff??B?BBxx', payload)
# print(debug)
return debug
elif msg_type == MSG_TYPE_BNYATEL:
return NANDUKF(x, y, radio_seqnum, gps_seqnum)

elif msg_type == MSG_TYPE_NAND_GPS:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return NANDGPS(x, y, vel, steering, heading, heading_rate)

elif msg_type == MSG_TYPE_RADIO:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return BnyaTelemetry(x, y, vel, steering, heading, heading_rate)
return Radio(x, y, vel, steering, heading, heading_rate)

elif msg_type == MSG_TYPE_SC_DEBUG:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return SCDebug(x, y, vel, steering, heading, heading_rate)

elif msg_type == MSG_TYPE_SC_SENSORS:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return SCSensors(x, y, vel, steering, heading, heading_rate)

elif msg_type == MSG_TYPE_ROUNDTRIP_TIMESTAMP:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return RoundtripTimestamp(x, y, vel, steering, heading, heading_rate)
else:
print(f'Unknown packet type {msg_type}')
return None
# print(f'Unknown packet type {msg_type}')



def main():
Expand Down
7 changes: 0 additions & 7 deletions rb_ws/src/buggy/buggy/serial/ros_to_bnyahaj.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ class Translator(Node):
Translates the output from bnyahaj serial (interpreted from host_comm) to ros topics and vice versa.
Performs reading (from Bnya Serial) and writing (from Ros Topics) on different python threads, so
be careful of multithreading synchronizaiton issues.
SC:
(ROS) Self Steering topic --> (Bnyahaj) Stepper Motor
(Bnyahaj) NAND Odom --> (ROS) NAND Odom topic
NAND:
(ROS) Self Steering --> (Bnyahaj) Stepper Motor
(Bnyahaj) Self Odom from UKF --> (ROS) NAND Odom topic
"""

def __init__(self, self_name, other_name, teensy_name):
Expand Down

0 comments on commit 5efb025

Please sign in to comment.