Skip to content

Commit

Permalink
Moved to attrs for messages and added protocol char. Fixes #23, #19
Browse files Browse the repository at this point in the history
  • Loading branch information
Krolken committed Aug 19, 2021
1 parent f09e86e commit 14fe7c4
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 111 deletions.
172 changes: 62 additions & 110 deletions iec62056_21/messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import typing
from typing import *
import attr

from iec62056_21.exceptions import Iec6205621ParseError, ValidationError
from iec62056_21 import constants, utils
Expand Down Expand Up @@ -40,6 +41,7 @@ def from_bytes(cls, bytes_data):
return cls.from_representation(bytes_data.decode(constants.ENCODING))


@attr.s(auto_attribs=True)
class DataSet(Iec6205621Data):

"""
Expand All @@ -50,13 +52,9 @@ class DataSet(Iec6205621Data):

EXCLUDE_CHARS = ["(", ")", "/", "!"]

def __init__(self, value: str, address: str = None, unit: str = None):

# TODO: in programming mode, protocol mode C the value can be up to 128 chars

self.address = address
self.value = value
self.unit = unit
value: str
address: Optional[str] = attr.ib(default=None)
unit: Optional[str] = attr.ib(default=None)

def to_representation(self) -> str:
if self.unit is not None and self.address is not None:
Expand All @@ -70,7 +68,7 @@ def to_representation(self) -> str:
return f"({self.value})"

@classmethod
def from_representation(cls, data_set_string):
def from_representation(cls, data_set_string: str):
just_value = regex_data_just_value.search(data_set_string)

if just_value:
Expand All @@ -92,30 +90,21 @@ def from_representation(cls, data_set_string):
else:
return cls(address=address, value=values_data, unit=None)

def __repr__(self):
return (
f"{self.__class__.__name__}("
f"value={self.value!r}, "
f"address={self.address!r}, "
f"unit={self.unit!r}"
f")"
)


@attr.s(auto_attribs=True)
class DataLine(Iec6205621Data):
"""
A data line is a list of data sets.
"""

def __init__(self, data_sets):
self.data_sets: typing.List[DataSet] = data_sets
data_sets: List[DataSet]

def to_representation(self):
sets_representation = [_set.to_representation() for _set in self.data_sets]
return "".join(sets_representation)

@classmethod
def from_representation(cls, string_data):
def from_representation(cls, string_data: str):
"""
Is a list of data sets id(value*unit)id(value*unit)
need to split after each ")"
Expand All @@ -132,20 +121,17 @@ def from_representation(cls, string_data):

return cls(data_sets=data_sets)

def __repr__(self):
return f"{self.__class__.__name__}(" f"data_sets={self.data_sets!r}" f")"


@attr.s(auto_attribs=True)
class DataBlock(Iec6205621Data):
"""
A data block is a list of DataLines, each ended with a the line end characters
\n\r
"""

def __init__(self, data_lines):
self.data_lines = data_lines
data_lines: List[DataLine]

def to_representation(self):
def to_representation(self) -> str:
lines_rep = [
(line.to_representation() + constants.LINE_END) for line in self.data_lines
]
Expand All @@ -157,15 +143,12 @@ def from_representation(cls, string_data: str):
data_lines = [DataLine.from_representation(line) for line in lines]
return cls(data_lines)

def __repr__(self):
return f"{self.__class__.__name__}(data_lines={self.data_lines!r})"


@attr.s(auto_attribs=True)
class ReadoutDataMessage(Iec6205621Data):
def __init__(self, data_block):
self.data_block = data_block
data_block: DataBlock

def to_representation(self):
def to_representation(self) -> str:
data = (
f"{constants.STX}{self.data_block.to_representation()}{constants.END_CHAR}"
f"{constants.LINE_END}{constants.ETX}"
Expand All @@ -186,27 +169,28 @@ def from_representation(cls, string_data: str):

return cls(data_block=data_block)

def __repr__(self):
return f"{self.__class__.__name__}(data_block={self.data_block!r})"


@attr.s(auto_attribs=True)
class CommandMessage(Iec6205621Data):
allowed_commands = ["P", "W", "R", "E", "B"]
allowed_command_types = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
allowed_commands: ClassVar[List[str]] = ["P", "W", "R", "E", "B"]
allowed_command_types: ClassVar[List[str]] = [
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
]

command: str
command_type: str
data_set: Optional[DataSet] = attr.ib(default=None)

def __init__(
self, command: str, command_type: str, data_set: typing.Optional[DataSet]
):
self.command = command
self.command_type = command_type
self.data_set = data_set

if command not in self.allowed_commands:
raise ValueError(f"{command} is not an allowed command")
if command_type not in self.allowed_command_types:
raise ValueError(f"{command_type} is not an allowed command type")

def to_representation(self):
def to_representation(self) -> str:
header = f"{constants.SOH}{self.command}{self.command_type}"
if self.data_set:
body = f"{constants.STX}{self.data_set.to_representation()}{constants.ETX}"
Expand Down Expand Up @@ -245,36 +229,28 @@ def for_single_write(cls, address, value):
data_set = DataSet(value=value, address=address)
return cls(command="W", command_type="1", data_set=data_set)

def __repr__(self):
return (
f"{self.__class__.__name__}("
f"command={self.command!r}, "
f"command_type={self.command_type!r}, "
f"data_set={self.data_set!r}"
f")"
)


@attr.s(auto_attribs=True)
class AnswerDataMessage(Iec6205621Data):
def __init__(self, data_block):
self.data_block = data_block
self._data = None

data_block: DataBlock
cached_data: Optional[List[DataSet]] = attr.ib(default=None, init=False)

@property
def data(self):
if not self._data:
self._get_all_data_sets()
if not self.cached_data:
self.get_all_data_sets()

return self._data
return self.cached_data

def _get_all_data_sets(self):
def get_all_data_sets(self):
data_sets = list()

for line in self.data_block.data_lines:
for set in line.data_sets:
data_sets.append(set)
for data_set in line.data_sets:
data_sets.append(data_set)

self._data = data_sets
self.cached_data = data_sets

def to_representation(self):
# TODO: this is not valid in case reading out partial blocks.
Expand All @@ -295,15 +271,12 @@ def from_representation(cls, string_data):

return cls(data_block=data_block)

def __repr__(self):
return f"{self.__class__.__name__}(data_block={self.data_block!r})"


@attr.s(auto_attribs=True)
class RequestMessage(Iec6205621Data):
def __init__(self, device_address=""):
self.device_address = device_address
device_address: str = attr.ib(default="")

def to_representation(self):
def to_representation(self) -> str:
return (
f"{constants.START_CHAR}{constants.REQUEST_CHAR}{self.device_address}"
f"{constants.END_CHAR}{constants.LINE_END}"
Expand All @@ -314,44 +287,32 @@ def from_representation(cls, string_data):
device_address = string_data[2:-3]
return cls(device_address)

def __repr__(self):
return f"{self.__class__.__name__}(device_address={self.device_address!r})"


@attr.s(auto_attribs=True)
class AckOptionSelectMessage(Iec6205621Data):
"""
Only support protocol mode 0: Normal
"""
""""""

def __init__(self, baud_char, mode_char):
self.baud_char = baud_char
self.mode_char = mode_char
baud_char: str
mode_char: str
protocol_char: str = attr.ib(default="0")

def to_representation(self):
return f"{constants.ACK}0{self.baud_char}{self.mode_char}{constants.LINE_END}"
return f"{constants.ACK}{self.protocol_char}{self.baud_char}{self.mode_char}{constants.LINE_END}"

@classmethod
def from_representation(cls, string_data):
protocol_char = string_data[1]
baud_char = string_data[2]
mode_char = string_data[3]
return cls(baud_char, mode_char)

def __repr__(self):
return (
f"{self.__class__.__name__}("
f"baud_char={self.baud_char!r}, "
f"mode_char={self.mode_char!r}"
f")"
)
return cls(baud_char, mode_char, protocol_char=protocol_char)


@attr.s(auto_attribs=True)
class IdentificationMessage(Iec6205621Data):
def __init__(
self, identification: str, manufacturer: str, switchover_baudrate_char: str
):
self.identification: str = identification
self.manufacturer: str = manufacturer
self.switchover_baudrate_char: str = switchover_baudrate_char

identification: str
manufacturer: str
switchover_baudrate_char: str

def to_representation(self):
return (
Expand All @@ -366,12 +327,3 @@ def from_representation(cls, string_data):
identification = string_data[6:-2]

return cls(identification, manufacturer, switchover_baudrate_char)

def __repr__(self):
return (
f"{self.__class__.__name__}("
f"identification={self.identification!r}, "
f"manufacturer={self.manufacturer!r}, "
f"switchover_baudrate_char={self.switchover_baudrate_char!r}"
f")"
)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
attrs==19.1.0
attrs==21.2.0
pyserial==3.5

#pytest
pytest==4.4.1
Expand Down

0 comments on commit 14fe7c4

Please sign in to comment.