Skip to content

Commit

Permalink
Adapted Python to V2
Browse files Browse the repository at this point in the history
  • Loading branch information
gabryelreyes committed Nov 21, 2023
1 parent 288c245 commit f92bea9
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions python/SerialMuxProt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class SerialMuxProtConstants:
CONTROL_CHANNEL_NUMBER = 0 # Number of Control Channel.
CONTROL_CHANNEL_PAYLOAD_LENGTH = CHANNEL_NAME_MAX_LEN + \
CONTROL_CHANNEL_PAYLOAD_DATA_LENGTH + \
CONTROL_CHANNEL_CMD_BYTE_LENGTH # DLC of Heartbeat Command.
CONTROL_CHANNEL_CMD_BYTE_LENGTH + 1 # DLC of Heartbeat Command.
# Index of the Command Byte of the Control Channel
CONTROL_CHANNEL_COMMAND_INDEX = 0
# Index of the start of the payload of the Control Channel
Expand All @@ -77,6 +77,7 @@ class Commands():
SCRB = 2
SCRB_RSP = 3


@dataclass
class Channel():
""" Channel Definition """
Expand Down Expand Up @@ -114,17 +115,19 @@ def pack_frame(self):
self.raw[2] = self.checksum
self.raw[3:] = self.payload


@dataclass
class ChannelArrays:
""" Container Class for Channel Arrays and their counters """

def __init__(self, max_configured_channels: int) -> None:
self.number_of_rx_channels = 0
self.number_of_tx_channels = 0
self.number_of_rx_channels = 0
self.number_of_tx_channels = 0
self.number_of_pending_channels = 0
self.rx_channels = [Channel() for x in range(max_configured_channels)]
self.tx_channels = [Channel() for x in range(max_configured_channels)]
self.pending_channels = [Channel() for x in range(max_configured_channels)]
self.rx_channels = [Channel() for x in range(max_configured_channels)]
self.tx_channels = [Channel() for x in range(max_configured_channels)]
self.pending_channels = [Channel()
for x in range(max_configured_channels)]


@dataclass
Expand All @@ -136,6 +139,7 @@ def __init__(self) -> None:
self.last_sync_response = 0
self.last_sync_command = 0


@dataclass
class RxData:
""" Container Dataclass for Receive Data and counters. """
Expand All @@ -145,6 +149,7 @@ def __init__(self) -> None:
self.rx_attempts = 0
self.receive_frame = Frame()


class SerialMuxProt:
""" SerialMuxProt Server """

Expand Down Expand Up @@ -342,7 +347,8 @@ def __process_rx(self) -> None:

# DLC = 0 means that the channel does not exist.
if (0 != dlc) and (SerialMuxProtConstants.MAX_RX_ATTEMPTS >= self.__rx_data.rx_attempts):
remaining_payload_bytes = self.__rx_data.received_bytes - SerialMuxProtConstants.HEADER_LEN
remaining_payload_bytes = self.__rx_data.received_bytes - \
SerialMuxProtConstants.HEADER_LEN
expected_bytes = dlc - remaining_payload_bytes
self.__rx_data.rx_attempts += 1

Expand Down Expand Up @@ -393,7 +399,7 @@ def __process_subscriptions(self) -> None:
request = bytearray(
SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_LENGTH)
request[0] = SerialMuxProtConstants.Commands.SCRB
request[1:] = bytearray(pending_channel.name, 'ascii')
request[6:] = bytearray(pending_channel.name, 'ascii')

# Pad array if necessary
request = request.ljust(
Expand Down Expand Up @@ -521,7 +527,8 @@ def __cmd_sync(self, payload: bytearray) -> None:
Command Data of received frame
"""

response = bytearray(SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_LENGTH)
response = bytearray(
SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_LENGTH)
response[SerialMuxProtConstants.CONTROL_CHANNEL_COMMAND_INDEX] = SerialMuxProtConstants.Commands.SYNC_RSP
response[SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_INDEX:] = payload

Expand All @@ -543,6 +550,7 @@ def __cmd_sync_rsp(self, payload: bytearray) -> None:
if self.__sync_data.last_sync_command == rcvd_timestamp:
self.__sync_data.last_sync_response = self.__sync_data.last_sync_command
self.__sync_data.is_synced = True
print("Synced!")

# Process pending Subscriptions
self.__process_subscriptions()
Expand All @@ -558,11 +566,12 @@ def __cmd_scrb(self, payload: bytearray) -> None:
Command Data of received frame
"""

response = bytearray(SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_LENGTH)
response = bytearray(
SerialMuxProtConstants.CONTROL_CHANNEL_PAYLOAD_LENGTH)
response[SerialMuxProtConstants.CONTROL_CHANNEL_COMMAND_INDEX] = SerialMuxProtConstants.Commands.SCRB_RSP

# Parse name
channel_name = str(payload, "ascii").strip('\x00')
channel_name = str(payload[5:], "ascii").strip('\x00')
response[1] = self.get_tx_channel_number(channel_name)

# Name is always sent back.
Expand All @@ -586,8 +595,10 @@ def __cmd_scrb_rsp(self, payload: bytearray) -> None:
"""

# Parse payload
channel_number = payload[0]
channel_name = str(payload[1:], "ascii").strip('\x00')
channel_number = payload[4]
channel_name = str(payload[5:], "ascii").strip('\x00')
print(channel_number)
print(channel_name)

if (self.__max_configured_channels >= channel_number) and \
(0 < self.__channels.number_of_pending_channels):
Expand Down

0 comments on commit f92bea9

Please sign in to comment.