-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
executable file
·117 lines (94 loc) · 3.48 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import heapq
import inspect
import queue
import random
import threading
import types

from init import init_leds
import settings
from submodules import *
class Scheduler:
    """Run submodule display callbacks one at a time, ordered by priority.

    Two ``queue.PriorityQueue`` instances are kept: ``event_q`` for one-shot
    callbacks and ``loop_q`` for recurring ones.  Each step prefers a pending
    event; otherwise the loop entry with the smallest current priority value
    runs and is put back into ``loop_q``.  Smaller priority values are served
    first.

    Constructing a ``Scheduler`` loads the submodules, starts their service
    threads, stores the result of ``init_leds()`` as ``self.matrix`` and then
    enters ``schedule()``, which never returns.
    """

    # Longest time (seconds) ``next()`` waits for a loop entry before it
    # returns so that newly added events get a chance to run.
    _IDLE_POLL_SECONDS = 0.1

    class QObject:
        """Queue entry pairing a callback with a base and a current priority."""

        def __init__(self, priority, id, fnc):
            """Store callback ``fnc`` under ``id`` with the given ``priority``.

            The parameter keeps the name ``id`` (shadowing the builtin) so
            that existing callers passing it by keyword keep working.
            """
            self.base_prio = priority
            self.curr_prio = priority
            self.call_fnc = fnc
            self.id = id

        def get_fnc(self):
            """Reset the current priority to the base value; return the callback."""
            self.curr_prio = self.base_prio
            return self.call_fnc

        def __lt__(self, other):
            """Order entries by current priority; smaller values sort first."""
            return self.curr_prio < other.curr_prio

        def adjust_prio(self):
            """Lower the current priority value by 0.2, clamped to at least 1.1."""
            self.curr_prio = max(1.1, self.curr_prio - 0.2)

    def __init__(self):
        """Load submodules, start their services and run the scheduler loop.

        NOTE: this constructor does not return, because its last statement
        calls ``schedule()``, which loops forever.
        """
        self.loop_q = queue.PriorityQueue()
        self.event_q = queue.PriorityQueue()
        self.submodules = self.load_submodules()
        settings.LOADED_MODULES = len(self.submodules)
        print("Done loading Modules! " + str(len(self.submodules)) + " modules were loaded.\n")
        self.services = self.init_submodule_services()
        settings.RUNNING_SERVICES = len(self.services)
        print("Done loading Services! " + str(len(self.services)) + " services are runnning.\n")
        self.matrix = init_leds()
        self.schedule()

    def schedule(self):
        """Call ``next()`` forever."""
        while True:
            self.next()

    def next(self):
        """Run one scheduling step.

        A pending event always wins.  Otherwise every waiting loop entry has
        its priority adjusted, the front entry is taken, reset to its base
        priority, put back into the queue and then executed.  If there is
        neither an event nor a loop entry, the call returns after at most
        ``_IDLE_POLL_SECONDS`` instead of blocking indefinitely, so events
        added while no loop is registered are still served.
        """
        if not self.event_q.empty():
            event = self.event_q.get()
            print("Event Next: " + str(event.call_fnc))
            self.run_module(event.get_fnc())
            return

        # The heap list is shared with the callbacks handed to submodules
        # (add_loop / rmv_loop), so touch it only while holding the queue's
        # own lock.  adjust_prio() is monotonic, so the heap order survives.
        with self.loop_q.mutex:
            for entry in self.loop_q.queue:
                entry.adjust_prio()

        try:
            loop = self.loop_q.get(timeout=self._IDLE_POLL_SECONDS)
        except queue.Empty:
            # Nothing to display yet; let schedule() re-check the event queue.
            return
        fnc = loop.get_fnc()
        self.loop_q.put(loop)
        print("Loop Next: " + str(loop.call_fnc))
        self.run_module(fnc)

    def run_module(self, fnc):
        """Call ``fnc(self.matrix)`` on a new thread and wait until it finishes."""
        thread = threading.Thread(target=fnc, args=(self.matrix, ))
        thread.start()
        thread.join()

    def load_submodules(self):
        """Instantiate one class from every imported ``submodules`` module.

        This module's globals are scanned for module objects whose name
        contains ``"submodules"``.  For each one, the first class reported by
        ``inspect.getmembers`` (which sorts members by name) is instantiated
        with ``add_loop``, ``rmv_loop`` and ``add_event`` as arguments.
        Matching modules that expose no class are skipped with a message
        instead of aborting start-up with an ``IndexError``.

        Returns:
            The list of created instances.
        """
        instances = []
        # Iterate over a snapshot: a submodule constructor must not be able to
        # invalidate the iteration by changing this module's globals.
        for val in list(globals().values()):
            if not isinstance(val, types.ModuleType) or "submodules" not in val.__name__:
                continue
            print("Found submodule %s" % val)
            clsmembers = inspect.getmembers(val, inspect.isclass)
            if not clsmembers:
                print("Submodule %s exposes no class and was skipped." % val)
                continue
            instances.append(clsmembers[0][1](self.add_loop, self.rmv_loop, self.add_event))
        return instances

    def init_submodule_services(self):
        """Start a thread for every loaded submodule that has a ``service`` attribute.

        Submodules without ``service`` are silently skipped.

        Returns:
            The list of started ``threading.Thread`` objects.
        """
        services = []
        for mod in self.submodules:
            # Only the attribute lookup is guarded, so an AttributeError from
            # anywhere else is not mistaken for "module has no service".
            try:
                service = mod.service
            except AttributeError:
                continue
            thread = threading.Thread(target=service, args=())
            thread.start()
            services.append(thread)
            print("Loaded Service of module: " + str(mod))
        return services

    def _enqueue(self, target_q, priority, display_fnc):
        """Wrap ``display_fnc`` in a ``QObject``, put it on ``target_q``, return its id."""
        obj_id = random.getrandbits(128)
        target_q.put(Scheduler.QObject(priority, obj_id, display_fnc))
        return obj_id

    def add_event(self, priority, display_fnc):
        """Queue ``display_fnc`` as a one-shot event and return its random 128-bit id."""
        return self._enqueue(self.event_q, priority, display_fnc)

    def add_loop(self, priority, display_fnc):
        """Queue ``display_fnc`` as a recurring loop entry and return its random 128-bit id."""
        return self._enqueue(self.loop_q, priority, display_fnc)

    def rmv_loop(self, id):
        """Remove the loop entry registered under ``id``.

        Raises:
            Exception: if no entry in the loop queue has this id.
        """
        with self.loop_q.mutex:
            heap = self.loop_q.queue
            for index, entry in enumerate(heap):
                if entry.id == id:
                    del heap[index]
                    # Deleting from the middle of the list breaks the heap
                    # invariant PriorityQueue relies on; rebuild it so that
                    # get() keeps returning the smallest entry.
                    heapq.heapify(heap)
                    return
        raise Exception("Cant find ID for loop function!")
# Script entry point: constructing the Scheduler starts the scheduling loop.
# The call does not return, because Scheduler.__init__ ends by calling
# schedule(), which loops forever.
if __name__ == "__main__":
    Scheduler()