Skip to content

Commit

Permalink
Fix scheduled task failure and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
russss committed Nov 5, 2023
1 parent e9651a2 commit 6f3ec43
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 33 deletions.
68 changes: 35 additions & 33 deletions models/scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def my_function():
"""
import pendulum
import logging
from sqlalchemy.orm import Session
from functools import wraps

from main import db
Expand Down Expand Up @@ -95,37 +96,38 @@ def wrapper(*args, **kwargs):

def execute_scheduled_tasks(force=False):
# Create a new session, so tasks calling commit don't free our lock
session = db.create_scoped_session()

# Take an exclusive lock on the ScheduledTaskResult table to prevent tasks colliding
session.execute(f"LOCK TABLE {ScheduledTaskResult.__tablename__} IN EXCLUSIVE MODE")

tasks_to_run = []
if force:
tasks_to_run += tasks
else:
for task in tasks:
res = ScheduledTaskResult.get_latest_run(task.name, session)
if res is None or res.start_time + task.duration < pendulum.now():
tasks_to_run.append(task)

log.info("Running %s periodic tasks...", len(tasks_to_run))
for task in tasks_to_run:
log.info("Running %s", task.name)
result = ScheduledTaskResult(task.name)
try:
result.result["returnval"] = task.func()

except Exception as e:
log.exception(f"Exception in {task.name}: {repr(e)}")
result.result["exception"] = repr(e)

# Clean up the main session whatever happens
db.session.rollback()

result.finish()
session.add(result)

ScheduledTaskResult.cleanup(session)
session.commit()
with Session(db.engine, autocommit=False) as lock_session:
# Take an exclusive lock on the ScheduledTaskResult table to prevent tasks colliding
lock_session.execute(
f"LOCK TABLE {ScheduledTaskResult.__tablename__} IN EXCLUSIVE MODE"
)

tasks_to_run = []
if force:
tasks_to_run += tasks
else:
for task in tasks:
res = ScheduledTaskResult.get_latest_run(task.name, lock_session)
if res is None or res.start_time + task.duration < pendulum.now():
tasks_to_run.append(task)

log.info("Running %s periodic tasks...", len(tasks_to_run))
for task in tasks_to_run:
log.info("Running %s", task.name)
result = ScheduledTaskResult(task.name)
try:
result.result["returnval"] = task.func()

except Exception as e:
log.exception(f"Exception in {task.name}: {repr(e)}")
result.result["exception"] = repr(e)

# Clean up the main session whatever happens
db.session.rollback()

result.finish()
lock_session.add(result)

ScheduledTaskResult.cleanup(lock_session)
lock_session.commit()
log.info("Tasks complete.")
5 changes: 5 additions & 0 deletions tests/test_scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from models.scheduled_task import execute_scheduled_tasks


def test_scheduled_tasks(app):
execute_scheduled_tasks(force=True)

0 comments on commit 6f3ec43

Please sign in to comment.