Skip to content

Commit

Permalink
Rework plc data read (#40)
Browse files Browse the repository at this point in the history
* rewrite parts of reading from plc memory

try to fix issue #38

* typo

* add tests for new plc read
  • Loading branch information
drunsinn authored Oct 24, 2022
1 parent 621ad39 commit f63aef2
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 67 deletions.
187 changes: 121 additions & 66 deletions pyLSV2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
file manipulation is blocked by a lockout parameter. Use at your own risk!
"""
import logging
import math
import re
import os
import struct
Expand Down Expand Up @@ -1054,111 +1055,165 @@ def read_plc_memory(self, address, mem_type, count=1):

self.login(login=Login.PLCDEBUG)

first_element = address
number_of_elements = count # keep old parameter name for compatibility

if mem_type is MemoryType.MARKER:
start_address = self._sys_par["Marker_Start"]
max_count = self._sys_par["Markers"]
mem_byte_count = 1
mem_start_address = self._sys_par["Marker_Start"]
max_elemens = self._sys_par["Markers"]
mem_bytes_per_element = 1
unpack_string = "!?"
elif mem_type is MemoryType.INPUT:
start_address = self._sys_par["Input_Start"]
max_count = self._sys_par["Inputs"]
mem_byte_count = 1
mem_start_address = self._sys_par["Input_Start"]
max_elemens = self._sys_par["Inputs"]
mem_bytes_per_element = 1
unpack_string = "!?"
elif mem_type is MemoryType.OUTPUT:
start_address = self._sys_par["Output_Start"]
max_count = self._sys_par["Outputs"]
mem_byte_count = 1
mem_start_address = self._sys_par["Output_Start"]
max_elemens = self._sys_par["Outputs"]
mem_bytes_per_element = 1
unpack_string = "!?"
elif mem_type is MemoryType.COUNTER:
start_address = self._sys_par["Counter_Start"]
max_count = self._sys_par["Counters"]
mem_byte_count = 1
mem_start_address = self._sys_par["Counter_Start"]
max_elemens = self._sys_par["Counters"]
mem_bytes_per_element = 1
unpack_string = "!?"
elif mem_type is MemoryType.TIMER:
start_address = self._sys_par["Timer_Start"]
max_count = self._sys_par["Timers"]
mem_byte_count = 1
mem_start_address = self._sys_par["Timer_Start"]
max_elemens = self._sys_par["Timers"]
mem_bytes_per_element = 1
unpack_string = "!?"
elif mem_type is MemoryType.BYTE:
start_address = self._sys_par["Word_Start"]
max_count = self._sys_par["Words"] * 2
mem_byte_count = 1
mem_start_address = self._sys_par["Word_Start"]
max_elemens = self._sys_par["Words"] * 2
mem_bytes_per_element = 1
unpack_string = "!B"
elif mem_type is MemoryType.WORD:
start_address = self._sys_par["Word_Start"]
max_count = self._sys_par["Words"]
mem_byte_count = 2
mem_start_address = self._sys_par["Word_Start"]
max_elemens = self._sys_par["Words"]
mem_bytes_per_element = 2
unpack_string = "<H"
elif mem_type is MemoryType.DWORD:
start_address = self._sys_par["Word_Start"]
max_count = self._sys_par["Words"] / 4
mem_byte_count = 4
mem_start_address = self._sys_par["Word_Start"]
max_elemens = self._sys_par["Words"] / 4
mem_bytes_per_element = 4
unpack_string = "<L"
elif mem_type is MemoryType.STRING:
start_address = self._sys_par["String_Start"]
max_count = self._sys_par["Strings"]
mem_byte_count = self._sys_par["String_Length"]
unpack_string = "{}s".format(mem_byte_count)
mem_start_address = self._sys_par["String_Start"]
max_elemens = self._sys_par["Strings"]
mem_bytes_per_element = self._sys_par["String_Length"]
# unpack_string = "{}s".format(mem_byte_count)
elif mem_type is MemoryType.INPUT_WORD:
start_address = self._sys_par["Input_Word_Start"]
max_count = self._sys_par["Input"]
mem_byte_count = 2
mem_start_address = self._sys_par["Input_Word_Start"]
max_elemens = self._sys_par["Input"]
mem_bytes_per_element = 2
unpack_string = "<H"
elif mem_type is MemoryType.OUTPUT_WORD:
start_address = self._sys_par["Output_Word_Start"]
max_count = self._sys_par["Output_Words"]
mem_byte_count = 2
mem_start_address = self._sys_par["Output_Word_Start"]
max_elemens = self._sys_par["Output_Words"]
mem_bytes_per_element = 2
unpack_string = "<H"
else:
raise Exception("unknown address type")

if count > max_count:
raise Exception("maximum number of values is %d" % max_count)
raise ValueError("unknown address type")

if count > 0xFF:
raise Exception("can't read more than 255 elements at a time")
if number_of_elements > max_elemens:
raise ValueError("maximum number of values is %d" % max_elemens)

plc_values = list()

if mem_type is MemoryType.STRING:
# advance address if necessary
address = address + (count - 1) * mem_byte_count
for i in range(count):
payload = bytearray()
payload.extend(
struct.pack("!L", start_address + address + i * mem_byte_count)

for i in range(number_of_elements):
address = (
mem_start_address
+ first_element * mem_bytes_per_element
+ i * mem_bytes_per_element
)
payload.extend(struct.pack("!B", mem_byte_count))

payload = bytearray()
payload.extend(struct.pack("!L", address))
payload.extend(struct.pack("!B", mem_bytes_per_element))
result = self._send_recive(CMD.R_MB, RSP.S_MB, payload=payload)
if result:
logging.debug("read string %d", address + i * mem_byte_count)
if isinstance(result, (bytearray,)):
logging.debug(
"read string %d with lenght %d",
(first_element + i),
len(result),
)

unpack_string = "{}s".format(len(result))

plc_values.append(
struct.unpack(unpack_string, result)[0]
.rstrip(b"\x00")
.decode("utf8")
.decode("latin1")
)
else:
logging.error(
"failed to read string from address %d",
start_address + address + i * mem_byte_count,
"failed to read string %d from address %d",
(first_element + i),
address,
)
return False
else:
payload = bytearray()
payload.extend(struct.pack("!L", start_address + address))
payload.extend(struct.pack("!B", count * mem_byte_count))
result = self._send_recive(CMD.R_MB, RSP.S_MB, payload=payload)
if result:
logging.debug("read %d value(s) from address %d", count, address)
for i in range(0, len(result), mem_byte_count):
plc_values.append(
struct.unpack(unpack_string, result[i : i + mem_byte_count])[0]
)
else:
logging.error(
"failed to read string from address %d", start_address + address

max_elements_per_transfer = math.floor(255 / mem_bytes_per_element)
num_groups = math.ceil(number_of_elements / max_elements_per_transfer)
logging.debug(
"memory type allows %d elements per telegram, split request into %d group(s)",
max_elements_per_transfer,
num_groups,
)
remaining_elements = number_of_elements

for i in range(num_groups):
if remaining_elements >= max_elements_per_transfer:
elements_in_group = max_elements_per_transfer
remaining_elements -= max_elements_per_transfer
else:
elements_in_group = remaining_elements
address = (
mem_start_address
+ first_element * mem_bytes_per_element
+ i * elements_in_group * mem_bytes_per_element
)
return False
logging.debug(
"current transfer group %d has %d elements", i, elements_in_group
)

# address = mem_start_address + first_element * mem_bytes_per_element

payload = bytearray()
payload.extend(struct.pack("!L", address))
payload.extend(
struct.pack("!B", elements_in_group * mem_bytes_per_element)
)
result = self._send_recive(CMD.R_MB, RSP.S_MB, payload=payload)
if isinstance(result, (bytearray,)):
logging.debug(
"read %d value(s) from address %d",
elements_in_group,
first_element,
)
for i in range(0, len(result), mem_bytes_per_element):
plc_values.append(
struct.unpack(
unpack_string, result[i : i + mem_bytes_per_element]
)[0]
)
else:
logging.error(
"failed to read value from address %d",
mem_start_address + first_element,
)
return False
logging.debug("read a total of %d value(s)", len(plc_values))
if len(plc_values) != number_of_elements:
raise Exception(
"number of recived values %d is not equal to number of requested %d",
len(plc_values),
number_of_elements,
)
return plc_values

def set_keyboard_access(self, unlocked):
Expand Down
67 changes: 66 additions & 1 deletion tests/test_plc_read.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""test to see if plc functions work"""

import pytest
import pyLSV2


Expand Down Expand Up @@ -38,6 +38,71 @@ def test_plc_read(address, timeout):
lsv2.disconnect()


def test_plc_read_marker(address, timeout):
"""test reading of plc markers"""
lsv2 = pyLSV2.LSV2(address, port=19000, timeout=timeout, safe_mode=False)
lsv2.connect()

marker_data_0 = lsv2.read_plc_memory(
address=0, mem_type=pyLSV2.MemoryType.MARKER, count=1
)
assert isinstance(marker_data_0, (list,)) is True
assert (len(marker_data_0) == 1) is True

marker_data_1 = lsv2.read_plc_memory(
address=1, mem_type=pyLSV2.MemoryType.MARKER, count=1
)
assert isinstance(marker_data_1, (list,)) is True
assert (len(marker_data_1) == 1) is True

marker_data = lsv2.read_plc_memory(
address=0, mem_type=pyLSV2.MemoryType.MARKER, count=2
)
assert (len(marker_data) == 2) is True
assert (marker_data[0] == marker_data_0[0]) is True
assert (marker_data[1] == marker_data_1[0]) is True

lsv2.disconnect()


def test_plc_read_string(address, timeout):
"""test reading of plc strings"""
lsv2 = pyLSV2.LSV2(address, port=19000, timeout=timeout, safe_mode=False)
lsv2.connect()

data_0 = lsv2.read_plc_memory(address=0, mem_type=pyLSV2.MemoryType.STRING, count=1)
assert isinstance(data_0, (list,)) is True
assert (len(data_0) == 1) is True

data_1 = lsv2.read_plc_memory(address=1, mem_type=pyLSV2.MemoryType.STRING, count=1)
assert isinstance(data_1, (list,)) is True
assert (len(data_1) == 1) is True

data = lsv2.read_plc_memory(address=0, mem_type=pyLSV2.MemoryType.STRING, count=2)
assert (len(data) == 2) is True
assert (data[0] == data_0[0]) is True
assert (data[1] == data_1[0]) is True

lsv2.disconnect()


def test_plc_read_errors(address, timeout):
"""test error states for reading plc data"""
lsv2 = pyLSV2.LSV2(address, port=19000, timeout=timeout, safe_mode=False)
lsv2.connect()

num_words = lsv2.get_system_parameter()["Words"]

with pytest.raises(ValueError) as exc_info:
data = lsv2.read_plc_memory(
address=0, mem_type=pyLSV2.MemoryType.WORD, count=(num_words + 1)
)
exception_raised = exc_info.value
assert isinstance(exception_raised, (ValueError,)) is True

lsv2.disconnect()


def test_data_path_read(address, timeout):
"""test to see if reading via data path works. run only on iTNC"""
lsv2 = pyLSV2.LSV2(address, port=19000, timeout=timeout, safe_mode=False)
Expand Down

0 comments on commit f63aef2

Please sign in to comment.