Skip to content

Commit

Permalink
Merge pull request #303 from NTIA/debug_task_id_skip
Browse files Browse the repository at this point in the history
Fix TaskResult IDs skipped in Task Results
  • Loading branch information
jhazentia authored Dec 12, 2024
2 parents 830212f + fd30f6b commit 19aff8d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/handlers/health_check_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from pathlib import Path
from time import sleep

from django.conf import settings

Expand All @@ -10,3 +11,4 @@ def trigger_api_restart_callback(sender, **kwargs):
logger.warning("triggering API container restart")
if settings.IN_DOCKER:
Path(settings.SDR_HEALTHCHECK_FILE).touch()
sleep(60) # sleep to prevent running next task until restart completed
28 changes: 26 additions & 2 deletions src/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def run(self, blocking=True):

try:
self.calibrate_if_needed()
self.reset_next_task_id()
while True:
with minimum_duration(blocking):
self._consume_schedule(blocking)
Expand Down Expand Up @@ -155,7 +156,16 @@ def _initialize_task_result(self) -> TaskResult:
"""Initalize an 'in-progress' result so it exists when action runs."""
tid = self.task.task_id
task_result = TaskResult(schedule_entry=self.entry, task_id=tid)
logger.debug(f"Creating task result with task id = {tid}")
task_result.save()
if tid > 1:
last_task_id_exists = TaskResult.objects.filter(task_id=tid-1).exists()
if not last_task_id_exists:
logger.warning(f"TaskResult for previous task id ({tid-1}) does not exist. Current task_id = {tid}")
# commented out code useful for debugging if this warning occurs
# self.entry.is_active = False
# self.entry.save()
# raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}")
return task_result

def _call_task_action(self):
Expand Down Expand Up @@ -247,10 +257,13 @@ def _finalize_task_result(self, task_result, started, finished, status, detail):
self.consecutive_failures = 1
else:
self.consecutive_failures = 0
self.last_status = status
if self.consecutive_failures >= settings.MAX_FAILURES:
trigger_api_restart.send(sender=self.__class__)

self.last_status = status
# prevent more tasks from being run
# restart can cause missing db task result ids
# if tasks continue to run waiting for restart
thread.stop()

@staticmethod
def _callback_response_handler(resp, task_result):
Expand Down Expand Up @@ -373,6 +386,17 @@ def calibrate_if_needed(self):
"Skipping startup calibration since sensor_calibration exists and has not expired."
)

def reset_next_task_id(self):
    """Re-sync every schedule entry's ``next_task_id`` with its stored results.

    For each entry in ``self.schedule``, look up the highest ``task_id``
    among the ``TaskResult`` rows that reference the entry. If the entry's
    ``next_task_id`` is not exactly one greater than that value, it is
    corrected and only that field is saved. Entries with no stored results
    are left untouched.
    """
    for entry in self.schedule:
        # One descending-order lookup replaces the previous count() +
        # positional-index pair. That pair cost two queries and could raise
        # IndexError if rows were removed between them.
        last_task_id = (
            TaskResult.objects.filter(schedule_entry=entry)
            .order_by("-task_id")
            .values_list("task_id", flat=True)
            .first()
        )
        if last_task_id is None:
            # No results recorded for this entry; nothing to reconcile.
            continue

        expected_next_id = last_task_id + 1
        if entry.next_task_id == expected_next_id:
            continue

        logger.info(
            f"Changing next_task_id from {entry.next_task_id} to {expected_next_id}"
        )
        entry.next_task_id = expected_next_id
        # Persist only the corrected column so other fields are not rewritten.
        entry.save(update_fields=("next_task_id",))


@contextmanager
def minimum_duration(blocking):
Expand Down

0 comments on commit 19aff8d

Please sign in to comment.