Skip to content

Commit

Permalink
Include a resource tracker to track and terminate all background proc…
Browse files Browse the repository at this point in the history
…esses
  • Loading branch information
dormant-user committed Oct 4, 2024
1 parent 5a72ed9 commit 4c319ac
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 28 deletions.
9 changes: 4 additions & 5 deletions jarvis/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import pathlib
import shutil
from datetime import datetime
from multiprocessing import Process
from threading import Thread

from fastapi import FastAPI
Expand All @@ -12,7 +11,7 @@
from jarvis.api import routers
from jarvis.api.logger import logger
from jarvis.api.squire import discover, stockanalysis_squire
from jarvis.executors import crontab
from jarvis.executors import crontab, resource_tracker
from jarvis.modules.models import models

# Initiate API
Expand Down Expand Up @@ -89,9 +88,9 @@ def startup() -> None:
log_file = datetime.now().strftime(
os.path.join("logs", "startup_script_%d-%m-%Y.log")
)
Process(
target=crontab.executor, args=(script, log_file, "startup_script")
).start()
resource_tracker.semaphores(
crontab.executor, (script, log_file, "startup_script")
)
else:
logger.warning(
"Unsupported file format for startup script: %s", startup_script
Expand Down
4 changes: 3 additions & 1 deletion jarvis/api/routers/surveillance.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ async def video_feed(request: Request, token: str = None):
detail="Requires authentication since endpoint uses single-use token.",
)
settings.surveillance.token = None
settings.surveillance.queue_manager[settings.surveillance.client_id] = Queue()
settings.surveillance.queue_manager[settings.surveillance.client_id] = Queue(
maxsize=10
)
process = Process(
target=surveillance_squire.gen_frames,
kwargs={
Expand Down
8 changes: 4 additions & 4 deletions jarvis/executors/commander.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import random
import time
import traceback
from multiprocessing import Process
from threading import Thread
from typing import Tuple

Expand All @@ -11,6 +10,7 @@
listener_controls,
offline,
others,
resource_tracker,
word_match,
)
from jarvis.modules.audio import speaker
Expand Down Expand Up @@ -85,9 +85,9 @@ def timed_delay(phrase: str) -> Tuple[str, int | float]:
split_ = phrase.split("after")
if task := split_[0].strip():
delay = util.delay_calculator(phrase=split_[1].strip())
Process(
target=delay_condition, kwargs={"phrase": task, "delay": delay}
).start()
resource_tracker.semaphores(
delay_condition, kwargs={"phrase": task, "delay": delay}
)
return task, delay


Expand Down
5 changes: 3 additions & 2 deletions jarvis/executors/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
listener_controls,
others,
remind,
resource_tracker,
weather_monitor,
word_match,
)
Expand Down Expand Up @@ -120,7 +121,7 @@ def background_task_runner() -> None:
if "weather" in exec_task.lower():
# run as daemon and not store in children table as this won't take long
logger.debug("Initiating weather alert monitor")
Process(target=weather_monitor.monitor, daemon=True).start()
resource_tracker.semaphores(weather_monitor.monitor)
else:
logger.debug("Executing: '%s'", exec_task)
try:
Expand Down Expand Up @@ -222,7 +223,7 @@ def background_task_runner() -> None:
"day", datetime.now().strftime("%A")
) == datetime.now().strftime("%A"):
logger.info("Executing alarm: %s", alarmer)
Process(target=alarm.executor).start()
resource_tracker.semaphores(alarm.executor)
if not alarmer["repeat"]:
copied_alarms.remove(alarmer)
if copied_alarms != alarms:
Expand Down
10 changes: 1 addition & 9 deletions jarvis/executors/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from multiprocessing import Process
from typing import Dict, List

import psutil

from jarvis.executors import process_map
from jarvis.modules.database import database
from jarvis.modules.logger import logger
Expand Down Expand Up @@ -121,14 +119,8 @@ def stop_child_processes() -> None:
logger.info(children)
for category, pids in children.items():
for pid in pids:
try:
proc = psutil.Process(pid=pid)
except psutil.NoSuchProcess:
# Occurs commonly since child processes run only for a short time and `INSERT or REPLACE` leaves dupes
logger.debug("Process [%s] PID not found %d", category, pid)
continue
logger.info("Stopping process [%s] with PID: %d", category, pid)
support.stop_process(pid=proc.pid)
support.stop_process(pid=pid)


def stop_processes(func_name: str = None) -> None:
Expand Down
29 changes: 29 additions & 0 deletions jarvis/executors/resource_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from multiprocessing import Process
from typing import Any, Callable, Dict, Tuple

from jarvis.modules.database import database
from jarvis.modules.models import models

db = database.Database(database=models.fileio.base_db)


def semaphores(
fn: Callable,
args: Tuple = None,
kwargs: Dict[str, Any] = None,
daemon: bool = False,
) -> None:
"""Resource tracker to store undefined process IDs in the base database and cleanup at shutdown.
Args:
fn: Function to start multiprocessing for.
args: Optional arguments to pass.
kwargs: Keyword arguments to pass.
daemon: Boolean flag to set daemon mode.
"""
process = Process(target=fn, args=args or (), kwargs=kwargs or {}, daemon=daemon)
process.start()
with db.connection:
cursor = db.connection.cursor()
cursor.execute("INSERT INTO children (undefined) VALUES (?);", (process.pid,))
db.connection.commit()
6 changes: 3 additions & 3 deletions jarvis/modules/meetings/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import sqlite3
import subprocess
from datetime import datetime
from multiprocessing import Process

# noinspection PyProtectedMember
from multiprocessing.context import TimeoutError as ThreadTimeoutError
from multiprocessing.pool import ThreadPool

import pynotification

from jarvis.executors import resource_tracker
from jarvis.modules.audio import speaker
from jarvis.modules.database import database
from jarvis.modules.logger import logger
Expand Down Expand Up @@ -177,7 +177,7 @@ def events(*args) -> None:
datetime.now().strftime("%Y_%m_%d"),
)
logger.info("Starting adhoc process to update %s table.", models.env.event_app)
Process(target=events_writer).start()
resource_tracker.semaphores(events_writer)
speaker.speak(
text=f"Events table is outdated {models.env.title}. Please try again in a minute or two."
)
Expand All @@ -186,7 +186,7 @@ def events(*args) -> None:
logger.info(
"Starting adhoc process to get events from %s.", models.env.event_app
)
Process(target=events_writer).start()
resource_tracker.semaphores(events_writer)
speaker.speak(
text=f"Events table is empty {models.env.title}. Please try again in a minute or two."
)
Expand Down
8 changes: 4 additions & 4 deletions jarvis/modules/meetings/ics_meetings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import datetime
import sqlite3
import time
from multiprocessing import Process, Queue
from multiprocessing import Queue

# noinspection PyProtectedMember
from multiprocessing.context import TimeoutError as ThreadTimeoutError
Expand All @@ -17,7 +17,7 @@

import requests

from jarvis.executors import word_match
from jarvis.executors import resource_tracker, word_match
from jarvis.modules.audio import speaker
from jarvis.modules.database import database
from jarvis.modules.exceptions import EgressErrors
Expand Down Expand Up @@ -197,14 +197,14 @@ def meetings(phrase: str) -> None:
% (meeting_status[1], datetime.datetime.now().strftime("%Y_%m_%d"))
)
logger.info("Starting adhoc process to update ics table.")
Process(target=meetings_writer).start()
resource_tracker.semaphores(meetings_writer)
speaker.speak(
text=f"Meetings table is outdated {models.env.title}. Please try again in a minute or two."
)
else:
if shared.called_by_offline:
logger.info("Starting adhoc process to get meetings from ICS.")
Process(target=meetings_writer).start()
resource_tracker.semaphores(meetings_writer)
speaker.speak(
text=f"Meetings table is empty {models.env.title}. Please try again in a minute or two."
)
Expand Down
1 change: 1 addition & 0 deletions jarvis/modules/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"guard",
"surveillance",
"plot_mic",
"undefined",
),
"vpn": ("state",),
"party": ("pid",),
Expand Down

0 comments on commit 4c319ac

Please sign in to comment.