-
Notifications
You must be signed in to change notification settings - Fork 0
/
heartbeat.py
67 lines (56 loc) · 1.78 KB
/
heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import threading
import time
class Heartbeat:
    """Periodic timer that invokes registered listener callbacks.

    A background thread sleeps for ``timer_period`` seconds between ticks
    and, on every tick, calls each callable stored in ``listeners``.
    """

    # Periods at or below this threshold are rejected by set_timer_period.
    MIN_PERIOD = 1e-4
    # Period (seconds) used when the requested period is rejected.
    DEFAULT_PERIOD = 1

    def __init__(self, period):
        """Create a stopped heartbeat with the given tick period (seconds)."""
        self.is_timing = False
        self.listeners = {}
        self.stop_callback = None
        self.set_timer_period(period)

    def start_timer(self, stop_callback=None):
        """Start the timing thread unless one is already running.

        Args:
            stop_callback: optional callable invoked once by ``stop_timer``.
        """
        if self.is_timing:
            print("heartbeat::start_timer There's already a timer running on this instance")
            return

        worker = threading.Thread(
            target=self.timing, args=(self.dispatch_listeners,)
        )
        # The flag is raised before the thread starts so the loop in
        # ``timing`` sees it on its first check.
        self.is_timing = True
        self.stop_callback = stop_callback
        worker.start()

    def stop_timer(self):
        """Request the timing loop to end and fire the stop callback, if any."""
        self.is_timing = False
        if callable(self.stop_callback):
            self.stop_callback()
        self.stop_callback = None
        print("heartbeat::stop_timer timer halt requested")

    def timing(self, timing_callback):
        """Thread body: call ``timing_callback`` every ``timer_period`` seconds.

        The loop ends once ``is_timing`` becomes false. An exception raised
        by ``timing_callback`` propagates out of this method.
        """
        while self.is_timing:
            time.sleep(self.timer_period)
            # re-check in case this changes while sleeping the thread
            if self.is_timing:
                timing_callback()
        print("heartbeat::timing timer execution ended")

    def dispatch_listeners(self):
        """Call every callable listener; does nothing when not timing.

        Listeners are taken from a snapshot made when dispatch starts, so a
        listener may register or unregister listeners (itself included)
        without breaking the iteration. Non-callable entries are skipped.

        Raises:
            Exception: whatever a listener raises is logged and re-raised.
        """
        if not self.is_timing:
            return

        # Snapshot: mutating the dict while iterating it directly raises
        # "RuntimeError: dictionary changed size during iteration".
        for key, listener in list(self.listeners.items()):
            if not callable(listener):
                continue
            try:
                listener()
            except Exception:
                # str(key) keeps the log line from raising TypeError (and
                # hiding the real error) when the id is not a string.
                print("heartbeat::dispatch_listeners error while executing listener " + str(key))
                raise

    def unregister_listener(self, listener_id):
        """Remove the listener stored under ``listener_id``; no-op if absent."""
        self.listeners.pop(listener_id, None)

    def register_listener(self, listener_id, callback):
        """Store ``callback`` under ``listener_id``.

        Raises:
            ValueError: if ``listener_id`` is already registered.
        """
        if listener_id in self.listeners:
            raise ValueError(
                "listener id already registered: " + repr(listener_id)
            )
        self.listeners[listener_id] = callback

    def set_timer_period(self, period):
        """Set the tick period in seconds; refused while the timer runs.

        Periods not greater than ``MIN_PERIOD`` fall back to
        ``DEFAULT_PERIOD``.
        """
        if self.is_timing:
            print("heartbeat::set_timer_period period can't be changed while timing")
            return

        # safety check
        if period > self.MIN_PERIOD:
            self.timer_period = period  # seconds, passed to time.sleep
        else:
            self.timer_period = self.DEFAULT_PERIOD