From 488a72c552ed88d30a9efa6f7168b4f1c1b53716 Mon Sep 17 00:00:00 2001 From: ppescher Date: Fri, 25 Oct 2019 11:52:19 +0200 Subject: [PATCH] Update polaris and cloud libraries --- fortebit/iot/http_client.py | 15 +++++++++- fortebit/iot/iot.py | 32 ++++++++++----------- fortebit/iot/mqtt_client.py | 41 ++++++++++++++++++++++----- fortebit/polaris/cloud.py | 55 ++++++++++++++++++++++++------------- fortebit/polaris/polaris.py | 4 +-- 5 files changed, 102 insertions(+), 45 deletions(-) diff --git a/fortebit/iot/http_client.py b/fortebit/iot/http_client.py index 6186b87..394e7db 100644 --- a/fortebit/iot/http_client.py +++ b/fortebit/iot/http_client.py @@ -131,7 +131,7 @@ def _get_rpc_request(self, timeout=20000): self.connected = False return None - def publish_rpc_reply(self, id, result): + def send_rpc_reply(self, id, result): try: res = requests.post(self._rpc_url(id), json=result, ctx=self.ctx) except Exception as e: @@ -140,3 +140,16 @@ def publish_rpc_reply(self, id, result): if res.status != 200: return False return True + + def do_rpc_request(self, method, params=None, timeout=20000): + print_d("rpc request:", method, params) + try: + obj = { "method":method, "params":params } + res = requests.post(self._rpc_url(), json=obj, ctx=self.ctx) + if res.status != 200: + return None + obj = json.loads(res.content) + except Exception as e: + print_d(e) + return None + return obj diff --git a/fortebit/iot/iot.py b/fortebit/iot/iot.py index 973c60e..cde6b2d 100644 --- a/fortebit/iot/iot.py +++ b/fortebit/iot/iot.py @@ -56,21 +56,10 @@ def connect(self, retry=7): """ .. method:: connect() - Setup a connection to the Fortebit Cloud. It can raise an exception in case of error. - - """ - if self.client.connect(): - return True - t = 3000 - while (retry > 0): - sleep(t) - if self.client.connect(): - return True - # increase (double) the delay between attempts (up to 1 minute) - if t < 30000: - t *= 2 - retry -= 1 - return False + Setup a connection to the Fortebit Cloud. Return *True* if successful. + + """ + return self.client.connect() def is_connected(self): """ @@ -85,7 +74,7 @@ def run(self): """ .. method:: run() - Starts the device by executing the underlying client. It can start a new thread depending on the type of client (Mqtt vs Http) + Starts the device by executing the underlying client loop. """ self.client.loop() @@ -129,3 +118,14 @@ def send_rpc_reply(self, id, result): Return a boolean, *False* if the message cannot be sent. """ return self.client.send_rpc_reply(id, result) + + def do_rpc_request(self, method, params=None, timeout=15000): + """ +.. method:: do_rpc_request(method, params, timeout) + + Perform an RPC request with name :samp:`method` and arguments :samp:`params`, waiting for + a reply maximum :samp:`timeout` milliseconds (only with MqttClient). + + Return the result of the RPC (dictionary), *None* in case of errors. + """ + return self.client.do_rpc_request(method, params, timeout) diff --git a/fortebit/iot/mqtt_client.py b/fortebit/iot/mqtt_client.py index a90c25e..53f2d44 100644 --- a/fortebit/iot/mqtt_client.py +++ b/fortebit/iot/mqtt_client.py @@ -17,12 +17,14 @@ class MqttClient(): def __init__(self, endpoint, device_token, ctx=None): self.endpoint = endpoint self.ctx = ctx - self.request_id = -1 - self.rpc_id = 10000 self.driver = mqtt.Client("polaris", True) self.driver.set_username_pw(device_token) + self.attr_id = -1 self.attr_ev = threading.Event() self.attr_obj = None + self.rpc_id = 10000 + self.rpc_ev = threading.Event() + self.rpc_obj = None def _loop_failure(self, client): while True: @@ -39,6 +41,8 @@ def _subscribe_cb(self, client): # subscribe to attributes client.subscribe("v1/devices/me/attributes/response/+", self._on_attributes, 1) client.subscribe("v1/devices/me/rpc/request/+", self._on_rpc_request, 1) + client.subscribe("v1/devices/me/rpc/response/+", self._on_rpc_response, 1) + def connect(self): port = 1883 if self.ctx is None else 8883 @@ -60,7 +64,7 @@ def _on_attributes(self, client, payload, topic): print_d("> got:", topic, payload) # len("v1/devices/me/attributes/response/") = 34 id = int(topic[34:]) - if self.request_id == id: + if self.attr_id == id: self.attr_obj = json.loads(payload) self.attr_ev.set() @@ -75,7 +79,7 @@ def _on_rpc_request(self, client, payload, topic): self.on_rpc_request(id, obj['method'], obj['params'] if 'params' in obj else None) def get_attributes(self, client, shared=None, timeout=10000): - self.request_id += 1 + self.attr_id += 1 self.attr_ev.clear() obj = {} if client and isinstance(client, PSTRING): @@ -87,7 +91,7 @@ def get_attributes(self, client, shared=None, timeout=10000): else: shared = None obj = json.dumps(obj) - ep = 'v1/devices/me/attributes/request/' + str(self.request_id) + ep = 'v1/devices/me/attributes/request/' + str(self.attr_id) print_d("publish:", ep, obj) try: self.driver.publish(ep, obj, qos=1) @@ -121,7 +125,7 @@ def publish_telemetry(self, values, ts=None): if ts is not None: values = {'values': values, 'ts': ts} try: - self.driver.publish("v1/devices/me/telemetry", json.dumps(values), qos=1) + self.driver.publish('v1/devices/me/telemetry', json.dumps(values), qos=1) except Exception as e: print_d(e) return False @@ -130,8 +134,31 @@ def publish_telemetry(self, values, ts=None): def send_rpc_reply(self, id, result): print_d("rpc reply", id, result) try: - self.driver.publish("v1/devices/me/rpc/response/" + str(id), json.dumps(result), qos=1) + self.driver.publish('v1/devices/me/rpc/response/' + str(id), json.dumps(result), qos=1) except Exception as e: print_d(e) return False return True + + def _on_rpc_response(self, client, payload, topic): + print_d("> got:", topic, payload) + # len("v1/devices/me/rpc/response/") = 27 + id = int(topic[27:]) + if self.rpc_id == id: + self.rpc_obj = json.loads(payload) + self.rpc_ev.set() + + def do_rpc_request(self, method, params=None, timeout=15000): + self.rpc_id += 1 + self.rpc_ev.clear() + print_d("rpc request:", self.rpc_id, method, params) + try: + msg = { 'method': method, 'params': params } + self.driver.publish('v1/devices/me/rpc/request/' + str(self.rpc_id), json.dumps(msg), qos=1) + obj = None + self.rpc_ev.wait(timeout) + obj = self.rpc_obj + except Exception as e: + print_d(e) + return None + return obj diff --git a/fortebit/polaris/cloud.py b/fortebit/polaris/cloud.py index 12a9f57..49ebd2a 100644 --- a/fortebit/polaris/cloud.py +++ b/fortebit/polaris/cloud.py @@ -63,29 +63,46 @@ def isRegistered(device, email): return False -def register(device, email, imei, ssl_ctx=None): +def register(device, email): """ -.. function:: register(device, email, imei, ssl_ctx) +.. function:: register(device, email) - Generates the board's own access token for Fortebit IoT cloud services. + Perform device registration to Fortebit IoT cloud services. :param device: a connected instance of :any:`fortebit.iot.Device` :param email: the device owner's email address - :param imei: the modem IMEI number (as a 15 characters string) - :param ssl_ctx: an optional SSL/TLS context (use HTTPS if present) """ - if ssl_ctx: - url = "https://" - else: - url = "http://" - url += device.endpoint + "/script-polaris/register" - obj = {"email": email, "token": device.device_token, "imei": imei} - print("Register device", email, device.device_token, url, obj) - try: - res = requests.get(url, params=obj, ctx=ssl_ctx) - if res.status == 200: - return True - print("Registration error", res.status) - except Exception as e: - print(e) + obj = {"email": email, "token": device.device_token} + print("Register device", obj) + obj = device.do_rpc_request("register", obj) + #print(obj) + if obj and "reply" in obj and obj["reply"] == "OK": + return True return False + +# def register(device, email, imei, ssl_ctx=None): +# """ +# .. function:: register(device, email, imei, ssl_ctx) + +# Perform device registration to Fortebit IoT cloud services. + +# :param device: a connected instance of :any:`fortebit.iot.Device` +# :param email: the device owner's email address +# :param imei: the modem IMEI number (as a 15 characters string) +# :param ssl_ctx: an optional SSL/TLS context (use HTTPS if present) +# """ +# if ssl_ctx: +# url = "https://" +# else: +# url = "http://" +# url += device.endpoint + "/script-polaris/register" +# obj = {"email": email, "token": device.device_token, "imei": imei} +# print("Register device", email, device.device_token, url, obj) +# try: +# res = requests.get(url, params=obj, ctx=ssl_ctx) +# if res.status == 200: +# return True +# print("Registration error", res.status) +# except Exception as e: +# print(e) +# return False diff --git a/fortebit/polaris/polaris.py b/fortebit/polaris/polaris.py index 8c1acad..a88acad 100644 --- a/fortebit/polaris/polaris.py +++ b/fortebit/polaris/polaris.py @@ -347,7 +347,7 @@ def GSM(): """ #-if TARGET == polaris_3g from quectel.ug96 import ug96 - ug96.init(gsm.SERIAL, _PIN_NC, _PIN_NC, gsm.PIN_POWER, _PIN_NC, gsm.PIN_STATUS, gsm.PIN_KILL, 0) + ug96.init(gsm.SERIAL, _PIN_NC, _PIN_NC, gsm.PIN_POWER, gsm.PIN_KILL, gsm.PIN_STATUS, 1, 1, 0) return ug96 #-else ##-if TARGET == polaris_2g @@ -357,7 +357,7 @@ def GSM(): ##-else ###-if TARGET == polaris_nbiot from quectel.bg96 import bg96 - bg96.init(gsm.SERIAL, _PIN_NC, _PIN_NC, gsm.PIN_POWER, gsm.PIN_KILL, gsm.PIN_STATUS, 0) + bg96.init(gsm.SERIAL, _PIN_NC, _PIN_NC, gsm.PIN_POWER, gsm.PIN_KILL, gsm.PIN_STATUS, 1, 1, 0) bg96.gnss_init(use_uart=1) return bg96 ###-else