This repository has been archived by the owner on Sep 3, 2022. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
cooldowns.py
115 lines (87 loc) · 3.18 KB
/
cooldowns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import time
class Cooldown:
__slots__ = ('rate', 'per', '_window', '_tokens', '_last')
def __init__(self, rate, per):
self.rate = int(rate)
self.per = float(per)
self._window = 0.0
self._tokens = self.rate
self._last = 0.0
def get_tokens(self, current=None):
if not current:
current = time.time()
tokens = self._tokens
if current > self._window + self.per:
tokens = self.rate
return tokens
def get_retry_after(self, current=None):
current = current or time.time()
tokens = self.get_tokens(current)
if tokens == 0:
return self.per - (current - self._window)
return 0.0
def update_rate_limit(self, current=None):
current = current or time.time()
self._last = current
self._tokens = self.get_tokens(current)
# first token used means that we start a new rate limit window
if self._tokens == self.rate:
self._window = current
# check if we are rate limited
if self._tokens == 0:
return self.per - (current - self._window)
# we're not so decrement our tokens
self._tokens -= 1
# see if we got rate limited due to this token change, and if
# so update the window to point to our current time frame
if self._tokens == 0:
self._window = current
def reset(self):
self._tokens = self.rate
self._last = 0.0
def copy(self):
return Cooldown(self.rate, self.per)
def __repr__(self):
return (
f"<Cooldown rate: {self.rate} per: {self.per} window: "
f"{self._window} tokens: {self._tokens}>"
)
class CooldownMapping:
def __init__(self, original):
self._cache = {}
self._cooldown = original
def copy(self):
ret = CooldownMapping(self._cooldown)
ret._cache = self._cache.copy()
return ret
@property
def valid(self):
return self._cooldown is not None
@classmethod
def from_cooldown(cls, rate, per):
return cls(Cooldown(rate, per))
def _bucket_key(self, cooldown_key):
return cooldown_key
def _verify_cache_integrity(self, current=None):
# we want to delete all cache objects that haven't been used
# in a cooldown window. e.g. if we have a command that has a
# cooldown of 60s and it has not been used in 60s then that
# key should be deleted
current = current or time.time()
dead_keys = [
k for k, v in self._cache.items() if current > v._last + v.per
]
for k in dead_keys:
del self._cache[k]
def get_bucket(self, cooldown_key, current=None):
self._verify_cache_integrity(current)
key = self._bucket_key(cooldown_key)
if key not in self._cache:
bucket = self._cooldown.copy()
self._cache[key] = bucket
else:
bucket = self._cache[key]
return bucket
def update_rate_limit(self, cooldown_key, current=None):
bucket = self.get_bucket(cooldown_key, current)
return bucket.update_rate_limit(current)