forked from dennyreiter/mqtt-bed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt-bed.py
124 lines (98 loc) · 3.5 KB
/
mqtt-bed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/home/pi/.pyenv/shims/python
import os
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from asyncio_mqtt import Client, MqttError
from controllers.dewertokin import dewertokinBLEController
from controllers.jiecang import jiecangBLEController
from controllers.serta import sertaBLEController
# mqtt-bed default config values. Set these in config.py yourself.
MQTT_USERNAME = "mqttbed"
MQTT_PASSWORD = "mqtt-bed"
MQTT_SERVER = "127.0.0.1"
MQTT_SERVER_PORT = 1883
MQTT_TOPIC = "bed"
MQTT_CHECKIN_TOPIC = "checkIn/bed"
MQTT_CHECKIN_PAYLOAD = "OK"
MQTT_ONLINE_PAYLOAD = "online"
MQTT_QOS = 0
DEBUG = 0
BED_TYPE = "serta"
from config import *
async def bed_loop(ble):
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client(
MQTT_SERVER,
port=MQTT_SERVER_PORT,
username=MQTT_USERNAME,
password=MQTT_PASSWORD,
)
await stack.enter_async_context(client)
# Set up the topic filter
manager = client.filtered_messages(MQTT_TOPIC)
messages = await stack.enter_async_context(manager)
task = asyncio.create_task(bed_command(ble, messages))
tasks.add(task)
# Subscribe to topic(s)
await client.subscribe(MQTT_TOPIC)
# let everyone know we are online
if DEBUG:
print("Going online")
await client.publish(MQTT_CHECKIN_TOPIC, MQTT_ONLINE_PAYLOAD, qos=1)
# let everyone know we are still alive
task = asyncio.create_task(
check_in(client, MQTT_CHECKIN_TOPIC, MQTT_CHECKIN_PAYLOAD)
)
tasks.add(task)
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def check_in(client, topic, payload):
while True:
if DEBUG:
print(f'[topic="{topic}"] Publishing message={payload}')
await client.publish(topic, payload, qos=1)
await asyncio.sleep(300)
async def bed_command(ble, messages):
async for message in messages:
if DEBUG:
template = f'[topic_filter="{MQTT_TOPIC}"] {{}}'
print(template.format(message.payload.decode()))
ble.sendCommand(message.payload.decode())
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def main():
ble_address = os.environ.get("BLE_ADDRESS", BED_ADDRESS)
if ble_address is None:
raise Exception("BLE_ADDRESS env not set")
if BED_TYPE == "serta":
ble = sertaBLEController(ble_address)
elif BED_TYPE == "jiecang":
ble = jiecangBLEController(ble_address)
elif BED_TYPE == "dewertokin":
ble = dewertokinBLEController(ble_address)
else:
raise Exception("Unrecognised bed type: " + str(BED_TYPE))
# Run the bed_loop indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 3 # [seconds]
while True:
try:
await bed_loop(ble)
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
asyncio.run(main())