From 5c1f4876ce515e8e1aa9d847199fb0872dda4369 Mon Sep 17 00:00:00 2001 From: XdoctorwhoZ Date: Wed, 22 Nov 2023 07:31:57 +0100 Subject: [PATCH] dying_gasp ok --- .../core/monitored_event_loop.py | 39 +++- .../core/mqtt_async_client.py | 4 +- platform/panduza_platform/core/platform.py | 202 ++++++++++-------- .../panduza_platform/core/platform_driver.py | 60 +++--- .../panduza_platform/core/platform_worker.py | 48 ++++- .../devices/panduza/server/itf_platform.py | 11 +- platform/panduza_platform/meta_drivers/bpc.py | 6 +- 7 files changed, 235 insertions(+), 135 deletions(-) diff --git a/platform/panduza_platform/core/monitored_event_loop.py b/platform/panduza_platform/core/monitored_event_loop.py index e4d2652..649e730 100644 --- a/platform/panduza_platform/core/monitored_event_loop.py +++ b/platform/panduza_platform/core/monitored_event_loop.py @@ -1,4 +1,6 @@ import asyncio +import threading +import sys # --- @@ -8,14 +10,15 @@ class MonitoredEventLoop(asyncio.SelectorEventLoop): # --- - def __init__(self, log, *args, **kwargs): + def __init__(self, platform, *args, **kwargs): super().__init__(*args, **kwargs) self._total_time = 0 self._select_time = 0 self._before_select = None - self.log = log + self.platform = platform + self.log = self.platform.log self.perf_cycle_time = 2 # self.log.info(f"EVENT LOOP UP !!") @@ -26,8 +29,34 @@ def __init__(self, log, *args, **kwargs): def run_forever(self): self.ref_time = self.time() try: - # self.log.info(f"EVENT LOOP RUN") - super().run_forever() + self.log.info(f"EVENT LOOP RUN") + self._check_closed() + self._check_running() + self._set_coroutine_origin_tracking(self._debug) + self._thread_id = threading.get_ident() + + old_agen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, + finalizer=self._asyncgen_finalizer_hook) + try: + asyncio.events._set_running_loop(self) + + while True: + try: + # self.log.info(f"ONE") + self._run_once() + if self._stopping: + break + except KeyboardInterrupt: + self.log.warning("ctrl+c => user stop requested !!!!!!!!!!!! XD") + # self._stopping = True + self.platform.stop() + finally: + self._stopping = False + self._thread_id = None + asyncio.events._set_running_loop(None) + self._set_coroutine_origin_tracking(False) + sys.set_asyncgen_hooks(*old_agen_hooks) finally: finished = self.time() # self._total_time = finished - started @@ -62,4 +91,4 @@ def _process_events(self, *args, **kwargs): self._select_time = 0 self.ref_time = self.time() - # --- + # --- \ No newline at end of file diff --git a/platform/panduza_platform/core/mqtt_async_client.py b/platform/panduza_platform/core/mqtt_async_client.py index b2b2c1d..e68c90e 100644 --- a/platform/panduza_platform/core/mqtt_async_client.py +++ b/platform/panduza_platform/core/mqtt_async_client.py @@ -108,11 +108,11 @@ class MqttAsyncClient(PlatformWorker): # --- - def __init__(self, addr, port) -> None: + def __init__(self, platform, addr, port) -> None: """Constructor """ # Build parent - super().__init__() + super().__init__(platform) # Save address and port self.addr = addr diff --git a/platform/panduza_platform/core/platform.py b/platform/panduza_platform/core/platform.py index 8e62b83..bb7b6b4 100644 --- a/platform/panduza_platform/core/platform.py +++ b/platform/panduza_platform/core/platform.py @@ -32,7 +32,7 @@ from .platform_device_factory import PlatformDeviceFactory -STATUS_FILE_PATH="/etc/panduza/log/status.json" +# STATUS_FILE_PATH="/etc/panduza/log/status.json" class Platform: @@ -132,14 +132,23 @@ async def mount_client(self, name, host_addr, host_port): """Mount a mqtt client """ self.log.info(f"Mount client '{name}'") - mqtt_client = MqttAsyncClient(host_addr, host_port) + mqtt_client = MqttAsyncClient(self, host_addr, host_port) self.clients[name] = mqtt_client self.load_worker(mqtt_client) # --- - def unmount_client(self): - pass + async def unmount_client(self, name): + self.log.info(f"Unmount client '{name}'") + self.clients[name].stop() + self.clients.remove(name) + + # --- + + async def unmount_all_clients(self, force=False): + for client in self.clients: + if force or (not client.keep_mounted): + await self.unmount_client(client) # --- @@ -166,9 +175,9 @@ async def unmount_device(self, device): # --- - async def unmount_all_devices(self): + async def unmount_all_devices(self, force=False): for device in self.devices: - if not device.keep_mounted: + if force or (not device.keep_mounted): await self.unmount_device(device) # --- @@ -236,9 +245,11 @@ async def __idle_task(self): # Start the global task group async with asyncio.TaskGroup() as self.task_group: + self.log.warning("1") # Connect to primary broker await self.mount_client("primary", "localhost", 1883) + self.log.warning("2") # Mount the device interfaces of the server await self.mount_device("primary", "server", { @@ -248,6 +259,7 @@ async def __idle_task(self): keep_mounted=True ) + self.log.warning("3") # Task that load the config tree await self.__load_tree_task() @@ -255,37 +267,53 @@ async def __idle_task(self): while(self.alive): await asyncio.sleep(1) + + self.log.warning("END OF IDLE !") + self.event_loop.stop() + + # --- def __oper_mode(self): """Run the operational mode - - This mode start the main event loop then the initialisation task """ - try: - # Manage the status file (file to indicate the admin interface logs of the crash) - if os.path.isfile(STATUS_FILE_PATH): - os.remove(STATUS_FILE_PATH) - - # Start the loop and monitor activity - # If the debug flag is enabled, start monitored event loop - if self.event_loop_debug: - self.event_loop = MonitoredEventLoop(self.log) - asyncio.set_event_loop(self.event_loop) - with aiomonitor.start_monitor(self.event_loop): - self.event_loop.run_until_complete(self.__idle_task()) - else: - self.event_loop = asyncio.get_event_loop() - self.event_loop.run_until_complete(self.__idle_task()) + # Create the Monitored loop + self.event_loop = MonitoredEventLoop(self) + asyncio.set_event_loop(self.event_loop) + + # Create the idle task + self.event_loop.create_task(self.__idle_task(), name="IDLE") + + # + monitor = None + if self.event_loop_debug: + monitor = aiomonitor.Monitor(self.event_loop) + monitor.start() + + # try: + self.log.info("platform run") + self.event_loop.run_forever() + # finally: + # # loop.run_until_complete(loop.shutdown_asyncgens()) + # # loop.close() + # pass + + + # # + # if self.event_loop_debug: + # with aiomonitor.start_monitor(self.event_loop): + # self.event_loop.run_until_complete(self.__idle_task()) + # else: + # self.event_loop.run_until_complete(self.__idle_task()) - except InitializationError as e: - self.log.critical(f"Error during platform initialization: {e}") - self.generate_early_status_report(str(e)) - except KeyboardInterrupt: - self.log.warning("ctrl+c => user stop requested") - self.__stop() - except FileNotFoundError: - self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM") + # except InitializationError as e: + # self.log.critical(f"Error during platform initialization: {e}") + # self.generate_early_status_report(str(e)) + # except KeyboardInterrupt: + # self.log.warning("ctrl+c => user stop requested") + # self.stop() + # except FileNotFoundError: + # self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM") # --- @@ -315,74 +343,68 @@ async def __load_tree_task(self): # --- - def __stop(self): + def stop(self): """To stop the entire platform """ + # Stop alive flag self.alive = False # - self.log.warning("Platform stopping...") - for thr in self.threads: - thr.stop() - - # - for thr in self.threads: - thr.join() - - # Generate status reports - self.generate_status_reports() - - - - # --- - - def generate_early_status_report(self, error_string): - """Generate a status report when something went wrong during initialization - """ - status_obj = {} - - status_obj["final_state"] = "initialization" - status_obj["error_string"] = error_string - status_obj["threads"] = [] + self.event_loop.create_task(self.unmount_all_devices(force=True), name="CLEAR") + + # + self.event_loop.create_task(self.unmount_all_clients(force=True), name="CLEAR") - # Write the status file - with open(STATUS_FILE_PATH, "w") as json_file: - json.dump(status_obj, json_file) # --- - def generate_status_reports(self): - """Generate a json report status and log it to the console - """ - - status_obj = {} - status_obj["final_state"] = "running" - - # Gather the status of each thread - thread_status = [] - for thr in self.threads: - thread_status.append(thr.get_status()) - - # - status_obj["threads"] = thread_status - - # Write the status file - with open(STATUS_FILE_PATH, "w") as json_file: - json.dump(status_obj, json_file) - - # Print into the console - report = "\n" - for thr in thread_status: - report += "=================================\n" - report +=f"== {thr['name']} \n" - report += "=================================\n" - - for w in thr['workers']: - report += "\n" - report += str(w.get("name", "")) + "\n" - report += str(w.get("final_state", "")) + "\n" - report += str(w.get("error_string", "")) + "\n" - self.log.info(report) + # def generate_early_status_report(self, error_string): + # """Generate a status report when something went wrong during initialization + # """ + # status_obj = {} + + # status_obj["final_state"] = "initialization" + # status_obj["error_string"] = error_string + # status_obj["threads"] = [] + + # # Write the status file + # with open(STATUS_FILE_PATH, "w") as json_file: + # json.dump(status_obj, json_file) + + # # --- + + # def generate_status_reports(self): + # """Generate a json report status and log it to the console + # """ + + # status_obj = {} + # status_obj["final_state"] = "running" + + # # Gather the status of each thread + # thread_status = [] + # for thr in self.threads: + # thread_status.append(thr.get_status()) + + # # + # status_obj["threads"] = thread_status + + # # Write the status file + # with open(STATUS_FILE_PATH, "w") as json_file: + # json.dump(status_obj, json_file) + + # # Print into the console + # report = "\n" + # for thr in thread_status: + # report += "=================================\n" + # report +=f"== {thr['name']} \n" + # report += "=================================\n" + + # for w in thr['workers']: + # report += "\n" + # report += str(w.get("name", "")) + "\n" + # report += str(w.get("final_state", "")) + "\n" + # report += str(w.get("error_string", "")) + "\n" + # self.log.info(report) diff --git a/platform/panduza_platform/core/platform_driver.py b/platform/panduza_platform/core/platform_driver.py index f2e48be..50ee008 100644 --- a/platform/panduza_platform/core/platform_driver.py +++ b/platform/panduza_platform/core/platform_driver.py @@ -35,7 +35,6 @@ def __init__(self, name=None) -> None: self.interface_name = name - self.platform = None self.pclient = None self.__err_string = "" @@ -89,9 +88,6 @@ def initialize(self, device, group_name): self.device_name = self.device.get_name() self.group_name = group_name - # Store tasks managed by this interface - self._tasks = [] - # Flag to know if the topics have been subscribed self.__topics_subscribed = False @@ -241,6 +237,13 @@ async def PZA_WORKER_task(self): except Exception as e: self._PZA_DRV_error_detected(str(e) + " " + traceback.format_exc()) + # --- + + async def PZA_WORKER_dying_gasp(self): + """Just transfer the call + """ + await self._PZA_DRV_dying_gasp() + # ============================================================================= # INTERNAL STATES FUNCTIONS @@ -270,7 +273,7 @@ async def __drv_state_connecting(self): async def __drv_state_init(self): """ """ - self.load_interface_task(self.__alive_task()) + self.load_worker_task(self.__alive_task()) self.__subscribe_topics() await self._PZA_DRV_loop_init() @@ -487,13 +490,12 @@ def get_interface_instance_from_name(self, name): return self.platform.get_interface_instance_from_name(name) - ########################################################################### - ########################################################################### - # + + + # ============================================================================= # TO OVERRIDE IN SUBCLASS - # - ########################################################################### - ########################################################################### + + # --- @abc.abstractmethod def _PZA_DRV_config(self): @@ -501,26 +503,36 @@ def _PZA_DRV_config(self): """ pass + # --- + def _PZA_DRV_tree_template(self): """ """ return {} + # --- + def _PZA_DRV_hunt_instances(self): """ """ return [] + # --- + async def _PZA_DRV_loop_init(self): """ """ self._PZA_DRV_init_success() + # --- + async def _PZA_DRV_loop_run(self): """ """ await asyncio.sleep(0.1) + # --- + async def _PZA_DRV_loop_err(self): """ """ @@ -531,14 +543,21 @@ async def _PZA_DRV_loop_err(self): await asyncio.sleep(1) self.log.debug(f"restart in { int(PlatformDriver.ERROR_TIME_BEFORE_RETRY_S - elasped) }s") + # --- + async def _PZA_DRV_cmds_set(self, payload): """Must apply the command on the driver """ pass + # --- + + async def _PZA_DRV_dying_gasp(self): + """Provide a way for the interface to execute a last action before stopping + """ + pass - - + # --- @@ -556,21 +575,6 @@ async def _PZA_DRV_cmds_set(self, payload): # --- - def load_interface_task(self, coro, name = None): - if name == None: - name=f"FROM>{self.PZA_WORKER_name()}" - new_task = self.platform.load_task(coro, name) - self._tasks.append(new_task) - return new_task - - # --- - - def cancel_all_tasks(self): - for t in self._tasks: - t.cancel() - - # --- - def _PZA_DRV_connection_success(self): self.__drv_state = "init" diff --git a/platform/panduza_platform/core/platform_worker.py b/platform/panduza_platform/core/platform_worker.py index a9e4e0f..52490c8 100644 --- a/platform/panduza_platform/core/platform_worker.py +++ b/platform/panduza_platform/core/platform_worker.py @@ -2,15 +2,43 @@ import asyncio class PlatformWorker(metaclass=abc.ABCMeta): - """Mother class for all the python drivers + """Mother class for all platform objects """ - def __init__(self) -> None: + # --- + + def __init__(self, platform = None) -> None: + """Constructor + """ + # Alive flag for the work (must stop if == False) self.alive = True - self.reset_work_time() - def reset_work_time(self): - self.work_time = 0 + # Store platform access + self.platform = platform + + # Task managed by this worker + self._subTasks = [] + + # --- + + def stop(self): + self.cancel_all_tasks() + self.alive = False + + # --- + + def load_worker_task(self, coro, name = None): + if name == None: + name=f"FROM>{self.PZA_WORKER_name()}" + new_task = self.platform.load_task(coro, name) + self._subTasks.append(new_task) + return new_task + + # --- + + def cancel_all_tasks(self): + for t in self._subTasks: + t.cancel() # --- @@ -20,12 +48,13 @@ async def worker_panic(self): # --- async def task(self): - """ + """Main task loop """ while(self.alive): await asyncio.sleep(0.1) await self.PZA_WORKER_task() + await self.PZA_WORKER_dying_gasp() self.PZA_WORKER_log().info("stopped") # ============================================================================= @@ -64,4 +93,11 @@ async def PZA_WORKER_task(self): """ pass + # --- + async def PZA_WORKER_dying_gasp(self): + """Provide a way for the worker to execute a last action before stopping + """ + pass + + # --- diff --git a/platform/panduza_platform/devices/panduza/server/itf_platform.py b/platform/panduza_platform/devices/panduza/server/itf_platform.py index 87d40b5..8b7209c 100644 --- a/platform/panduza_platform/devices/panduza/server/itf_platform.py +++ b/platform/panduza_platform/devices/panduza/server/itf_platform.py @@ -29,13 +29,16 @@ async def _PZA_DRV_loop_init(self): } # Append a task to refresh platform data - self.load_interface_task(self.__refresh_platform_data_task()) + self.load_worker_task(self.__refresh_platform_data_task()) # status # state => string # report => json + await self._update_attribute("info", "dying_gasp", False) + + # devices # Tell the platform that the init state end sucessfuly @@ -51,6 +54,12 @@ async def _PZA_DRV_cmds_set(self, payload): if att in cmds: await self.__cmd_handlers[att](cmds[att]) + # --- + + async def _PZA_DRV_dying_gasp(self): + # [REQ_ITF_PLATFORM_0020_00] - Info 'dying_gasp' field + await self._update_attribute("info", "dying_gasp", True) + # --- async def __refresh_platform_data_task(self): diff --git a/platform/panduza_platform/meta_drivers/bpc.py b/platform/panduza_platform/meta_drivers/bpc.py index bb75d82..596c2cc 100644 --- a/platform/panduza_platform/meta_drivers/bpc.py +++ b/platform/panduza_platform/meta_drivers/bpc.py @@ -112,9 +112,9 @@ async def _PZA_DRV_loop_init(self): } # Start polling task - self.load_interface_task(self.__polling_task_att_enable()) - self.load_interface_task(self.__polling_task_att_voltage()) - self.load_interface_task(self.__polling_task_att_current()) + self.load_worker_task(self.__polling_task_att_enable()) + self.load_worker_task(self.__polling_task_att_voltage()) + self.load_worker_task(self.__polling_task_att_current()) # Init success, the driver can pass into the run mode self._PZA_DRV_init_success()