From 7f95b3faad1411404fddcc58785a55fff040baee Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Fri, 1 Nov 2024 09:49:35 -0600 Subject: [PATCH 1/9] changes for debugging task id mismatch --- src/requirements-dev.txt | 4 ++-- src/requirements.txt | 4 ++-- src/scheduler/scheduler.py | 10 ++++++++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/requirements-dev.txt b/src/requirements-dev.txt index 95dfe235..6711278d 100644 --- a/src/requirements-dev.txt +++ b/src/requirements-dev.txt @@ -309,11 +309,11 @@ scipy==1.10.1 # via # -r requirements.txt # scos-actions -scos-actions @ git+https://github.com/NTIA/scos-actions@10.0.2 +scos-actions @ git+https://github.com/NTIA/scos-actions@nan_y_factor_cal # via # -r requirements.txt # scos-tekrsa -scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@7.0.1 +scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@nan_y_factor_cal # via -r requirements.txt sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive # via diff --git a/src/requirements.txt b/src/requirements.txt index 36ac37e7..021787c1 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -149,9 +149,9 @@ ruamel-yaml-clib==0.2.8 # via ruamel-yaml scipy==1.10.1 # via scos-actions -scos-actions @ git+https://github.com/NTIA/scos-actions@10.0.2 +scos-actions @ git+https://github.com/NTIA/scos-actions@nan_y_factor_cal # via scos-tekrsa -scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@7.0.1 +scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@nan_y_factor_cal # via -r requirements.in sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive # via scos-actions diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 9f392079..9fb39182 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -154,7 +154,15 @@ def _initialize_task_result(self) -> TaskResult: """Initalize an 'in-progress' result so it exists when action runs.""" tid = self.task.task_id task_result = TaskResult(schedule_entry=self.entry, task_id=tid) + logger.debug(f"Creating task result with task id = {tid}") task_result.save() + if tid > 1: + last_task_id_exists = TaskResult.objects.exists(task_id=tid-1) + if not last_task_id_exists: + logger.debug(f"TaskResult for tid = {tid-1} does not exist. Stopping schedule.") + self.entry.is_active = False + self.entry.save() + raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}") return task_result def _call_task_action(self): @@ -268,7 +276,9 @@ def _queue_pending_tasks(self, schedule_snapshot): continue task_id = entry.get_next_task_id() + logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") entry.save(update_fields=("next_task_id",)) + logger.debug(f"entry.next_task_id = {entry.next_task_id}") pri = entry.priority action = entry.action pending_queue.enter(task_time, pri, action, entry.name, task_id) From cfa6ab065e12272af390f55f32c8fff274c76748 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Fri, 1 Nov 2024 10:19:11 -0600 Subject: [PATCH 2/9] fix query --- src/scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 9fb39182..4ef9aad6 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -157,7 +157,7 @@ def _initialize_task_result(self) -> TaskResult: logger.debug(f"Creating task result with task id = {tid}") task_result.save() if tid > 1: - last_task_id_exists = TaskResult.objects.exists(task_id=tid-1) + last_task_id_exists = TaskResult.objects.filter(task_id=tid-1).exists() if not last_task_id_exists: logger.debug(f"TaskResult for tid = {tid-1} does not exist. Stopping schedule.") self.entry.is_active = False From 0d954787de007bd35f0947326ac6f38be0c65289 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 4 Nov 2024 08:25:03 -0700 Subject: [PATCH 3/9] fix for task id skip --- src/handlers/health_check_handler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/handlers/health_check_handler.py b/src/handlers/health_check_handler.py index c4423038..bba25f41 100644 --- a/src/handlers/health_check_handler.py +++ b/src/handlers/health_check_handler.py @@ -1,5 +1,6 @@ import logging from pathlib import Path +from time import sleep from django.conf import settings @@ -10,3 +11,4 @@ def trigger_api_restart_callback(sender, **kwargs): logger.warning("triggering API container restart") if settings.IN_DOCKER: Path(settings.SDR_HEALTHCHECK_FILE).touch() + sleep(60) # sleep to prevent running next task until restart completed From 59e7c5ac008e68e555a114a50b6ac4a19be2c093 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 5 Nov 2024 08:12:29 -0700 Subject: [PATCH 4/9] stop scheduler thread after trigger api restart --- src/scheduler/scheduler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 4ef9aad6..99fb1e4a 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -245,6 +245,7 @@ def _finalize_task_result(self, task_result, started, finished, status, detail): task_result.save() with self.task_status_lock: + self.last_status = status if status == "failure" and self.last_status == "failure": self.consecutive_failures = self.consecutive_failures + 1 elif status == "failure": @@ -253,8 +254,10 @@ def _finalize_task_result(self, task_result, started, finished, status, detail): self.consecutive_failures = 0 if self.consecutive_failures >= settings.MAX_FAILURES: trigger_api_restart.send(sender=self.__class__) - - self.last_status = status + # prevent more tasks from being run + # restart can cause missing db task result ids + # if tasks continue to run waiting for restart + thread.stop() @staticmethod def _callback_response_handler(resp, task_result): From 1fa8f1e0ad7a5612bebf8df52b25b9117826720e Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 12 Nov 2024 09:45:54 -0700 Subject: [PATCH 5/9] check for task result in db before incrementing task id --- src/scheduler/scheduler.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 99fb1e4a..a14b0070 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -278,10 +278,15 @@ def _queue_pending_tasks(self, schedule_snapshot): if task_time is None: continue - task_id = entry.get_next_task_id() - logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") - entry.save(update_fields=("next_task_id",)) - logger.debug(f"entry.next_task_id = {entry.next_task_id}") + current_task_id = entry.next_task_id + if TaskResult.objects.filter(schedule_entry=entry, task_id=current_task_id).count() == 0: + task_id = current_task_id + logger.debug(f"Not task result exists for {current_task_id} using {task_id} for next task id.") + else: + task_id = entry.get_next_task_id() + logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") + entry.save(update_fields=("next_task_id",)) + logger.debug(f"entry.next_task_id = {entry.next_task_id}") pri = entry.priority action = entry.action pending_queue.enter(task_time, pri, action, entry.name, task_id) From adf5f65f03794ad37da274e3e59d82ea004b27d3 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 12 Nov 2024 12:37:38 -0700 Subject: [PATCH 6/9] try resetting next task id --- src/scheduler/scheduler.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index a14b0070..86c9749c 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -93,6 +93,7 @@ def run(self, blocking=True): try: self.calibrate_if_needed() + self.reset_next_task_id() while True: with minimum_duration(blocking): self._consume_schedule(blocking) @@ -278,15 +279,19 @@ def _queue_pending_tasks(self, schedule_snapshot): if task_time is None: continue - current_task_id = entry.next_task_id - if TaskResult.objects.filter(schedule_entry=entry, task_id=current_task_id).count() == 0: - task_id = current_task_id - logger.debug(f"Not task result exists for {current_task_id} using {task_id} for next task id.") - else: - task_id = entry.get_next_task_id() - logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") - entry.save(update_fields=("next_task_id",)) - logger.debug(f"entry.next_task_id = {entry.next_task_id}") + # count = TaskResult.objects.filter(schedule_entry=entry).count() + # if count > 0: + # last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id + # entry.next_task_id = last_task_id + 1 + # current_task_id = entry.next_task_id + # if TaskResult.objects.filter(schedule_entry=entry, task_id=current_task_id).count() == 0: + # task_id = current_task_id + # logger.debug(f"Not task result exists for {current_task_id} using {task_id} for next task id.") + # else: + task_id = entry.get_next_task_id() + logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") + entry.save(update_fields=("next_task_id",)) + logger.debug(f"entry.next_task_id = {entry.next_task_id}") pri = entry.priority action = entry.action pending_queue.enter(task_time, pri, action, entry.name, task_id) @@ -387,6 +392,15 @@ def calibrate_if_needed(self): "Skipping startup calibration since sensor_calibration exists and has not expired." ) + def reset_next_task_id(self): + # reset next task id + for entry in self.schedule: + count = TaskResult.objects.filter(schedule_entry=entry).count() + if count > 0: + last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id + entry.next_task_id = last_task_id + 1 + entry.save(update_fields=("next_task_id",)) + @contextmanager def minimum_duration(blocking): From c0aff6bfda29c8f88fc9c8a90c0773c0479c7cfa Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 12 Nov 2024 13:52:18 -0700 Subject: [PATCH 7/9] only change next task id if needed, add log message --- src/scheduler/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 86c9749c..0391b10b 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -398,8 +398,10 @@ def reset_next_task_id(self): count = TaskResult.objects.filter(schedule_entry=entry).count() if count > 0: last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id - entry.next_task_id = last_task_id + 1 - entry.save(update_fields=("next_task_id",)) + if entry.next_task_id != last_task_id + 1: + logger.debug(f"Changing next_task_id from {entry.next_task_id} to {last_task_id + 1}") + entry.next_task_id = last_task_id + 1 + entry.save(update_fields=("next_task_id",)) @contextmanager From 608414134ced061c9192e0ecbff2661832b43c7f Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Thu, 21 Nov 2024 10:57:06 -0700 Subject: [PATCH 8/9] remove/disable debug code, update log messages and levels --- src/requirements-dev.txt | 4 ++-- src/requirements.txt | 4 ++-- src/scheduler/scheduler.py | 22 ++++++---------------- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/src/requirements-dev.txt b/src/requirements-dev.txt index a2820c41..12add771 100644 --- a/src/requirements-dev.txt +++ b/src/requirements-dev.txt @@ -325,11 +325,11 @@ scipy==1.13.1 # via # -r requirements.txt # scos-actions -scos-actions @ git+https://github.com/NTIA/scos-actions@nan_y_factor_cal +scos-actions @ git+https://github.com/NTIA/scos-actions@11.0.0 # via # -r requirements.txt # scos-tekrsa -scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@nan_y_factor_cal +scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@8.0.0 # via -r requirements.txt sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive # via diff --git a/src/requirements.txt b/src/requirements.txt index 188a53fd..80b9740f 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -157,9 +157,9 @@ ruamel-yaml-clib==0.2.8 # via ruamel-yaml scipy==1.13.1 # via scos-actions -scos-actions @ git+https://github.com/NTIA/scos-actions@nan_y_factor_cal +scos-actions @ git+https://github.com/NTIA/scos-actions@11.0.0 # via scos-tekrsa -scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@nan_y_factor_cal +scos-tekrsa @ git+https://github.com/NTIA/scos-tekrsa@8.0.0 # via -r requirements.in sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive # via scos-actions diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 93ce1dc1..415f965a 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -161,10 +161,11 @@ def _initialize_task_result(self) -> TaskResult: if tid > 1: last_task_id_exists = TaskResult.objects.filter(task_id=tid-1).exists() if not last_task_id_exists: - logger.debug(f"TaskResult for tid = {tid-1} does not exist. Stopping schedule.") - self.entry.is_active = False - self.entry.save() - raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}") + logger.warning(f"TaskResult for previous task id ({tid-1}) does not exist. Current task_id = {tid}") + # commented out code useful for debugging if this warning occurs + # self.entry.is_active = False + # self.entry.save() + # raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}") return task_result def _call_task_action(self): @@ -283,19 +284,8 @@ def _queue_pending_tasks(self, schedule_snapshot): if task_time is None: continue - # count = TaskResult.objects.filter(schedule_entry=entry).count() - # if count > 0: - # last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id - # entry.next_task_id = last_task_id + 1 - # current_task_id = entry.next_task_id - # if TaskResult.objects.filter(schedule_entry=entry, task_id=current_task_id).count() == 0: - # task_id = current_task_id - # logger.debug(f"Not task result exists for {current_task_id} using {task_id} for next task id.") - # else: task_id = entry.get_next_task_id() - logger.debug(f"Updating schedule entry next task id, task_id = {task_id}") entry.save(update_fields=("next_task_id",)) - logger.debug(f"entry.next_task_id = {entry.next_task_id}") pri = entry.priority action = entry.action pending_queue.enter(task_time, pri, action, entry.name, task_id) @@ -403,7 +393,7 @@ def reset_next_task_id(self): if count > 0: last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id if entry.next_task_id != last_task_id + 1: - logger.debug(f"Changing next_task_id from {entry.next_task_id} to {last_task_id + 1}") + logger.info(f"Changing next_task_id from {entry.next_task_id} to {last_task_id + 1}") entry.next_task_id = last_task_id + 1 entry.save(update_fields=("next_task_id",)) From fd30f6b4109263aef4139984551d322158266f8a Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 10 Dec 2024 15:19:41 -0700 Subject: [PATCH 9/9] fix last status bug --- src/scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 415f965a..001b98e0 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -251,13 +251,13 @@ def _finalize_task_result(self, task_result, started, finished, status, detail): task_result.save() with self.task_status_lock: - self.last_status = status if status == "failure" and self.last_status == "failure": self.consecutive_failures = self.consecutive_failures + 1 elif status == "failure": self.consecutive_failures = 1 else: self.consecutive_failures = 0 + self.last_status = status if self.consecutive_failures >= settings.MAX_FAILURES: trigger_api_restart.send(sender=self.__class__) # prevent more tasks from being run