-
Notifications
You must be signed in to change notification settings - Fork 6
/
__init__.py
173 lines (138 loc) · 5.61 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""Support for PH-803W."""
from datetime import timedelta
import logging
import threading
import time
import voluptuous as vol
from .lib import device
from .const import DOMAIN
from homeassistant.components import persistent_notification
from homeassistant.const import (
CONF_HOST,
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv, discovery
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
UPDATE_TOPIC = f"{DOMAIN}_update"
ERROR_ITERVAL_MAPPING = [0, 10, 60, 300, 600, 3000, 6000]
ERROR_RECONNECT_INTERVAL = 120
NOTIFICATION_ID = "ph803w_device_notification"
NOTIFICATION_TITLE = "PH-803W Device status"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
}
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistant, base_config: ConfigType) -> bool:
"""Set up waterfurnace platform."""
config = base_config[DOMAIN]
hass.data[DOMAIN] = DeviceData(hass, config)
hass.data[DOMAIN].start()
discovery.load_platform(hass, Platform.SENSOR, DOMAIN, {}, config)
discovery.load_platform(hass, Platform.BINARY_SENSOR, DOMAIN, {}, config)
return True
class DeviceData(threading.Thread):
"""PH-803W Data Collector.
This is implemented as a dedicated thread polling the device as the
device requires ping/pong every 4s. The alternative is to reconnect
for every new data, could work for the pH and ORP data but for the
switches a more direct feedback is wanted."""
def __init__(self, hass, config) -> None:
super().__init__()
self.name = "Ph803wThread"
self.hass = hass
self.host = config[CONF_HOST]
self.device_client = None
self._shutdown = False
self._fails = 0
def connected(self):
return self.device_client is not None
def passcode(self):
if self.device_client is not None:
return self.device_client.passcode
return None
def unique_name(self):
if self.device_client is not None:
return self.device_client.get_unique_name()
return None
def measurement(self):
if self.device_client is not None:
return self.device_client.get_latest_measurement()
return None
def run(self):
"""Thread run loop."""
@callback
def register():
"""Connect to hass for shutdown."""
def shutdown(event):
"""Shutdown the thread."""
_LOGGER.info("Signaled to shutdown")
self._shutdown = True
if self.device_client is not None:
self.device_client.abort()
self.join()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
self.hass.add_job(register)
# This does a tight loop in sending ping/pong to the
# device. That's a blocking call, which returns pretty
# quickly (0.5 second). It's important that we do this
# frequently though, because if we don't call the websocket at
# least every 4 seconds the device side closes the
# connection.
while True:
self.device_client = None
_LOGGER.info(f"Attempting to connect to device at {self.host}")
device_client = device.Device(self.host)
try:
if not device_client.run(once=True):
_LOGGER.info(
f"Device found but no measurement was received, reconnecting in {ERROR_RECONNECT_INTERVAL} seconds")
time.sleep(ERROR_RECONNECT_INTERVAL)
continue
except Exception as e:
_LOGGER.info(
f"Error connecting to device at {self.host}: {str(e)}")
_LOGGER.info(
f"Retrying connection in {ERROR_RECONNECT_INTERVAL} seconds")
time.sleep(ERROR_RECONNECT_INTERVAL)
continue
self.device_client = device_client
_LOGGER.debug("Registering callbacks")
self.device_client.register_callback(self.dispatcher_new_data)
self.device_client.register_callback(self.reset_fail_counter)
_LOGGER.info(f"Connected to {self.host}")
while True:
if self._shutdown:
_LOGGER.debug("Graceful shutdown")
return
try:
_LOGGER.info("Starting device client loop")
self.device_client.run(once=False)
except Exception as e:
_LOGGER.exception(f"Failed to read data: {str(e)}")
self.device_client.close()
self._fails += 1
error_mapping = self._fails
if error_mapping >= len(ERROR_ITERVAL_MAPPING):
error_mapping = len(ERROR_ITERVAL_MAPPING) - 1
sleep_time = ERROR_ITERVAL_MAPPING[error_mapping]
_LOGGER.info(
f"Sleeping {str(sleep_time)}s for failure #{str(self._fails)}")
self.device_client.reset_socket()
time.sleep(sleep_time)
@callback
def reset_fail_counter(self):
self._fails = 0
@callback
def dispatcher_new_data(self):
"""Noyifying HASS that new data is ready to read."""
dispatcher_send(self.hass, UPDATE_TOPIC)