Skip to content

Commit

Permalink
Expand data quality checks
Browse files Browse the repository at this point in the history
  • Loading branch information
cdpuk committed Jan 28, 2024
1 parent f5b03ff commit ba65e90
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ repos:
- --keep-updates
files: ^(custom_components|tests|script)/.+\.py$
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.6.1
rev: v1.8.0
hooks:
- id: mypy
args:
Expand Down
149 changes: 116 additions & 33 deletions custom_components/givenergy_local/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""The GivEnergy update coordinator."""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from logging import getLogger

Expand All @@ -14,10 +15,51 @@

_LOGGER = getLogger(__name__)
# Force a full register refresh at least this often; in between, partial
# refreshes are allowed (see the update coordinator below).
_FULL_REFRESH_INTERVAL = timedelta(minutes=5)
# Coordinator-level attempts to obtain data that passes the quality checks
# before the update is reported as failed.
_REFRESH_ATTEMPTS = 3
# Timeout (seconds) and retry count for individual inverter commands —
# presumably passed to command execution; not used in the visible chunk,
# TODO confirm against the rest of the module.
_COMMAND_TIMEOUT = 3.0
_COMMAND_RETRIES = 3


@dataclass
class QualityCheck:
    """Describes the plausible value range for a named inverter attribute.

    Values outside the range are treated as bad data by the coordinator's
    quality checks.
    """

    attr_name: str
    min: float | None
    max: float | None
    min_inclusive: bool = True
    max_inclusive: bool = True

    @property
    def range_description(self) -> str:
        """Render the accepted range in mathematical interval notation.

        Square brackets denote an inclusive bound; round brackets denote an
        exclusive bound, e.g. ``(0, 100]``.
        """
        opening = "[" if self.min_inclusive else "("
        closing = "]" if self.max_inclusive else ")"
        return f"{opening}{self.min}, {self.max}{closing}"


# Short alias to keep the table below compact.
QC = QualityCheck
# Plausible-range checks applied to refreshed inverter data; a value outside
# its range causes the whole sample to be discarded and fetched again.
_INVERTER_QUALITY_CHECKS = [
    QC("temp_inverter_heatsink", -10, 100),
    QC("temp_charger", -10, 100),
    QC("temp_battery", -10, 100),
    # Lifetime energy totals: zero is excluded because a commissioned system
    # has always produced/consumed something; bad reads often report 0.
    QC("e_inverter_out_total", 0, 1e6, min_inclusive=False),  # 1GWh
    QC("e_grid_in_total", 0, 1e6, min_inclusive=False),  # 1GWh
    QC("e_grid_out_total", 0, 1e6, min_inclusive=False),  # 1GWh
    QC("battery_percent", 0, 100),
    QC("p_eps_backup", -15e3, 15e3),  # +/- 15kW
    QC("p_grid_out", -1e6, 15e3),  # 15kW export, 1MW import
    QC("p_battery", -15e3, 15e3),  # +/- 15kW
]


class GivEnergyUpdateCoordinator(DataUpdateCoordinator[Plant]):
"""Update coordinator that fetches data from a GivEnergy inverter."""

Expand All @@ -37,6 +79,7 @@ def __init__(self, hass: HomeAssistant, host: str) -> None:

async def async_shutdown(self) -> None:
    """Terminate the modbus connection and shut down the coordinator."""
    _LOGGER.debug("Shutting down")
    # Close the inverter client first so the link is released before the
    # base coordinator tears down its scheduled refreshes.
    await self.client.close()
    await super().async_shutdown()

Expand All @@ -49,42 +92,82 @@ async def _async_update_data(self) -> Plant:
if self.last_full_refresh < (datetime.utcnow() - _FULL_REFRESH_INTERVAL):
self.require_full_refresh = True

# Allow a few attempts to pull back valid data.
# Within the inverter comms, there are further retries to ensure >some< data is returned
# to the coordinator, but sometimes we still get bad values. When that data arrives back
# here, we perform some quality checks and trigger another attempt if something doesn't
# look right. If all that fails, then data will show as 'unavailable' in the UI.
attempt = 1
while attempt <= _REFRESH_ATTEMPTS:
try:
async with async_timeout.timeout(10):
_LOGGER.info(
"Fetching data from %s (attempt=%d/%d, full_refresh=%s)",
self.host,
attempt,
_REFRESH_ATTEMPTS,
self.require_full_refresh,
)
plant = await self.client.refresh_plant(
full_refresh=self.require_full_refresh, retries=2
)
except Exception as err:
await self.client.close()
raise UpdateFailed(f"Error communicating with inverter: {err}") from err

if not self._is_data_valid(plant):
attempt += 1
continue

if self.require_full_refresh:
self.require_full_refresh = False
self.last_full_refresh = datetime.utcnow()
return plant

raise UpdateFailed(
f"Failed to obtain valid data after {_REFRESH_ATTEMPTS} attempts"
)

@staticmethod
def _is_data_valid(plant: Plant) -> bool:
"""Perform checks to ensure returned data actually makes sense.
The connection sometimes returns what it claims is valid data, but many of the values
are zero (or other highly improbable values). This is particularly painful when values
are used in the energy dashboard, as the dashboard double counts everything up to the
point in the day when the figures go back to normal.
"""
try:
async with async_timeout.timeout(10):
_LOGGER.info(
"Fetching data from %s (full refresh=%s)",
self.host,
self.require_full_refresh,
inverter_data = plant.inverter
_ = plant.batteries
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Inverter model failed validation: %s", err)
return False

for check in _INVERTER_QUALITY_CHECKS:
value = inverter_data.dict().get(check.attr_name)
too_low = False
too_high = False

if min_val := check.min:
too_low = not (
value > min_val or (check.min_inclusive and value >= min_val)
)
plant = await self.client.refresh_plant(
full_refresh=self.require_full_refresh, retries=2
if max_val := check.max:
too_high = not (
value < max_val or (check.max_inclusive and value <= max_val)
)
except Exception as err:
_LOGGER.error("Disconnecting from inverter due to unexpected refresh error")
await self.client.close()
raise UpdateFailed(f"Error communicating with inverter: {err}") from err

# The connection sometimes returns what it claims is valid data, but many of the values
# are zero. This is particularly painful when values are used in the energy dashboard,
# as the dashboard double counts everything up to the point in the day when the figures
# go back to normal. Work around this by detecting some extremely unlikely zero values.
inverter_data = plant.inverter

# The heatsink and charger temperatures never seem to go below around 10 celsius, even
# when idle and temperatures well below zero for an outdoor installation.
heatsink_temp = inverter_data.temp_inverter_heatsink
charger_temp = inverter_data.temp_charger
if heatsink_temp == 0 or charger_temp == 0:
raise UpdateFailed("Data discarded: improbable zero temperature")

# Total inverter output would only ever be zero prior to commissioning.
if inverter_data.e_inverter_out_total <= 0:
raise UpdateFailed("Data discarded: inverter total output <= 0")

if self.require_full_refresh:
self.require_full_refresh = False
self.last_full_refresh = datetime.utcnow()
return plant

if too_low or too_high:
_LOGGER.warning(
"Data discarded: %s value of %s is out of range %s",
check.attr_name,
value,
check.range_description,
)
return False

return True

async def execute(self, requests: list[TransparentRequest]) -> None:
"""Execute a set of requests and force an update to read any new values."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,15 @@ async def connect(self) -> None:
)
# asyncio.create_task(self._task_dump_queues_to_files(), name='dump_queues_to_files'),
self.connected = True
_logger.info(f"Connection established to %s:%d", self.host, self.port)
_logger.info("Connection established to %s:%d", self.host, self.port)

async def close(self) -> None:
"""Disconnect from the remote host and clean up tasks and queues."""
if not self.connected:
return

_logger.debug("Disconnecting and cleaning up")

self.connected = False

if self.tx_queue:
Expand Down Expand Up @@ -189,7 +194,7 @@ async def _task_network_consumer(self):
# except RegisterCacheUpdateFailed as e:
# # await self.debug_frames['error'].put(frame)
# _logger.debug(f'Ignoring {message}: {e}')
_logger.error(
_logger.debug(
"network_consumer reader at EOF, cannot continue, closing connection"
)
await self.close()
Expand All @@ -204,7 +209,7 @@ async def _task_network_producer(self, tx_message_wait: float = 0.25):
if future:
future.set_result(True)
await asyncio.sleep(tx_message_wait)
_logger.error(
_logger.debug(
"network_producer writer is closing, cannot continue, closing connection"
)
await self.close()
Expand Down Expand Up @@ -271,6 +276,8 @@ async def send_request_and_await_response(
frame_sent, timeout=self.tx_queue.qsize() + 1
) # this should only happen if the producer task is stuck

_logger.debug("Request sent (attempt %d): %s", tries, request)

try:
await asyncio.wait_for(response_future, timeout=timeout)
if response_future.done():
Expand All @@ -286,9 +293,8 @@ async def send_request_and_await_response(

if tries <= retries:
_logger.debug(
"Timeout awaiting %s (future: %s), attempting retry %d of %d",
"Timeout awaiting %s, attempting retry %d of %d",
expected_response,
response_future,
tries,
retries,
)
Expand Down

0 comments on commit ba65e90

Please sign in to comment.