diff --git a/models/scheduled_task.py b/models/scheduled_task.py index 07c98a84c..46f64cf39 100644 --- a/models/scheduled_task.py +++ b/models/scheduled_task.py @@ -17,6 +17,7 @@ def my_function(): """ import pendulum import logging +from sqlalchemy.orm import Session from functools import wraps from main import db @@ -95,37 +96,38 @@ def wrapper(*args, **kwargs): def execute_scheduled_tasks(force=False): # Create a new session, so tasks calling commit don't free our lock - session = db.create_scoped_session() - - # Take an exclusive lock on the ScheduledTaskResult table to prevent tasks colliding - session.execute(f"LOCK TABLE {ScheduledTaskResult.__tablename__} IN EXCLUSIVE MODE") - - tasks_to_run = [] - if force: - tasks_to_run += tasks - else: - for task in tasks: - res = ScheduledTaskResult.get_latest_run(task.name, session) - if res is None or res.start_time + task.duration < pendulum.now(): - tasks_to_run.append(task) - - log.info("Running %s periodic tasks...", len(tasks_to_run)) - for task in tasks_to_run: - log.info("Running %s", task.name) - result = ScheduledTaskResult(task.name) - try: - result.result["returnval"] = task.func() - - except Exception as e: - log.exception(f"Exception in {task.name}: {repr(e)}") - result.result["exception"] = repr(e) - - # Clean up the main session whatever happens - db.session.rollback() - - result.finish() - session.add(result) - - ScheduledTaskResult.cleanup(session) - session.commit() + with Session(db.engine, autocommit=False) as lock_session: + # Take an exclusive lock on the ScheduledTaskResult table to prevent tasks colliding + lock_session.execute( + f"LOCK TABLE {ScheduledTaskResult.__tablename__} IN EXCLUSIVE MODE" + ) + + tasks_to_run = [] + if force: + tasks_to_run += tasks + else: + for task in tasks: + res = ScheduledTaskResult.get_latest_run(task.name, lock_session) + if res is None or res.start_time + task.duration < pendulum.now(): + tasks_to_run.append(task) + + log.info("Running %s periodic tasks...", len(tasks_to_run)) + for task in tasks_to_run: + log.info("Running %s", task.name) + result = ScheduledTaskResult(task.name) + try: + result.result["returnval"] = task.func() + + except Exception as e: + log.exception(f"Exception in {task.name}: {repr(e)}") + result.result["exception"] = repr(e) + + # Clean up the main session whatever happens + db.session.rollback() + + result.finish() + lock_session.add(result) + + ScheduledTaskResult.cleanup(lock_session) + lock_session.commit() log.info("Tasks complete.") diff --git a/tests/test_scheduled_tasks.py b/tests/test_scheduled_tasks.py new file mode 100644 index 000000000..2527c1219 --- /dev/null +++ b/tests/test_scheduled_tasks.py @@ -0,0 +1,5 @@ +from models.scheduled_task import execute_scheduled_tasks + + +def test_scheduled_tasks(app): + execute_scheduled_tasks(force=True)