diff --git a/jarvis/api/main.py b/jarvis/api/main.py index a8f58ed9..b76feb72 100644 --- a/jarvis/api/main.py +++ b/jarvis/api/main.py @@ -2,7 +2,6 @@ import pathlib import shutil from datetime import datetime -from multiprocessing import Process from threading import Thread from fastapi import FastAPI @@ -12,7 +11,7 @@ from jarvis.api import routers from jarvis.api.logger import logger from jarvis.api.squire import discover, stockanalysis_squire -from jarvis.executors import crontab +from jarvis.executors import crontab, resource_tracker from jarvis.modules.models import models # Initiate API @@ -89,9 +88,9 @@ def startup() -> None: log_file = datetime.now().strftime( os.path.join("logs", "startup_script_%d-%m-%Y.log") ) - Process( - target=crontab.executor, args=(script, log_file, "startup_script") - ).start() + resource_tracker.semaphores( + crontab.executor, (script, log_file, "startup_script") + ) else: logger.warning( "Unsupported file format for startup script: %s", startup_script diff --git a/jarvis/api/routers/surveillance.py b/jarvis/api/routers/surveillance.py index 83e5348f..18dc5b47 100644 --- a/jarvis/api/routers/surveillance.py +++ b/jarvis/api/routers/surveillance.py @@ -209,7 +209,9 @@ async def video_feed(request: Request, token: str = None): detail="Requires authentication since endpoint uses single-use token.", ) settings.surveillance.token = None - settings.surveillance.queue_manager[settings.surveillance.client_id] = Queue() + settings.surveillance.queue_manager[settings.surveillance.client_id] = Queue( + maxsize=10 + ) process = Process( target=surveillance_squire.gen_frames, kwargs={ diff --git a/jarvis/executors/commander.py b/jarvis/executors/commander.py index aaeb7928..cdc95241 100644 --- a/jarvis/executors/commander.py +++ b/jarvis/executors/commander.py @@ -1,7 +1,6 @@ import random import time import traceback -from multiprocessing import Process from threading import Thread from typing import Tuple @@ -11,6 +10,7 @@ listener_controls, offline, others, + resource_tracker, word_match, ) from jarvis.modules.audio import speaker @@ -85,9 +85,9 @@ def timed_delay(phrase: str) -> Tuple[str, int | float]: split_ = phrase.split("after") if task := split_[0].strip(): delay = util.delay_calculator(phrase=split_[1].strip()) - Process( - target=delay_condition, kwargs={"phrase": task, "delay": delay} - ).start() + resource_tracker.semaphores( + delay_condition, kwargs={"phrase": task, "delay": delay} + ) return task, delay diff --git a/jarvis/executors/offline.py b/jarvis/executors/offline.py index 10934229..6f6a014c 100644 --- a/jarvis/executors/offline.py +++ b/jarvis/executors/offline.py @@ -23,6 +23,7 @@ listener_controls, others, remind, + resource_tracker, weather_monitor, word_match, ) @@ -120,7 +121,7 @@ def background_task_runner() -> None: if "weather" in exec_task.lower(): # run as daemon and not store in children table as this won't take long logger.debug("Initiating weather alert monitor") - Process(target=weather_monitor.monitor, daemon=True).start() + resource_tracker.semaphores(weather_monitor.monitor) else: logger.debug("Executing: '%s'", exec_task) try: @@ -222,7 +223,7 @@ def background_task_runner() -> None: "day", datetime.now().strftime("%A") ) == datetime.now().strftime("%A"): logger.info("Executing alarm: %s", alarmer) - Process(target=alarm.executor).start() + resource_tracker.semaphores(alarm.executor) if not alarmer["repeat"]: copied_alarms.remove(alarmer) if copied_alarms != alarms: diff --git a/jarvis/executors/processor.py b/jarvis/executors/processor.py index 7712209c..4c00386f 100644 --- a/jarvis/executors/processor.py +++ b/jarvis/executors/processor.py @@ -2,8 +2,6 @@ from multiprocessing import Process from typing import Dict, List -import psutil - from jarvis.executors import process_map from jarvis.modules.database import database from jarvis.modules.logger import logger @@ -121,14 +119,8 @@ def stop_child_processes() -> None: logger.info(children) for category, pids in children.items(): for pid in pids: - try: - proc = psutil.Process(pid=pid) - except psutil.NoSuchProcess: - # Occurs commonly since child processes run only for a short time and `INSERT or REPLACE` leaves dupes - logger.debug("Process [%s] PID not found %d", category, pid) - continue logger.info("Stopping process [%s] with PID: %d", category, pid) - support.stop_process(pid=proc.pid) + support.stop_process(pid=pid) def stop_processes(func_name: str = None) -> None: diff --git a/jarvis/executors/resource_tracker.py b/jarvis/executors/resource_tracker.py new file mode 100644 index 00000000..cde991e6 --- /dev/null +++ b/jarvis/executors/resource_tracker.py @@ -0,0 +1,29 @@ +from multiprocessing import Process +from typing import Any, Callable, Dict, Tuple + +from jarvis.modules.database import database +from jarvis.modules.models import models + +db = database.Database(database=models.fileio.base_db) + + +def semaphores( + fn: Callable, + args: Tuple = None, + kwargs: Dict[str, Any] = None, + daemon: bool = False, +) -> None: + """Resource tracker to store undefined process IDs in the base database and cleanup at shutdown. + + Args: + fn: Function to start multiprocessing for. + args: Optional arguments to pass. + kwargs: Keyword arguments to pass. + daemon: Boolean flag to set daemon mode. + """ + process = Process(target=fn, args=args or (), kwargs=kwargs or {}, daemon=daemon) + process.start() + with db.connection: + cursor = db.connection.cursor() + cursor.execute("INSERT INTO children (undefined) VALUES (?);", (process.pid,)) + db.connection.commit() diff --git a/jarvis/modules/meetings/events.py b/jarvis/modules/meetings/events.py index 5f79f404..f8bc8d5b 100644 --- a/jarvis/modules/meetings/events.py +++ b/jarvis/modules/meetings/events.py @@ -10,7 +10,6 @@ import sqlite3 import subprocess from datetime import datetime -from multiprocessing import Process # noinspection PyProtectedMember from multiprocessing.context import TimeoutError as ThreadTimeoutError @@ -18,6 +17,7 @@ import pynotification +from jarvis.executors import resource_tracker from jarvis.modules.audio import speaker from jarvis.modules.database import database from jarvis.modules.logger import logger @@ -177,7 +177,7 @@ def events(*args) -> None: datetime.now().strftime("%Y_%m_%d"), ) logger.info("Starting adhoc process to update %s table.", models.env.event_app) - Process(target=events_writer).start() + resource_tracker.semaphores(events_writer) speaker.speak( text=f"Events table is outdated {models.env.title}. Please try again in a minute or two." ) @@ -186,7 +186,7 @@ def events(*args) -> None: logger.info( "Starting adhoc process to get events from %s.", models.env.event_app ) - Process(target=events_writer).start() + resource_tracker.semaphores(events_writer) speaker.speak( text=f"Events table is empty {models.env.title}. Please try again in a minute or two." ) diff --git a/jarvis/modules/meetings/ics_meetings.py b/jarvis/modules/meetings/ics_meetings.py index ad0a1366..c3e93e94 100644 --- a/jarvis/modules/meetings/ics_meetings.py +++ b/jarvis/modules/meetings/ics_meetings.py @@ -8,7 +8,7 @@ import datetime import sqlite3 import time -from multiprocessing import Process, Queue +from multiprocessing import Queue # noinspection PyProtectedMember from multiprocessing.context import TimeoutError as ThreadTimeoutError @@ -17,7 +17,7 @@ import requests -from jarvis.executors import word_match +from jarvis.executors import resource_tracker, word_match from jarvis.modules.audio import speaker from jarvis.modules.database import database from jarvis.modules.exceptions import EgressErrors @@ -197,14 +197,14 @@ def meetings(phrase: str) -> None: % (meeting_status[1], datetime.datetime.now().strftime("%Y_%m_%d")) ) logger.info("Starting adhoc process to update ics table.") - Process(target=meetings_writer).start() + resource_tracker.semaphores(meetings_writer) speaker.speak( text=f"Meetings table is outdated {models.env.title}. Please try again in a minute or two." ) else: if shared.called_by_offline: logger.info("Starting adhoc process to get meetings from ICS.") - Process(target=meetings_writer).start() + resource_tracker.semaphores(meetings_writer) speaker.speak( text=f"Meetings table is empty {models.env.title}. Please try again in a minute or two." ) diff --git a/jarvis/modules/models/models.py b/jarvis/modules/models/models.py index 6d81d18a..70c9b2bb 100644 --- a/jarvis/modules/models/models.py +++ b/jarvis/modules/models/models.py @@ -56,6 +56,7 @@ "guard", "surveillance", "plot_mic", + "undefined", ), "vpn": ("state",), "party": ("pid",),