Skip to content

Commit

Permalink
Merge pull request #23 from fledge-iot/FOGL-7691
Browse files Browse the repository at this point in the history
FOGL-7691 - Reuse the azure client in plugin send & other error handling fixes (Improvements)
  • Loading branch information
ashish-jabble authored May 26, 2023
2 parents 88f970a + ce4bd51 commit 4901ece
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 42 deletions.
126 changes: 87 additions & 39 deletions python/fledge/plugins/north/azure_iot/azure_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@


def plugin_info():
"""Used only once when call will be made to a plugin
Args:
Returns:
Information about the plugin including the configuration for the plugin
"""
return {
'name': 'Azure IoT Hub device client',
'version': '2.1.0',
Expand All @@ -66,40 +72,98 @@ def plugin_info():


def plugin_init(data):
"""Used for initialization of a plugin
Args:
data: Plugin configuration
Returns:
Dictionary of a Plugin configuration
"""
config_data = deepcopy(data)
config_data['azure_north'] = AzureNorthPlugin(config_data)
config_data['azure_iot_hub_device_client'] = AzureIoTHubDeviceClient(config_data)
config_data['max_retry_count'] = 10
return config_data


async def plugin_send(data, payload, stream_id):
async def plugin_send(handle, payload, stream_id):
"""Used to send the readings block to the configured destination
Args:
handle: An object which is returned by plugin_init
payload: A list of readings block
stream_id: An integer that uniquely identifies the connection from Fledge instance to the destination system
Returns:
Tuple which consists of
- A Boolean that indicates if any data has been sent
- The object id of the last reading which has been sent
- Total number of readings which has been sent to the configured destination
"""
try:
azure_north = data['azure_north']
is_data_sent, new_last_object_id, num_sent = await azure_north.send_payloads(payload)
azure_client = handle['azure_iot_hub_device_client']
is_data_sent = False
new_last_object_id = 0
num_sent = 0
if azure_client.client is None and handle['max_retry_count'] <= 10:
handle['max_retry_count'] += 1
await azure_client.connect()
if azure_client.client is not None:
is_data_sent, new_last_object_id, num_sent = await azure_client.send(payload)
except asyncio.CancelledError:
pass
except Exception as ex:
_LOGGER.exception(ex, "Failed to send data.")
except ValueError as err:
_LOGGER.error(err, "Bad Primary connection string to communicate with Azure IoT Hub.")
else:
return is_data_sent, new_last_object_id, num_sent


def plugin_shutdown(data):
pass
def plugin_shutdown(handle):
"""Used when plugin is no longer required and will be final call to the plugin
Args:
handle: An object which is returned by plugin_init
Returns:
None
"""
azure_client = handle['azure_iot_hub_device_client']
azure_client.shutdown()
handle['azure_iot_hub_device_client'] = None


def plugin_reconfigure():
pass


class AzureNorthPlugin(object):
"""North Azure Plugin"""
class AzureIoTHubDeviceClient(object):
"""A custom AzureIoTHubDeviceClient class"""

def __init__(self, config):
""""Initializer for AzureIoTHubDeviceClient
Args:
config: configuration data of plugin
"""
self.event_loop = asyncio.get_event_loop()
self.primary_connection_string = config["primaryConnectionString"]["value"]
self.mqtt_over_websocket = True if config["websockets"]["value"].lower() == 'true' else False
self.client = None

async def send(self, payloads):
"""Sends a message to the default events endpoint on the Azure IoT Hub instance.
Args:
payloads: A list of readings block
Returns:
Tuple which consists of
- A Boolean that indicates if any data has been sent
- The object id of the last reading which has been sent
- Total number of readings which has been sent to the configured destination
"""
async def _send_message(pb):
num_count = 0
if self.client.connected:
# separators used only to get minimum payload size to remove whitespaces around ',' ':'
payload = json.dumps(pb, separators=(',', ':')).encode('utf-8')
message = Message(data=payload, content_encoding="utf-8", content_type="application/json")
_LOGGER.debug("Sending message: {} and of size: {}".format(message, str(message.get_size())))
await self.client.send_message(message)
num_count += len(pb)
return num_count

async def send_payloads(self, payloads):
is_data_sent = False
last_object_id = 0
num_sent = 0
Expand All @@ -118,36 +182,20 @@ async def send_payloads(self, payloads):
break
last_object_id = p["id"]
payload_block.append(read)
num_sent = await self._send_payloads(payload_block)
num_sent = await _send_message(payload_block)
is_data_sent = True
except Exception as ex:
_LOGGER.exception(ex, 'Failed on sending payload.')
return is_data_sent, last_object_id, num_sent

async def _send_payloads(self, payload_block):
"""send a list of block payloads"""
num_count = 0
exception_raised = False
try:
device_client = IoTHubDeviceClient.create_from_connection_string(
self.primary_connection_string, websockets=self.mqtt_over_websocket)
# Connect the device client.
await device_client.connect()
# separators used only to get minimum payload size to remove whitespaces around ',' ':'
payload = json.dumps(payload_block, separators=(',', ':')).encode('utf-8')
message = Message(data=payload, content_encoding="utf-8", content_type="application/json")
_LOGGER.debug("Sending message: {} and of size: {}".format(message, str(message.get_size())))
await device_client.send_message(message)
except Exception as ex:
exception_raised = True
_LOGGER.exception(ex, 'Failed on sending a list of block payloads.')
else:
num_count += len(payload_block)
finally:
# prevent to not execute when exception before client connection
if not exception_raised:
# graceful exit
if device_client.connected:
await device_client.shutdown()
return num_count

async def connect(self):
"""Connects the client to an Azure IoT Hub instance."""
self.client = IoTHubDeviceClient.create_from_connection_string(
self.primary_connection_string, websockets=self.mqtt_over_websocket)
# Connect the device client.
await self.client.connect()

async def shutdown(self):
"""Disconnect the client from the Azure IoT Hub instance. Shut down the client for graceful exit."""
if self.client.connected:
await self.client.shutdown()
12 changes: 9 additions & 3 deletions tests/test_azure_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def test_plugin_info():


def test_plugin_init():
with patch.object(azure_iot, 'AzureNorthPlugin'):
with patch.object(azure_iot, 'AzureIoTHubDeviceClient'):
actual = azure_iot.plugin_init(config)
del actual['azure_north']
del actual['azure_iot_hub_device_client']
del actual['max_retry_count']
assert actual == config


Expand All @@ -51,4 +52,9 @@ def test_plugin_reconfigure():


def test_plugin_shutdown():
assert azure_iot.plugin_shutdown(azure_iot._DEFAULT_CONFIG) is None
handle = azure_iot._DEFAULT_CONFIG
handle['azure_iot_hub_device_client'] = azure_iot.AzureIoTHubDeviceClient
with patch.object(azure_iot.AzureIoTHubDeviceClient, 'shutdown') as patch_shutdown:
assert azure_iot.plugin_shutdown(handle) is None
assert handle['azure_iot_hub_device_client'] is None
patch_shutdown.assert_called_once_with()

0 comments on commit 4901ece

Please sign in to comment.