Skip to content

Commit

Permalink
dying_gasp ok
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Nov 22, 2023
1 parent a1ef14f commit 5c1f487
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 135 deletions.
39 changes: 34 additions & 5 deletions platform/panduza_platform/core/monitored_event_loop.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import threading
import sys

# ---

Expand All @@ -8,14 +10,15 @@ class MonitoredEventLoop(asyncio.SelectorEventLoop):

# ---

def __init__(self, log, *args, **kwargs):
def __init__(self, platform, *args, **kwargs):
super().__init__(*args, **kwargs)
self._total_time = 0
self._select_time = 0

self._before_select = None

self.log = log
self.platform = platform
self.log = self.platform.log
self.perf_cycle_time = 2

# self.log.info(f"EVENT LOOP UP !!")
Expand All @@ -26,8 +29,34 @@ def __init__(self, log, *args, **kwargs):
def run_forever(self):
self.ref_time = self.time()
try:
# self.log.info(f"EVENT LOOP RUN")
super().run_forever()
self.log.info(f"EVENT LOOP RUN")
self._check_closed()
self._check_running()
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()

old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
asyncio.events._set_running_loop(self)

while True:
try:
# self.log.info(f"ONE")
self._run_once()
if self._stopping:
break
except KeyboardInterrupt:
self.log.warning("ctrl+c => user stop requested !!!!!!!!!!!! XD")
# self._stopping = True
self.platform.stop()
finally:
self._stopping = False
self._thread_id = None
asyncio.events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
finally:
finished = self.time()
# self._total_time = finished - started
Expand Down Expand Up @@ -62,4 +91,4 @@ def _process_events(self, *args, **kwargs):
self._select_time = 0
self.ref_time = self.time()

# ---
# ---
4 changes: 2 additions & 2 deletions platform/panduza_platform/core/mqtt_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ class MqttAsyncClient(PlatformWorker):

# ---

def __init__(self, addr, port) -> None:
def __init__(self, platform, addr, port) -> None:
"""Constructor
"""
# Build parent
super().__init__()
super().__init__(platform)

# Save address and port
self.addr = addr
Expand Down
202 changes: 112 additions & 90 deletions platform/panduza_platform/core/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from .platform_device_factory import PlatformDeviceFactory


STATUS_FILE_PATH="/etc/panduza/log/status.json"
# STATUS_FILE_PATH="/etc/panduza/log/status.json"


class Platform:
Expand Down Expand Up @@ -132,14 +132,23 @@ async def mount_client(self, name, host_addr, host_port):
"""Mount a mqtt client
"""
self.log.info(f"Mount client '{name}'")
mqtt_client = MqttAsyncClient(host_addr, host_port)
mqtt_client = MqttAsyncClient(self, host_addr, host_port)
self.clients[name] = mqtt_client
self.load_worker(mqtt_client)

# ---

def unmount_client(self):
pass
async def unmount_client(self, name):
self.log.info(f"Unmount client '{name}'")
self.clients[name].stop()
self.clients.remove(name)

# ---

async def unmount_all_clients(self, force=False):
for client in self.clients:
if force or (not client.keep_mounted):
await self.unmount_client(client)

# ---

Expand All @@ -166,9 +175,9 @@ async def unmount_device(self, device):

# ---

async def unmount_all_devices(self):
async def unmount_all_devices(self, force=False):
for device in self.devices:
if not device.keep_mounted:
if force or (not device.keep_mounted):
await self.unmount_device(device)

# ---
Expand Down Expand Up @@ -236,9 +245,11 @@ async def __idle_task(self):
# Start the global task group
async with asyncio.TaskGroup() as self.task_group:

self.log.warning("1")
# Connect to primary broker
await self.mount_client("primary", "localhost", 1883)

self.log.warning("2")
# Mount the device interfaces of the server
await self.mount_device("primary", "server",
{
Expand All @@ -248,44 +259,61 @@ async def __idle_task(self):
keep_mounted=True
)

self.log.warning("3")
# Task that load the config tree
await self.__load_tree_task()

# Wait for ever
while(self.alive):
await asyncio.sleep(1)


self.log.warning("END OF IDLE !")
self.event_loop.stop()


# ---

def __oper_mode(self):
"""Run the operational mode
This mode start the main event loop then the initialisation task
"""
try:
# Manage the status file (file to indicate the admin interface logs of the crash)
if os.path.isfile(STATUS_FILE_PATH):
os.remove(STATUS_FILE_PATH)

# Start the loop and monitor activity
# If the debug flag is enabled, start monitored event loop
if self.event_loop_debug:
self.event_loop = MonitoredEventLoop(self.log)
asyncio.set_event_loop(self.event_loop)
with aiomonitor.start_monitor(self.event_loop):
self.event_loop.run_until_complete(self.__idle_task())
else:
self.event_loop = asyncio.get_event_loop()
self.event_loop.run_until_complete(self.__idle_task())
# Create the Monitored loop
self.event_loop = MonitoredEventLoop(self)
asyncio.set_event_loop(self.event_loop)

# Create the idle task
self.event_loop.create_task(self.__idle_task(), name="IDLE")

#
monitor = None
if self.event_loop_debug:
monitor = aiomonitor.Monitor(self.event_loop)
monitor.start()

# try:
self.log.info("platform run")
self.event_loop.run_forever()
# finally:
# # loop.run_until_complete(loop.shutdown_asyncgens())
# # loop.close()
# pass


# #
# if self.event_loop_debug:
# with aiomonitor.start_monitor(self.event_loop):
# self.event_loop.run_until_complete(self.__idle_task())
# else:
# self.event_loop.run_until_complete(self.__idle_task())

except InitializationError as e:
self.log.critical(f"Error during platform initialization: {e}")
self.generate_early_status_report(str(e))
except KeyboardInterrupt:
self.log.warning("ctrl+c => user stop requested")
self.__stop()
except FileNotFoundError:
self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM")
# except InitializationError as e:
# self.log.critical(f"Error during platform initialization: {e}")
# self.generate_early_status_report(str(e))
# except KeyboardInterrupt:
# self.log.warning("ctrl+c => user stop requested")
# self.stop()
# except FileNotFoundError:
# self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM")

# ---

Expand Down Expand Up @@ -315,74 +343,68 @@ async def __load_tree_task(self):

# ---

def __stop(self):
def stop(self):
"""To stop the entire platform
"""
# Stop alive flag
self.alive = False

#
self.log.warning("Platform stopping...")
for thr in self.threads:
thr.stop()

#
for thr in self.threads:
thr.join()

# Generate status reports
self.generate_status_reports()



# ---

def generate_early_status_report(self, error_string):
"""Generate a status report when something went wrong during initialization
"""
status_obj = {}

status_obj["final_state"] = "initialization"
status_obj["error_string"] = error_string
status_obj["threads"] = []
self.event_loop.create_task(self.unmount_all_devices(force=True), name="CLEAR")

#
self.event_loop.create_task(self.unmount_all_clients(force=True), name="CLEAR")

# Write the status file
with open(STATUS_FILE_PATH, "w") as json_file:
json.dump(status_obj, json_file)

# ---

def generate_status_reports(self):
"""Generate a json report status and log it to the console
"""

status_obj = {}
status_obj["final_state"] = "running"

# Gather the status of each thread
thread_status = []
for thr in self.threads:
thread_status.append(thr.get_status())

#
status_obj["threads"] = thread_status

# Write the status file
with open(STATUS_FILE_PATH, "w") as json_file:
json.dump(status_obj, json_file)

# Print into the console
report = "\n"
for thr in thread_status:
report += "=================================\n"
report +=f"== {thr['name']} \n"
report += "=================================\n"

for w in thr['workers']:
report += "\n"
report += str(w.get("name", "")) + "\n"
report += str(w.get("final_state", "")) + "\n"
report += str(w.get("error_string", "")) + "\n"
self.log.info(report)
# def generate_early_status_report(self, error_string):
# """Generate a status report when something went wrong during initialization
# """
# status_obj = {}

# status_obj["final_state"] = "initialization"
# status_obj["error_string"] = error_string
# status_obj["threads"] = []

# # Write the status file
# with open(STATUS_FILE_PATH, "w") as json_file:
# json.dump(status_obj, json_file)

# # ---

# def generate_status_reports(self):
# """Generate a json report status and log it to the console
# """

# status_obj = {}
# status_obj["final_state"] = "running"

# # Gather the status of each thread
# thread_status = []
# for thr in self.threads:
# thread_status.append(thr.get_status())

# #
# status_obj["threads"] = thread_status

# # Write the status file
# with open(STATUS_FILE_PATH, "w") as json_file:
# json.dump(status_obj, json_file)

# # Print into the console
# report = "\n"
# for thr in thread_status:
# report += "=================================\n"
# report +=f"== {thr['name']} \n"
# report += "=================================\n"

# for w in thr['workers']:
# report += "\n"
# report += str(w.get("name", "")) + "\n"
# report += str(w.get("final_state", "")) + "\n"
# report += str(w.get("error_string", "")) + "\n"
# self.log.info(report)



Expand Down
Loading

0 comments on commit 5c1f487

Please sign in to comment.