-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCommunication.py
141 lines (114 loc) · 5.78 KB
/
Communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Wrapper around paho mqtt using configparser for an easy configurable communication
# Author: Daniel Habering <daniel@habering.de>
import configparser
import json
from paho.mqtt import client as mqtt_client
class Comm:
    """Small JSON-over-MQTT helper built on a paho client and an INI config file.

    The config file must provide a [Broker] section with the fields ``ip``,
    ``port``, ``username`` and ``password`` and a [Topics] section that maps
    topic ids to MQTT topic strings. Callers address topics by id; payloads
    are JSON documents.

    Attributes:
        client: The underlying paho client, created in ``__init__``.
        connected: True while the broker has accepted the connection.
        topics: Mapping of topic id -> MQTT topic string, read from [Topics].
        subscriptions: Mapping of MQTT topic string -> dict with the keys
            "callback", "mid" and "subscribed".
    """

    # Immutable class-level defaults. The two dicts (topics, subscriptions)
    # are deliberately NOT defined here: a class-level dict would be shared
    # by every Comm instance, so they are created per instance in __init__.
    client = None
    connected = False

    def is_connected(self):
        """Return the current connection status to the broker."""
        return self.connected

    def on_connect(self, client, userdata, flags, rc):
        """paho callback: the broker answered the connection request.

        The callback also fires when the broker refuses the connection
        (non-zero ``rc``), so the flag is only set for ``rc == 0``.
        """
        self.connected = (rc == 0)
        if not self.connected:
            print("Connection to broker failed. Errorcode: " + str(rc))

    def on_disconnect(self, client, userdata, rc=None):
        """paho callback: the connection to the broker has ended.

        Resets the connected flag and reports unexpected disconnects.
        """
        if rc is None:
            # Legacy call form on_disconnect(userdata, rc) from before the
            # missing ``client`` parameter was added: shift the arguments.
            rc = userdata
        self.connected = False
        # rc == 0 is a disconnect requested by the client itself, not a failure
        if rc != 0:
            print("Connection to broker failed. Errorcode: " + str(rc))

    def on_message(self, client, userdata, msg):
        """paho callback: dispatch a received message to its stored callback.

        Messages on topics without a stored subscription are ignored.
        Payloads that are not valid UTF-8 encoded JSON are reported and dropped.
        """
        subscription = self.subscriptions.get(msg.topic)
        if subscription is None:
            return
        # Payload should be a valid JSON string; both the decode and the parse
        # step can fail on garbage input and must not escape into paho's thread
        try:
            msg_dict = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as decode_error:
            print("Received invalid JSON string: " + str(msg.payload) + " with error " + str(decode_error))
            return
        # Pass parsed payload to the stored callback function
        subscription["callback"](msg_dict)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """paho callback: mark the pending subscription with this ``mid`` as active."""
        for subscription in self.subscriptions.values():
            if subscription["mid"] == mid:
                subscription["subscribed"] = True

    def _resolve_topic(self, topicId):
        """Return the MQTT topic string configured for ``topicId``.

        Raises:
            Exception: If the id is not listed under [Topics].
        """
        if topicId in self.topics:
            return self.topics[topicId]
        # configparser lower-cases option names by default, so ids loaded from
        # the config file are lower-case; accept any spelling of the id.
        folded = str(topicId).lower()
        if folded in self.topics:
            return self.topics[folded]
        raise Exception("Topic " + str(topicId) + " can not be found under [Topics] in the provided config file")

    def subscribe(self, topicId, callback):
        """Subscribe to a configured topic.

        After a successful subscription, ``callback`` is called with the
        parsed JSON payload of every message received on that topic.

        Args:
            topicId: Id of an entry in the [Topics] section of the config.
            callback: Callable taking the parsed JSON payload.

        Raises:
            Exception: If the topic id is unknown, the client is not
                connected, or the subscribe request is rejected locally.
        """
        topic_string = self._resolve_topic(topicId)
        if not self.connected:
            raise Exception("MQTT is not connected to broker, failed to subscribe to " + str(topic_string))
        result, mid = self.client.subscribe(topic_string)
        if result != mqtt_client.MQTT_ERR_SUCCESS:
            raise Exception("Failed to subscribe to " + str(topic_string) + " with error " + str(result))
        # Store callback and mid; on_subscribe flips "subscribed" once the
        # broker acknowledges the request with the same mid
        self.subscriptions[topic_string] = {"callback": callback, "mid": mid, "subscribed": False}

    def publish(self, topicId, msg):
        """Publish ``msg`` on a configured topic.

        Args:
            topicId: Id of an entry in the [Topics] section of the config.
            msg: A dict (serialized to JSON) or a str holding valid JSON.
                A str that is not valid JSON is reported and not published.

        Raises:
            Exception: If the topic id is unknown, the client is not
                connected, or ``msg`` is neither dict nor str.
        """
        topic_string = self._resolve_topic(topicId)
        if not self.connected:
            raise Exception("MQTT is not connected to broker, failed to publish to topic")
        if isinstance(msg, dict):
            msg_string = json.dumps(msg)
        elif isinstance(msg, str):
            # Only forward strings that parse as JSON
            try:
                json.loads(msg)
            except ValueError as error:
                print("Can only publish valid json strings: " + str(error))
                return
            msg_string = msg
        else:
            raise Exception("Comm.publish only takes dict or str")
        self.client.publish(topic_string, msg_string)

    def readBrokerConfigField(self, config, field):
        """Return the non-empty value of ``field`` from the [Broker] section.

        Raises:
            Exception: If the section or field is missing or the value is empty.
        """
        # fallback=None covers a missing section as well as a missing field,
        # so both end up in the descriptive error below instead of a KeyError
        result = config.get("Broker", field, fallback=None)
        if not result:
            raise Exception("Config file needs to contain a " + str(field) + " field under section [Broker]")
        return result

    def __init__(self, client_id, configfile = "cfg/comm.cfg"):
        """Read the config file, create the paho client and start connecting.

        The connected flag is set from the on_connect callback, so
        ``is_connected()`` may still be False right after construction.

        Args:
            client_id: Client id handed to the paho client.
            configfile: Path of the INI file with [Broker] and [Topics].

        Raises:
            Exception: If the config file is unreadable or incomplete, or
                the broker refuses the TCP connection.
        """
        # Per-instance state, so separate Comm objects never share entries
        self.topics = {}
        self.subscriptions = {}

        config = configparser.ConfigParser()
        # read() silently skips files it cannot open and returns the parsed ones
        if not config.read(configfile):
            raise Exception("Could not read config file " + str(configfile))

        broker_ip = str(self.readBrokerConfigField(config, "ip"))
        broker_port = int(self.readBrokerConfigField(config, "port"))
        broker_username = str(self.readBrokerConfigField(config, "username"))
        broker_pw = str(self.readBrokerConfigField(config, "password"))

        if "Topics" not in config:
            raise Exception("Config file needs collection of topics under a section called [Topics]")
        self.topics = dict(config["Topics"])

        self.client = mqtt_client.Client(client_id)
        self.client.username_pw_set(broker_username, broker_pw)
        self.client.on_connect = self.on_connect
        # Needed so that the connected flag is reset when the connection drops
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_subscribe = self.on_subscribe

        try:
            self.client.connect(broker_ip, broker_port)
        except ConnectionRefusedError as err:
            raise Exception("Failed to connect to broker. Check that broker is running under " + str(broker_ip) + ":" + str(broker_port) + " and that username and pw are correct") from err
        self.client.loop_start()