-
Notifications
You must be signed in to change notification settings - Fork 3
/
demo.py
84 lines (72 loc) · 2.88 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""Demo"""
import asyncio
import logging
from pyweatherflowudp.client import EVENT_DEVICE_DISCOVERED, WeatherFlowListener
from pyweatherflowudp.const import (
EVENT_RAIN_START,
EVENT_RAPID_WIND,
EVENT_STRIKE,
units,
)
from pyweatherflowudp.device import (
EVENT_LOAD_COMPLETE,
EVENT_OBSERVATION,
EVENT_STATUS_UPDATE,
AirSensorType,
SkySensorType,
TempestDevice,
WeatherFlowDevice,
)
from pyweatherflowudp.errors import ListenerError
from pyweatherflowudp.event import CustomEvent, Event
# Configure the root logger to emit messages at INFO level and above.
logging.basicConfig(level=logging.INFO)
async def main(listen_seconds: float = 60) -> None:
    """Listen for WeatherFlow devices and print the events they emit.

    Opens a ``WeatherFlowListener``, subscribes to its device-discovered
    event, and keeps the listener open for ``listen_seconds`` before leaving
    the ``async with`` block. A ``ListenerError`` raised while entering or
    using the listener is printed instead of being propagated.

    Args:
        listen_seconds: How long to keep the listener open, in seconds.
            Defaults to 60, the duration this demo used when it was
            hard-coded.
    """

    def device_discovered(device: WeatherFlowDevice) -> None:
        """Print a newly discovered device and subscribe to its events."""
        print("Found device:", device)

        def device_event(event: Event) -> None:
            """Print an event and, for observations, some sample readings.

            Closes over ``device`` so the output can name the device that
            the handler was registered on.
            """
            print(event, "from", device)

            # Only observation events get the extra sample output below.
            is_observation = (
                isinstance(event, CustomEvent) and event.name == EVENT_OBSERVATION
            )
            if not is_observation:
                return

            if isinstance(device, AirSensorType):
                # print the air temperature
                print("Air temperature:", device.air_temperature)
                # print the air temperature in °F
                print("Air temperature:", device.air_temperature.to("degF"))
                # also prints the air temperature in °F
                print("Air temperature:", device.air_temperature.to(units.degF))
                # calculate the sea level pressure from a height of 1000 meters
                print(
                    "Sea level pressure:",
                    device.calculate_sea_level_pressure(units.Quantity(1000, "m")),
                )
                # or at 1000 feet
                print(
                    "Sea level pressure:",
                    device.calculate_sea_level_pressure(
                        units.Quantity(1000, units.foot)
                    ),
                )

            if isinstance(device, TempestDevice):
                # print the feels like temperature in °F
                print(
                    "Feels like temperature:",
                    device.feels_like_temperature.to("degF"),
                )

        # Events registered for every device type.
        device.on(EVENT_LOAD_COMPLETE, device_event)
        device.on(EVENT_OBSERVATION, device_event)
        device.on(EVENT_STATUS_UPDATE, device_event)

        # Strike events are only registered for air-sensor devices.
        if isinstance(device, AirSensorType):
            device.on(EVENT_STRIKE, device_event)

        # Rain-start and rapid-wind events are only registered for sky-sensor
        # devices.
        if isinstance(device, SkySensorType):
            device.on(EVENT_RAIN_START, device_event)
            device.on(EVENT_RAPID_WIND, device_event)

    try:
        async with WeatherFlowListener() as client:
            # Pass the handler directly; wrapping it in a lambda that only
            # forwards its single argument adds nothing.
            client.on(EVENT_DEVICE_DISCOVERED, device_discovered)
            # Keep the listener open long enough to receive some events.
            await asyncio.sleep(listen_seconds)
    except ListenerError as ex:
        print(ex)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())