-
-
Notifications
You must be signed in to change notification settings - Fork 621
/
celery_tasks.py
executable file
·93 lines (74 loc) · 2.22 KB
/
celery_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import django
import sh
from celery import Celery
from datetime import timedelta
from os import getenv, path
from tenacity import Retrying, stop_after_attempt, wait_fixed
try:
django.setup()
# Place imports that uses Django in this block.
from lib import diagnostics
from lib.utils import (
connect_to_redis,
is_balena_app,
reboot_via_balena_supervisor,
shutdown_via_balena_supervisor,
)
except Exception:
pass
__author__ = "Screenly, Inc"
__copyright__ = "Copyright 2012-2024, Screenly, Inc"
__license__ = "Dual License: GPLv2 and Commercial License"
CELERY_RESULT_BACKEND = getenv(
'CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
CELERY_BROKER_URL = getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=6)
r = connect_to_redis()
celery = Celery(
'Anthias Celery Worker',
backend=CELERY_RESULT_BACKEND,
broker=CELERY_BROKER_URL,
result_expires=CELERY_TASK_RESULT_EXPIRES
)
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls cleanup() every hour.
sender.add_periodic_task(3600, cleanup.s(), name='cleanup')
sender.add_periodic_task(60*5, get_display_power.s(), name='display_power')
@celery.task(time_limit=30)
def get_display_power():
r.set('display_power', diagnostics.get_display_power())
r.expire('display_power', 3600)
@celery.task
def cleanup():
sh.find(
path.join(getenv('HOME'), 'screenly_assets'),
'-name', '*.tmp', '-delete')
@celery.task
def reboot_anthias():
"""
Background task to reboot Anthias
"""
if is_balena_app():
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
):
with attempt:
reboot_via_balena_supervisor()
else:
r.publish('hostcmd', 'reboot')
@celery.task
def shutdown_anthias():
"""
Background task to shutdown Anthias
"""
if is_balena_app():
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
):
with attempt:
shutdown_via_balena_supervisor()
else:
r.publish('hostcmd', 'shutdown')