-
-
Notifications
You must be signed in to change notification settings - Fork 35
/
pacemanager.py
72 lines (58 loc) · 2.65 KB
/
pacemanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
import collections
import random
import typing
import common
# Per-pace scheduling parameters stored by PaceManager.add_or_update_pace():
#   weight       -- relative likelihood of this pace being picked
#   min_duration -- lower bound, in seconds, on how long the pace is kept
#   max_duration -- upper bound, in seconds, on how long the pace is kept
PaceSettings_ = typing.NamedTuple(
    'PaceSettings_',
    [
        ('weight', float),
        ('min_duration', float),
        ('max_duration', float),
    ],
)
class PaceManager:
    """Manages transitions between game paces, and notifies users of changes via a callback.

    The game starts out in the initial pace, then switches pace according to parameters
    passed in. The actual pace is treated as an opaque object -- this class does not care
    what the pace represents, it is just in charge of timing transitions. Paces are used
    as dictionary keys, so they must be hashable.

    Sample usage:

        pm = PaceManager(cb, pace1, 10)
        pm.add_or_update_pace(pace2, 1.0, 10, 20)
        pm.add_or_update_pace(pace3, 2.0, 5, 10)
        pm.start()
        ....
        pm.stop()

    Here, we start off with pace1 for 10 seconds. After that, we will switch to either pace2, or pace3,
    with pace3 being twice as likely. If pace2 is chosen, it will be kept for 10-20 seconds. pace3 will
    be kept for 5-10 seconds.

    Notes:
      * The callback is only invoked for paces chosen after the initial period; it is
        not invoked for the initial pace itself.
      * The previously active pace is not excluded when choosing the next one, so the
        same pace can be selected (and reported to the callback) twice in a row.
    """

    def __init__(self, callback, initial_pace, initial_pace_time: float, rng=random.uniform):
        """Create a manager; nothing is scheduled until start() is called.

        Args:
            callback: Called with the new pace as its single argument on every transition.
            initial_pace: The pace assumed to be active before the first transition.
            initial_pace_time: Seconds to wait before the first transition.
            rng: Callable taking (low, high) and returning a number; used both for the
                weighted pace choice and for picking a duration. Defaults to
                random.uniform; may be replaced, e.g. to make behavior deterministic.
        """
        self.initial_pace_ = initial_pace
        self.initial_pace_time_ = initial_pace_time
        # Maps pace -> PaceSettings_(weight, min_duration, max_duration).
        self.available_paces_ = {}
        # Future for the running pace loop; None until start() has been called.
        self.task_ = None
        self.rng_ = rng
        self.callback_ = callback

    def add_or_update_pace(self, pace, weight: float, min_duration: float, max_duration: float):
        """Register `pace`, or replace its settings if it is already registered.

        Args:
            pace: The pace object (must be hashable).
            weight: Relative likelihood of this pace being chosen.
            min_duration: Lower bound, in seconds, for how long the pace is kept.
            max_duration: Upper bound, in seconds, for how long the pace is kept.
        """
        self.available_paces_[pace] = PaceSettings_(weight, min_duration, max_duration)

    def start(self):
        """Schedule the pace loop via asyncio.ensure_future and return the resulting future.

        If a previously started loop is still running it is cancelled first, so that
        it does not keep invoking the callback with no way left to stop it.
        """
        previous = self.task_
        if previous is not None and not previous.done():
            previous.cancel()
        self.task_ = asyncio.ensure_future(self.run_())
        return self.task_

    def stop(self):
        """Cancel the pace loop. Does nothing if start() has not been called."""
        if self.task_ is not None:
            self.task_.cancel()

    def set_pace_(self, pace):
        """Report `pace` to the user-supplied callback."""
        self.callback_(pace)

    def choose_new_pace_(self, old_pace) -> typing.Tuple[object, float]:
        """Pick the next pace by weighted random choice, along with how long to keep it.

        Args:
            old_pace: The currently active pace. Currently unused: it is not
                excluded from the candidates.

        Returns:
            A (pace, duration_in_seconds) tuple, where the duration is drawn by the
            rng between the chosen pace's min_duration and max_duration.

        Raises:
            RuntimeError: If no paces have been registered.
            ValueError: If no pace matched the drawn index (not expected when the
                weights are non-negative and the rng stays within the range it is given).
        """
        if not self.available_paces_:
            raise RuntimeError("No paces registered.")
        candidates = self.available_paces_
        total_weight = sum(params.weight for params in candidates.values())
        # Draw a point in [0, total_weight] and walk the cumulative weights until
        # the running total reaches it.
        index = self.rng_(0, total_weight)
        cumulative_weight = 0
        for pace, params in candidates.items():
            cumulative_weight += params.weight
            if cumulative_weight >= index:
                return pace, self.rng_(params.min_duration, params.max_duration)
        raise ValueError("Couldn't find pace with index %s/%s!?" % (index, total_weight))

    # NOTE(review): async_print_exceptions comes from the project's `common` module,
    # which is not visible here -- presumably it reports exceptions escaping the coroutine.
    @common.async_print_exceptions
    async def run_(self):
        """Pace loop: wait out the initial pace, then keep switching paces forever.

        Runs until cancelled (see stop()). Each iteration chooses a pace, reports it
        through the callback, and sleeps for the chosen duration.
        """
        await asyncio.sleep(self.initial_pace_time_)
        pace = self.initial_pace_
        while True:
            pace, duration_secs = self.choose_new_pace_(pace)
            self.set_pace_(pace)
            await asyncio.sleep(duration_secs)