Skip to content

Commit

Permalink
Enhance helper functions for register value conversions (#397)
Browse files Browse the repository at this point in the history
Introduced `pack_value` and `unpack_value` functions for packing and unpacking register values using the struct std library.

Refactor sensors & test coverage to use the new helpers.
  • Loading branch information
maslyankov authored Jan 22, 2025
1 parent 5d430ef commit a664930
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 51 deletions.
77 changes: 54 additions & 23 deletions src/sunsynk/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import math
from typing import Any
import struct

_LOGGER = logging.getLogger(__name__)

Expand All @@ -12,6 +13,46 @@
NumType = float | int


def pack_value(value: int, bits: int = 16, signed: bool = True) -> RegType:
"""Pack a value into register format.
Args:
value: The value to pack
bits: Number of bits (16 or 32)
signed: Whether the value should be treated as signed
Returns:
For 16-bit: single register value
For 32-bit: tuple of (low, high) register values
"""
if bits == 16:
fmt = 'h' if signed else 'H'
return struct.unpack('H', struct.pack(fmt, value))
if bits == 32:
fmt = 'i' if signed else 'I'
return struct.unpack('2H', struct.pack(fmt, value))
raise ValueError(f"Unsupported number of bits: {bits}")


def unpack_value(regs: RegType, signed: bool = True) -> int:
"""Unpack register value(s) into an integer.
Args:
regs: Register values (1 or 2 registers)
signed: Whether to treat as signed value
Returns:
Unpacked integer value
"""
if len(regs) == 1:
fmt = 'h' if signed else 'H'
return struct.unpack(fmt, struct.pack('H', regs[0]))[0]
if len(regs) == 2:
fmt = 'i' if signed else 'I'
return struct.unpack(fmt, struct.pack('2H', regs[0], regs[1]))[0]
raise ValueError(f"Unsupported number of registers: {len(regs)}")


def ensure_tuple(val: Any) -> tuple[int, ...]:
"""Return a tuple."""
if isinstance(val, tuple):
Expand Down Expand Up @@ -48,21 +89,24 @@ def as_num(val: ValType) -> float | int:
return 0


def signed(val: int | float, bits: int = 16) -> int | float:
"""Convert value to signed int."""
sign_bit = 1 << (bits - 1)
if val < sign_bit:
return val
return val - (1 << bits)
# 16-bit only (old)
# return val if val <= 0x7FFF else val - 0x10000


def slug(name: str) -> str:
"""Create a slug."""
return name.lower().replace(" ", "_").replace("-", "_")


def hex_str(regs: RegType, address: RegType | None = None) -> str:
"""Convert register values to hex strings."""
res = (f"0x{r:04x}" for r in regs)
if address:
res = (f"{k}={v}" for k, v in zip(address, res, strict=True))
return f"{{{' '.join(res)}}}"


def patch_bitmask(value: int, patch: int, bitmask: int) -> int:
"""Combine bitmask values."""
return (patch & bitmask) + (value & (0xFFFF - bitmask))


class SSTime:
"""Deals with inverter time format conversion complexities."""

Expand Down Expand Up @@ -112,16 +156,3 @@ def str_value(self, value: str) -> None:
self.minutes = int(hours) * 60 + int(minutes)
except ValueError:
_LOGGER.warning("Invalid time string: %s (expected hh:mm)", value)


def patch_bitmask(value: int, patch: int, bitmask: int) -> int:
"""Combine bitmask values."""
return (patch & bitmask) + (value & (0xFFFF - bitmask))


def hex_str(regs: RegType, address: RegType | None = None) -> str:
"""Convert register values to hex strings."""
res = (f"0x{r:04x}" for r in regs)
if address:
res = (f"{k}={v}" for k, v in zip(address, res, strict=True))
return f"{{{' '.join(res)}}}"
30 changes: 21 additions & 9 deletions src/sunsynk/rwsensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@
import attrs
from mqtt_entity.utils import BOOL_OFF, BOOL_ON

from sunsynk.helpers import NumType, RegType, SSTime, ValType, as_num, hex_str
from sunsynk.helpers import (
NumType,
RegType,
SSTime,
ValType,
as_num,
hex_str,
pack_value,
)
from sunsynk.sensors import Sensor

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,18 +76,14 @@ def dependencies(self) -> list[Sensor]:
return [s for s in (self.min, self.max) if isinstance(s, Sensor)]

def value_to_reg(self, value: ValType, resolve: ResolveType) -> RegType:
"""Get the reg value from a display value, or the current reg value if out of range."""
"""Get the reg value from a display value."""
if not self.address:
raise NotImplementedError("Cannot write to a sensor with no address")
fval = float(value) # type:ignore
minv = resolve_num(resolve, self.min, 0)
maxv = resolve_num(resolve, self.max, 100)
val = int(max(minv, min(maxv, fval)) / abs(self.factor))
if len(self.address) == 1:
if val < 0:
val = 0x10000 + val
return self.reg(val)
if len(self.address) == 2:
return self.reg(val & 0xFFFF, int(val >> 16))
raise NotImplementedError(f"Address length not supported: {self.address}")
return self.reg(*pack_value(val, bits=len(self.address)*16, signed=self.factor < 0))


@attrs.define(slots=True, eq=False)
Expand Down Expand Up @@ -178,6 +182,12 @@ def value_to_reg(self, value: ValType, resolve: ResolveType) -> RegType:
class SystemTimeRWSensor(RWSensor):
"""Read & write time sensor."""

def __attrs_post_init__(self) -> None:
"""Run post init."""
super().__attrs_post_init__()
if len(self.address) != 3:
raise ValueError("SystemTimeRWSensor requires exactly 3 registers")

def value_to_reg(self, value: ValType, resolve: ResolveType) -> RegType:
"""Get the reg value from a display value."""
# pylint: disable=invalid-name
Expand Down Expand Up @@ -243,6 +253,8 @@ def reg_to_value(self, regs: RegType) -> ValType:

def value_to_reg(self, value: ValType, resolve: ResolveType) -> RegType:
"""Get the reg value from a display value."""
if not self.address:
raise NotImplementedError("Cannot write to a sensor with no address")
return self.reg(SSTime(string=str(value)).reg_value)

@staticmethod
Expand Down
14 changes: 5 additions & 9 deletions src/sunsynk/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import attrs

from sunsynk.helpers import (
NumType,
RegType,
ValType,
NumType,
ensure_tuple,
int_round,
signed,
slug,
unpack_value,
)

_LOGGER = logging.getLogger(__name__)
Expand All @@ -38,12 +38,8 @@ def id(self) -> str:
def reg_to_value(self, regs: RegType) -> ValType:
"""Return the value from the registers."""
regs = self.masked(regs)
val: NumType = regs[0]
if len(regs) > 1:
val += regs[1] << 16
if self.factor < 0: # Indicates this register is signed
val = signed(val, bits=16 * len(regs))
val = int_round(val * abs(self.factor))
val: NumType = unpack_value(regs, signed=self.factor < 0)
val = int_round(float(val) * abs(self.factor))
_LOGGER.debug("%s=%s%s %s", self.id, val, self.unit, regs)
return val

Expand Down Expand Up @@ -141,7 +137,7 @@ class MathSensor(Sensor):

def reg_to_value(self, regs: RegType) -> ValType:
"""Calculate the math value."""
val = int_round(sum(signed(i) * s for i, s in zip(regs, self.factors)))
val = int_round(sum(unpack_value((i,), signed=True) * s for i, s in zip(regs, self.factors)))
if self.absolute and val < 0:
val = -val
if self.no_negative and val < 0:
Expand Down
3 changes: 2 additions & 1 deletion src/sunsynk/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import attr

from sunsynk.rwsensors import RWSensor
from sunsynk.sensors import BinarySensor, NumType, Sensor, ValType
from sunsynk.sensors import BinarySensor, Sensor, ValType
from sunsynk.helpers import NumType

_LOGGER = logging.getLogger(__name__)

Expand Down
51 changes: 42 additions & 9 deletions src/tests/sunsynk/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Test helpers."""

import struct

import pytest

from sunsynk.helpers import (
Expand All @@ -8,13 +10,15 @@
ensure_tuple,
hex_str,
int_round,
pack_value,
patch_bitmask,
signed,
unpack_value,
)
from sunsynk.sensors import Sensor


def test_as_num(caplog: pytest.LogCaptureFixture) -> None:
"""Test as_num function."""
assert as_num(None) == 0
assert as_num(1.0) == 1.0
assert as_num(1) == 1
Expand All @@ -26,30 +30,36 @@ def test_as_num(caplog: pytest.LogCaptureFixture) -> None:


def test_ensure_tuple() -> None:
"""Test ensure tuple."""
assert ensure_tuple(1) == (1,)
assert ensure_tuple((1,)) == (1,)
assert ensure_tuple((1, 5)) == (1, 5)
assert ensure_tuple("a") == ("a",)


def test_int_round() -> None:
"""Test int round."""
res1 = int_round(1.0)
assert isinstance(res1, int)
assert res1 == 1


def test_signed() -> None:
assert signed(0x7FFF) == 0x7FFF
assert signed(0xFFFF) == -1
assert signed(0) == 0
assert signed(32767) == 32767
assert signed(32768) == -32768
"""Test signed value conversion."""
assert unpack_value((0x7FFF,), signed=True) == 0x7FFF
assert unpack_value((0xFFFF,), signed=True) == -1
assert unpack_value((0,), signed=True) == 0
assert unpack_value((32767,), signed=True) == 32767
assert unpack_value((32768,), signed=True) == -32768


def test_signed32bits() -> None:
assert signed(0x7FFFFFFF, bits=32) == 0x7FFFFFFF
assert signed(0xFFFFFFFF, bits=32) == -1
assert signed(0x80000000, bits=32) == 0x80000000 - (1 << 32)
"""Test 32-bit signed value conversion."""
assert unpack_value((0xFFFF, 0x7FFF), signed=True) == 0x7FFFFFFF
assert unpack_value((0xFFFF, 0xFFFF), signed=True) == -1
assert unpack_value((0x0000, 0x8000), signed=True) == 0x80000000 - (
1 << 32
) # -2147483648


def test_signeds() -> None:
Expand All @@ -66,6 +76,7 @@ def test_signeds() -> None:


def test_time() -> None:
"""Test SSTime class."""
time = SSTime(minutes=10)
assert time.str_value == "0:10"
assert time.reg_value == 10
Expand Down Expand Up @@ -96,7 +107,29 @@ def test_time() -> None:
assert time.minutes == just_before_midnight


def test_pack_unpack() -> None:
"""Test pack_value and unpack_value functions."""
# Test 16-bit values
assert pack_value(-1, bits=16, signed=True) == (0xFFFF,)
assert pack_value(32767, bits=16, signed=True) == (0x7FFF,)
assert pack_value(65535, bits=16, signed=False) == (0xFFFF,)
# Test 32-bit values
assert pack_value(-1, bits=32, signed=True) == (0xFFFF, 0xFFFF)
assert pack_value(0x7FFFFFFF, bits=32, signed=True) == (0xFFFF, 0x7FFF)
# Test round-trip
val = -12345
assert unpack_value(pack_value(val, bits=16, signed=True), signed=True) == val
assert unpack_value(pack_value(val, bits=32, signed=True), signed=True) == val

# Test error cases
with pytest.raises(struct.error):
pack_value(-1, bits=16, signed=False)
with pytest.raises(ValueError):
pack_value(1, bits=8) # Invalid bit length


def test_patch_bitmask() -> None:
"""Test patch_bitmask function."""
assert patch_bitmask(2, 1, 1) == 3
assert patch_bitmask(1, 2, 2) == 3

Expand Down
1 change: 1 addition & 0 deletions src/tests/sunsynk/test_rwsensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def test_number_rw(state: InverterState) -> None:

# writing negative values (when allowed by min)
s.min = -10
s.factor = -1 # indicate signed values
assert s.value_to_reg(-1, state.get) == (65535,)

s = NumberRWSensor(1, "s2", factor=0.01)
Expand Down

0 comments on commit a664930

Please sign in to comment.