diff --git a/custom_components/weatherlink/api.py b/custom_components/weatherlink/api.py index 25fd0f2..5484179 100644 --- a/custom_components/weatherlink/api.py +++ b/custom_components/weatherlink/api.py @@ -538,18 +538,15 @@ async def current_conditions(self) -> CurrentConditions: def fahrenheit_to_celsius(value: float) -> float: - c = (value - 32) * 5 / 9 - return round(c, 3) + return (value - 32) * 5 / 9 def mph_to_kph(value: float) -> float: - kph = 1.60934 * value - return round(kph, 3) + return 1.609344 * value def in_hg_to_hpa(value: float) -> float: - hpa = 33.86389 * value - return round(hpa, 3) + return 33.86389 * value def json_apply_converters(d: JsonObject, **converters: Callable[[Any], Any]) -> None: diff --git a/custom_components/weatherlink/config_flow.py b/custom_components/weatherlink/config_flow.py index 03e571b..11eae86 100644 --- a/custom_components/weatherlink/config_flow.py +++ b/custom_components/weatherlink/config_flow.py @@ -89,6 +89,7 @@ async def async_step_zeroconf_confirm(self, user_input=None): class OptionsFlow(config_entries.OptionsFlow): config_entry: config_entries.ConfigEntry options: dict + units_config: dict def __init__(self, config_entry: config_entries.ConfigEntry) -> None: super().__init__() @@ -127,13 +128,23 @@ async def async_step_misc(self, user_input=None): async def async_step_units(self, user_input=None): if user_input is not None: - config = UnitConfig.from_unit_of_measurement(user_input) + self.units_config = user_input + return await self.async_step_rounding() + + return self.async_show_form( + step_id="units", + data_schema=get_unit_config(self.hass, self.config_entry).units_schema(), + ) + + async def async_step_rounding(self, user_input=None): + if user_input is not None: + config = UnitConfig.from_config_flow(self.units_config, user_input) self.options["units"] = config.as_dict() return await self.finish() return self.async_show_form( - step_id="units", - data_schema=get_unit_config(self.hass, self.config_entry).data_schema(), + step_id="rounding", + data_schema=get_unit_config(self.hass, self.config_entry).rounding_schema(), ) async def finish(self): diff --git a/custom_components/weatherlink/sensor_common.py b/custom_components/weatherlink/sensor_common.py index 64cf9f8..32d77b2 100644 --- a/custom_components/weatherlink/sensor_common.py +++ b/custom_components/weatherlink/sensor_common.py @@ -4,7 +4,7 @@ from . import WeatherLinkCoordinator, WeatherLinkEntity from .api import ConditionRecord, CurrentConditions -from .units import Units +from .units import Measurement logger = logging.getLogger(__name__) @@ -17,7 +17,7 @@ def __init_subclass__( cls, *, sensor_name: str, - unit_of_measurement: Union[str, Type[Units], None], + unit_of_measurement: Union[str, Type[Measurement], None], device_class: Optional[str], required_conditions: Iterable[Type[ConditionRecord]] = None, **kwargs, @@ -89,7 +89,7 @@ def unit_of_measurement(self): if unit is None or isinstance(unit, str): return unit - return self.units.by_units(unit).unit_of_measurement + return self.units.by_measurement(unit).info.unit_of_measurement @property def device_class(self): diff --git a/custom_components/weatherlink/translations/en.json b/custom_components/weatherlink/translations/en.json index e18745c..70b25fc 100644 --- a/custom_components/weatherlink/translations/en.json +++ b/custom_components/weatherlink/translations/en.json @@ -23,12 +23,26 @@ "step": { "misc": { "title": "Misc", + "description": "(1 / 3)", "data": { "update_interval": "Update interval" } }, "units": { "title": "Units", + "description": "(2 / 3)", + "data": { + "temperature": "Temperature", + "pressure": "Pressure", + "wind_speed": "Wind Speed", + "pm": "Pm", + "rain_rate": "Rain rate", + "rainfall": "Rain volume" + } + }, + "rounding": { + "title": "Rounding", + "description": "(3 / 3)", "data": { "temperature": "Temperature", "pressure": "Pressure", diff --git a/custom_components/weatherlink/units.py b/custom_components/weatherlink/units.py index 78630a2..57c9cce 100644 --- a/custom_components/weatherlink/units.py +++ b/custom_components/weatherlink/units.py @@ -1,124 +1,97 @@ import dataclasses import logging -from typing import Callable, Dict, Optional, Type, Union +from typing import Mapping, Optional, Type, Union import voluptuous as vol from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_UNIT_SYSTEM_IMPERIAL from homeassistant.core import HomeAssistant +from .units_db import ( + Measurement, + Pm, + Pressure, + Rainfall, + RainRate, + Temperature, + UnitInfo, + WindSpeed, +) + logger = logging.getLogger(__name__) -FactorT = Union[Callable[[float], float], float] +IntOrFloat = Union[float, int] -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass() class Unit: - unit_of_measurement: str - ndigits: int - factor: Optional[FactorT] = None - - def __round(self, v: float) -> float: - ndigits = self.ndigits or None - return round(v, ndigits) + info: UnitInfo + ndigits: Optional[int] - def convert(self, v: float) -> float: - if factor := self.factor: - if callable(factor): - v = factor(v) - else: - v *= factor + def __round(self, v: float) -> IntOrFloat: + if self.ndigits is None: + return v + # 0 -> None so it's rounded to an int + return round(v, self.ndigits or None) - return self.__round(v) + def convert(self, v: float) -> IntOrFloat: + return self.__round(self.info.convert(v)) - def convert_optional(self, v: Optional[float]) -> Optional[float]: + def convert_optional(self, v: Optional[float]) -> Optional[IntOrFloat]: if v is None: return v return self.convert(v) - -class Units: - _DATA_SCHEMA: vol.Schema - _UNIT2KEY: Dict[Unit, str] - _DEFAULT_UNIT: Unit - - def __init_subclass__(cls) -> None: - super().__init_subclass__() - - cls._UNIT2KEY = {} - for key in dir(cls): - unit = getattr(cls, key) - if not isinstance(unit, Unit): - continue - - cls._UNIT2KEY[unit] = key - - cls._DATA_SCHEMA = vol.In([unit.unit_of_measurement for unit in cls._UNIT2KEY]) - cls._DEFAULT_UNIT = next(iter(cls._UNIT2KEY)) - @classmethod - def data_schema(cls) -> vol.Schema: - return cls._DATA_SCHEMA + def from_unit_info(cls, info: UnitInfo): + return cls(info=info, ndigits=info.default_ndigits) @classmethod - def default(cls) -> Unit: - return cls._DEFAULT_UNIT + def default(cls, measurement: Type[Measurement]): + return cls.from_unit_info(measurement.default()) @classmethod - def from_unit_of_measurement(cls, unit_of_measurement: str) -> Unit: - try: - return next( - unit - for unit in cls._UNIT2KEY - if unit.unit_of_measurement == unit_of_measurement - ) - except StopIteration: - raise LookupError( - f"{unit_of_measurement!r} not in {cls.__qualname__}" - ) from None + def from_dict(cls, measurement: Type[Measurement], data): + info: UnitInfo + ndigits: Optional[int] + + if isinstance(data, str): + # compatibility for version 0.3.0 + info = getattr(measurement, data) + ndigits = info.default_ndigits + else: + info = getattr(measurement, data["key"]) + ndigits = data["ndigits"] + if ndigits is not None: + ndigits = int(ndigits) + + return cls(info=info, ndigits=ndigits) - @classmethod - def to_key(cls, unit: Unit) -> str: - try: - return cls._UNIT2KEY[unit] - except KeyError: - raise LookupError(f"{unit!r} not in {cls.__qualname__}") from None - - -class Temperature(Units): - CELSIUS = Unit(unit_of_measurement="°C", ndigits=1) - FAHRENHEIT = Unit( - unit_of_measurement="°F", factor=lambda c: (c * 9.0 / 5.0) + 32.0, ndigits=0 - ) - - -class Pressure(Units): - HPA = Unit(unit_of_measurement="hPa", ndigits=0) - PSI = Unit(unit_of_measurement="psi", factor=0.0145, ndigits=2) - IN_HG = Unit(unit_of_measurement="inHg", factor=0.0295, ndigits=2) - MM_HG = Unit(unit_of_measurement="mmHg", factor=0.75, ndigits=0) - - -class WindSpeed(Units): - KMH = Unit(unit_of_measurement="km/h", ndigits=1) - MPS = Unit(unit_of_measurement="m/s", factor=1.0 / 3.6, ndigits=1) - MPH = Unit(unit_of_measurement="mph", factor=0.621, ndigits=1) - KNOTS = Unit(unit_of_measurement="kn", factor=1.0 / 1.852, ndigits=1) - FTPS = Unit(unit_of_measurement="ft/s", factor=0.911, ndigits=1) + def as_dict(self): + return {"key": self.info.key, "ndigits": self.ndigits} -class Pm(Units): - UG_PER_M3 = Unit(unit_of_measurement="µg/m³", ndigits=2) +def ndigits2rounding(ndigits: Optional[int]) -> str: + if ndigits is None: + return "raw" + if ndigits <= 0: + return "1" + return f"0.{1:0{ndigits}}" -class RainRate(Units): - MMH = Unit(unit_of_measurement="mm/h", ndigits=1) - INH = Unit(unit_of_measurement="in/h", factor=1.0 / 25.4, ndigits=2) +def rounding_schema(max_ndigits: int) -> vol.In: + return vol.In( + [ + ndigits2rounding(None), + *(ndigits2rounding(n) for n in range(0, max_ndigits + 1)), + ] + ) -class Rainfall(Units): - MM = Unit(unit_of_measurement="mm", ndigits=1) - IN = Unit(unit_of_measurement="in", factor=1.0 / 25.4, ndigits=2) +def rounding2ndigits(rounding: str) -> Optional[int]: + if rounding == "raw": + return None + return rounding.count("0") @dataclasses.dataclass() @@ -130,16 +103,51 @@ class UnitConfig: rain_rate: Unit rainfall: Unit + def by_measurement(self, measurement: Type[Measurement]) -> Unit: + return getattr(self, _UNIT_CONFIG_MEASUREMENT2KEY[measurement]) + + def units_schema(self) -> vol.Schema: + schema = {} + for key, measurement in _UNIT_CONFIG_KEY2MEASUREMENT.items(): + unit = getattr(self, key) + key_schema = vol.Required( + key, + default=unit.info.unit_of_measurement, + ) + schema[key_schema] = measurement.data_schema() + + return vol.Schema(schema) + + def rounding_schema(self) -> vol.Schema: + schema = {} + for key in _UNIT_CONFIG_KEY2MEASUREMENT: + unit = getattr(self, key) + key_schema = vol.Required( + key, + default=ndigits2rounding(unit.ndigits), + ) + schema[key_schema] = rounding_schema(4) + + return vol.Schema(schema) + @classmethod - def _get_units_cls(cls, key: str) -> Type[Units]: - return _UNIT_CONFIG_KEY2UNITS[key] + def from_config_flow(cls, units: dict, rounding: dict): + kwargs = {} + for key, measurement in _UNIT_CONFIG_KEY2MEASUREMENT.items(): + info = measurement.from_unit_of_measurement(units[key]) + ndigits = rounding2ndigits(rounding[key]) + kwargs[key] = Unit(info=info, ndigits=ndigits) + return cls(**kwargs) - def by_units(self, units: Type[Units]) -> Unit: - return getattr(self, _UNIT_CONFIG_UNITS2KEY[units]) + @classmethod + def _from_unit_infos(cls, **kwargs): + for key, info in kwargs.items(): + kwargs[key] = Unit.from_unit_info(info) + return cls(**kwargs) @classmethod def default_metric(cls): - return cls( + return cls._from_unit_infos( temperature=Temperature.CELSIUS, pressure=Pressure.HPA, wind_speed=WindSpeed.KMH, @@ -150,7 +158,7 @@ def default_metric(cls): @classmethod def default_imperial(cls): - return cls( + return cls._from_unit_infos( temperature=Temperature.FAHRENHEIT, pressure=Pressure.PSI, wind_speed=WindSpeed.MPH, @@ -159,59 +167,29 @@ def default_imperial(cls): rainfall=Rainfall.IN, ) - def data_schema(self) -> vol.Schema: - return vol.Schema( - { - vol.Required( - field.name, default=getattr(self, field.name).unit_of_measurement - ): self._get_units_cls(field.name).data_schema() - for field in _UNIT_CONFIG_FIELDS - } - ) - @classmethod def from_dict(cls, data): kwargs = {} - for field in _UNIT_CONFIG_FIELDS: - key = field.name - units_cls: Type[Units] = cls._get_units_cls(key) + for key, measurement in _UNIT_CONFIG_KEY2MEASUREMENT.items(): try: - value = getattr(units_cls, data[key]) + value = Unit.from_dict(measurement, data[key]) except KeyError: - value = units_cls.default() + value = Unit.default(measurement) except Exception: logger.exception(f"failed to load {key!r} unit: {data!r}") - value = units_cls.default() + value = Unit.default(measurement) kwargs[key] = value return cls(**kwargs) - def as_dict(self): + def as_dict(self) -> dict: data = {} - for field in _UNIT_CONFIG_FIELDS: - key = field.name - units_cls: Type[Units] = self._get_units_cls(key) - data[key] = units_cls.to_key(getattr(self, key)) + for key in _UNIT_CONFIG_KEY2MEASUREMENT: + data[key] = getattr(self, key).as_dict() return data - @classmethod - def from_unit_of_measurement(cls, data): - kwargs = {} - for field in _UNIT_CONFIG_FIELDS: - key = field.name - units_cls: Type[Units] = cls._get_units_cls(key) - kwargs[key] = units_cls.from_unit_of_measurement(data[key]) - return cls(**kwargs) - - def to_unit_of_measurement(self): - return { - field.name: getattr(self, field.name).unit_of_measurement - for field in _UNIT_CONFIG_FIELDS - } - -_UNIT_CONFIG_FIELDS = dataclasses.fields(UnitConfig) -_UNIT_CONFIG_KEY2UNITS = { +_UNIT_CONFIG_KEY2MEASUREMENT: Mapping[str, Type[Measurement]] = { "temperature": Temperature, "pressure": Pressure, "wind_speed": WindSpeed, @@ -219,7 +197,9 @@ def to_unit_of_measurement(self): "rain_rate": RainRate, "rainfall": Rainfall, } -_UNIT_CONFIG_UNITS2KEY = {value: key for key, value in _UNIT_CONFIG_KEY2UNITS.items()} +_UNIT_CONFIG_MEASUREMENT2KEY = { + value: key for key, value in _UNIT_CONFIG_KEY2MEASUREMENT.items() +} def get_unit_config(hass: HomeAssistant, entry: ConfigEntry) -> UnitConfig: diff --git a/custom_components/weatherlink/units_db.py b/custom_components/weatherlink/units_db.py new file mode 100644 index 0000000..e253556 --- /dev/null +++ b/custom_components/weatherlink/units_db.py @@ -0,0 +1,107 @@ +import dataclasses +from typing import Callable, Optional, Tuple, Type, Union + +import voluptuous as vol + +FactorT = Union[Callable[[float], float], float] + + +@dataclasses.dataclass(unsafe_hash=True) +class UnitInfo: + key: str = dataclasses.field(init=False) + measurement: Type["Measurement"] = dataclasses.field(init=False) + + unit_of_measurement: str + default_ndigits: Optional[int] + factor: Optional[FactorT] = None + + def convert(self, v: float) -> float: + if factor := self.factor: + if callable(factor): + v = factor(v) + else: + v *= factor + return v + + +class Measurement: + _UNITS: Tuple[UnitInfo, ...] + _DATA_SCHEMA: vol.In + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + + units = [] + for key in dir(cls): + unit = getattr(cls, key) + if not isinstance(unit, UnitInfo): + continue + + unit.key = key + unit.measurement = cls + + units.append(unit) + + cls._UNITS = tuple(units) + cls._DATA_SCHEMA = vol.In( + tuple(unit.unit_of_measurement for unit in cls._UNITS) + ) + + @classmethod + def default(cls) -> Optional[UnitInfo]: + return next(iter(cls._UNITS), None) + + @classmethod + def data_schema(cls) -> vol.In: + return cls._DATA_SCHEMA + + @classmethod + def from_unit_of_measurement(cls, unit_of_measurement: str): + try: + return next( + unit + for unit in cls._UNITS + if unit.unit_of_measurement == unit_of_measurement + ) + except StopIteration: + raise LookupError( + f"{unit_of_measurement!r} not in {cls.__qualname__}" + ) from None + + +class Temperature(Measurement): + CELSIUS = UnitInfo(unit_of_measurement="°C", default_ndigits=1) + FAHRENHEIT = UnitInfo( + unit_of_measurement="°F", + factor=lambda c: (c * 9.0 / 5.0) + 32.0, + default_ndigits=0, + ) + + +class Pressure(Measurement): + HPA = UnitInfo(unit_of_measurement="hPa", default_ndigits=0) + PSI = UnitInfo(unit_of_measurement="psi", factor=0.0145, default_ndigits=2) + IN_HG = UnitInfo(unit_of_measurement="inHg", factor=0.0295, default_ndigits=2) + MM_HG = UnitInfo(unit_of_measurement="mmHg", factor=0.75, default_ndigits=0) + + +class WindSpeed(Measurement): + KMH = UnitInfo(unit_of_measurement="km/h", default_ndigits=1) + MPS = UnitInfo(unit_of_measurement="m/s", factor=1.0 / 3.6, default_ndigits=1) + MPH = UnitInfo(unit_of_measurement="mph", factor=0.621, default_ndigits=1) + KNOTS = UnitInfo(unit_of_measurement="kn", factor=1.0 / 1.852, default_ndigits=1) + FTPS = UnitInfo(unit_of_measurement="ft/s", factor=0.911, default_ndigits=1) + + +class Pm(Measurement): + UG_PER_M3 = UnitInfo(unit_of_measurement="µg/m³", default_ndigits=2) + + +class RainRate(Measurement): + MMH = UnitInfo(unit_of_measurement="mm/h", default_ndigits=1) + INH = UnitInfo(unit_of_measurement="in/h", factor=1.0 / 25.4, default_ndigits=2) + + +class Rainfall(Measurement): + MM = UnitInfo(unit_of_measurement="mm", default_ndigits=1) + IN = UnitInfo(unit_of_measurement="in", factor=1.0 / 25.4, default_ndigits=2)