-
Notifications
You must be signed in to change notification settings - Fork 4
/
configuration.py
62 lines (52 loc) · 1.66 KB
/
configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time
import ddb
class Configuration(object):
table_name = "Configuration"
fake_table_name = "FakeConfiguration"
def __init__(self, fake=False):
self.fake = fake
if fake:
self.table_name = self.fake_table_name
self.ddb = ddb.DDB(self.table_name, [('item_key', 'S')])
self.table = self.ddb.get_table()
self.cache = {}
def get(self, key):
if key in self.cache:
return self.cache[key]
response = self.table.get_item(Key={'item_key': key})
item = response.get("Item")
self.cache[key] = item
return item
def set_count(self, count_label, value):
self.table.update_item(
Key={
'item_key': 'counts'
},
UpdateExpression="set {}=:v".format(count_label),
ExpressionAttributeValues={
":v": int(value)
},
ReturnValues="UPDATED_NEW"
)
def get_count(self, count_label):
counts = self.get("counts")
if not counts:
return 0
return counts.get(count_label, 0)
def get_last_run(self):
last_run = self.get("last run")
if not last_run:
return 0
last_run = int(last_run['last run'])
return last_run
def set(self, key, valuedict):
"""
sets valuedict {k:v} as the value in DDB for row identified by key
"""
item = {'item_key': key}
for k in valuedict:
item[k] = valuedict[k]
self.table.put_item(Item=item)
def set_last_run(self):
now = int(time.time())
self.set('last_run', {'last_run': now})