-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy patholympus.py
432 lines (356 loc) · 15.5 KB
/
olympus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
import time
import json
from datetime import datetime, timedelta
import traceback
import logging, sys
import hmac
import sh
from pathlib import Path
USERS_PATH="data/offline_json.json"
LOG_PATH="data/logs/olympus.log"
#level = logging.DEBUG
level = logging.INFO
logging.basicConfig(level=level)
logger = logging.getLogger("olympus")
logging.getLogger("sh").setLevel(logging.WARNING) # silence sh's commands on INFO level
#logger.addHandler(sys.stdout)
# python3 olympus.py --log=debug # to try different level # not in code
import functools
def debug(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
logger.debug(f"{func.__name__}()")
result = func(*args, **kwargs)
return result
return wrapper
import qrcode
logger.debug("finished imports for std lib")
#import custom packages
#TODO test the below imports with an esp
# import board
# import digitalio
# from digitalio import DigitalInOut
# from adafruit_pn532.i2c import PN532_SPI
import Check_Gsheet_UID
import Read_MFRC522
import Get_Buttons
import Pi_to_OLED
logger.debug("finished imports for custom")
import firebase_admin
from firebase_admin import credentials, db
#https://console.firebase.google.com/u/0/project/noisebridge-rfid-olympus/
#https://www.freecodecamp.org/news/how-to-get-started-with-firebase-using-python/
#https://firebase.google.com/docs/database/security/get-started?hl=en&authuser=2
# Circuit Python port for MFRC522 https://github.com/domdfcoding/circuitpython-mfrc522
#TODO: change firebase rules so that Members can only add guests and memebrs
#TODO: change firebase so guests to only view their experation date
logger.debug("finished imports for firebase")
logger.debug("finished imports")
# Add an authorized UID to the database
level_3 = "Gods"
level_2 = "Members"
level_1 = "Guests"
#ref.child('Mytikas').child(level_3).set({"uid":"08174ab9"})
# Retrieve authorized UIDs from the database
#authorized_uids = ref.child('Mytikas').get()
#print("Authorized UIDs:", authorized_uids)
local_cache = {}
@debug
def strike_the_door():
print("Striking the door")
logger.info("Striking the door")
Pi_to_OLED.OLED_off(7)
Pi_to_OLED.New_Message("Striking the door")
Get_Buttons.set_pin(20, True)
time.sleep(6)
Get_Buttons.set_pin(20, False)
@debug
def uid_is_valid(UID, cache):
#Check cache if the UID exists, else check the server
#print("Checking Validity")
logger.debug("Checking Validity")
def not_expired(card):
#print("Checking Expiration")
logger.debug(f"Checking Expiration")
present_date = datetime.now()
present_unix_timestamp = datetime.timestamp(present_date)*1000
end_unix_date = card["expire_date"]
is_card_currently_active = (present_unix_timestamp < end_unix_date or end_unix_date == 0)
logger.info(f"Expiration, {is_card_currently_active=}: {present_date=} < {card['exp']=}")
return is_card_currently_active
card = cache.get(UID)
if card:
print("Card is found in cache")
logger.info("Card is found in cache")
#Gets either a True or False
is_card_active = not_expired(card)
return is_card_active
#Card isnt even recognized
else:
return None
@debug
def rewrite_user_dict(users):
rewrite_json(json.dumps(users))
@debug
def rewrite_json(new_json):
if not new_json:
raise ValueError("tried to write nothing")
logger.debug(f"{new_json=}")
with open(USERS_PATH, "w") as f:
f.write(new_json)
@debug
def load_json():
with open(USERS_PATH, "r") as f:
user_dict = json.loads(f.read())
return user_dict
@debug
def add_uid(mentor_UID, new_UID, mentor_clearance_level, prodigy_clearance_level, user_dict):
#Adds a user to the server
print("Adding User")
logger.info("Adding User")
current_time = datetime.now()
current_unix_timestamp = datetime.timestamp(current_time)*1000
if (mentor_clearance_level == level_2) or (mentor_clearance_level == level_3):
if (prodigy_clearance_level == level_2) or (mentor_clearance_level == level_3):
# member
new_tag_data = {
'clearance': prodigy_clearance_level, # Replace with your actual tag ID
'expire_date': 0,
'issue_date': current_unix_timestamp,
'exp': "NA",
'iss': str(current_time),
'uid': new_UID,
'user_handle': "",
'mentor': mentor_UID
}
elif prodigy_clearance_level == level_1:
# 30 day
expiration_time = current_time + timedelta(days=30)
expiration_unix_timestamp = datetime.timestamp(expiration_time)*1000
new_tag_data = {
'clearance': prodigy_clearance_level, # Replace with your actual tag ID
'expire_date': expiration_unix_timestamp,
'issue_date': current_unix_timestamp,
'exp': str(expiration_time),
'iss': str(current_time),
'uid': new_UID,
'user_handle': "",
'mentor': mentor_UID
}
#New User getting added to either big M or guests
new_user = { new_tag_data['uid']: new_tag_data }
if new_tag_data['uid'] not in user_dict:
user_dict.update(new_user)
rewrite_user_dict(user_dict)
print("Added User", new_UID, "to", prodigy_clearance_level)
logger.info(f"Added User {new_UID} to {prodigy_clearance_level}")
send_log(("Added Acess from " + mentor_UID + " to " + new_UID + " at " + str(datetime.now())))
Pi_to_OLED.New_Message("New User: Please Scan QR Code. Red button to skip")
time.sleep(5)
Pi_to_OLED.New_UID_QR_Image(new_UID)
start_time = datetime.now()
waiting_period = 60
while True:
time.sleep(.2)
current_time = datetime.now()
elapsed_time = current_time - start_time
switch, button = Get_Buttons.read()
if elapsed_time.total_seconds() >= waiting_period:
break
#If you press the red button we skip the QR code
elif button == True:
break
Pi_to_OLED.OLED_off(1)
return user_dict
#Guest getting upgraded to big M
elif (new_tag_data['clearance'] == level_2) and (new_tag_data['uid'] in user_dict):
user_dict.update(new_user)
rewrite_user_dict(user_dict)
print("Added User", new_UID, "to", prodigy_clearance_level)
logger.info(f"Added User {new_UID} to {prodigy_clearance_level}")
Pi_to_OLED.New_Message("30 Day Member ---> Big M")
Pi_to_OLED.OLED_off(5)
return user_dict
#Guest getting a 30 day refreshed
elif (new_tag_data['clearance'] == level_1) and (new_tag_data['uid'] in user_dict):
user_dict.update(new_user)
rewrite_user_dict(user_dict)
print("Added User", new_UID, "to", prodigy_clearance_level)
logger.info(f"Added User {new_UID} to {prodigy_clearance_level}")
Pi_to_OLED.OLED_off(3)
Pi_to_OLED.New_Message(f"30 Days refreshed, expires on {new_tag_data['exp']}")
time.sleep(1)
Pi_to_OLED.OLED_off(1)
return user_dict
#This UID already has this level of access
elif (new_tag_data['uid'] in user_dict) and (new_tag_data['clearance'] == user_dict['uid']['clearance']):
print("User a", new_UID, "to", prodigy_clearance_level)
logger.info(f"User {new_UID} already present as {user_dict[new_tag_data['uid']]}")
Pi_to_OLED.OLED_off(3)
Pi_to_OLED.New_Message("This user already has this level of access")
time.sleep(1)
return False
#Uncaught case, probably an error
else:
logger.info(f"User {new_UID} already present as {user_dict[new_tag_data['uid']]}")
Pi_to_OLED.OLED_off(3)
Pi_to_OLED.New_Message("There was an error, please contact the access control gods")
time.sleep(5)
return False
else:
print("Only big M Members can do this action")
logger.info("Only big M Members can do this action")
Pi_to_OLED.OLED_off(3)
Pi_to_OLED.New_Message("Only big M Members can add access")
time.sleep(1)
@debug
def send_log(log):
#Inform sever of unauthorized scanning, succesful scanning, and give a time stamp, inform of users added and by whom
#ref.child('Ourea').push().set(log)
with open(LOG_PATH, "a+") as log_file:
print(log, file=log_file)
@debug
def read_user_action(switch, button):
#reads state of buttons to determine whether we are adding a guest or Big M Member
#TODO test the reading of these buttons
#TODO investigate the use of https://docs.python.org/3/library/signal.html
if switch and button:
return level_2
else:
return level_1
@debug
def look_up_clearance_level(card_uid, cache):
#formatted_UID = f'"{card_uid}"'
card = cache.get(card_uid)
if card:
clearance = card.get('clearance')
return clearance
else:
logger.info(f"error card id {card_uid} returns {card}")
logger.info(f"{cache=}")
@debug
def generate_QR(new_UID):
url = f"https://docs.google.com/forms/d/e/1FAIpQLSdXIPnJPoPdBreH9FOQjW-s5nUuZ4QHThNK59u3kmUDplx3Bg/viewform?usp=pp_url&entry.181306502={new_UID}"
return
@debug
def main():
user_dict = load_json()
logger.debug("Done caching")
Pi_to_OLED.OLED_off(3)
Pi_to_OLED.New_Message("REBOOTED and READY (3s)")
count = 0
activity_pin = True
while True:
time.sleep(.1)
if count % 10 == 0:
Get_Buttons.set_pin(16, activity_pin)
activity_pin = not activity_pin
count += 1
switch, button = Get_Buttons.read()
card_uid = Read_MFRC522.Read_UID()
if card_uid:
logger.debug(f"{card_uid=}")
clearance = look_up_clearance_level(card_uid, user_dict)
is_valid = uid_is_valid(card_uid, user_dict)
logger.info(f"{card_uid=} {is_valid=} {switch=} {button=}") # log every scan (incl valid & switch/button state)
#Easter Egg: Hack the Planet!
if not card_uid and button:
time.sleep(.5)
switch, button = Get_Buttons.read()
if button:
Pi_to_OLED.New_Message("HACK THE PLANET!")
Pi_to_OLED.OLED_off(4)
#Open the door to an authorized user
elif card_uid and is_valid and not switch:
print(switch, button)
logger.info(f"{switch=} {button=}")
strike_the_door()
Pi_to_OLED.New_Message(f'Your access expires on {user_dict[card_uid]["exp"]}')
time.sleep(1)
Pi_to_OLED.OLED_off(3)
time.sleep(1)
send_log(("Opened door to "+card_uid+" at "+str(datetime.now())))
#Begin SUDO mode... Adding new users or modifiying existing users
elif card_uid and is_valid and (switch == True) and ((clearance == level_2) or (clearance == level_3)):
Pi_to_OLED.New_Message("SUDO engaged")
Pi_to_OLED.OLED_off(100)
time.sleep(1)
Pi_to_OLED.New_Message("If adding a big M, hold down the red button")
time.sleep(5)
switch, button = Get_Buttons.read()
prodigy_level = read_user_action(switch,button)
if prodigy_level == level_2:
Pi_to_OLED.New_Message("BIG M selected")
time.sleep(2)
mentor_clearance = look_up_clearance_level(card_uid, user_dict)
Pi_to_OLED.New_Message("Place new member card on the reader now")
new_UID = Read_MFRC522.Read_UID(30, card_uid)
if (new_UID != "") and new_UID != card_uid:
tmp = add_uid(card_uid, new_UID, mentor_clearance, prodigy_level, user_dict)
if tmp:
user_dict = tmp
# send_log(("Added Acess from " + card_uid + " to " + new_UID + " at " + str(datetime.now())))
# Pi_to_OLED.New_Message("New User: Please Scan QR and enter name (25s)")
# Pi_to_OLED.New_UID_QR_Image(new_UID)
# time.sleep(25)
# Pi_to_OLED.OLED_off(1)
else:
#send_log(f"Added Acess from {card_uid} to {new_UID} at {str(datetime.now()}")
#Pi_to_OLED.New_Message(f"Existing user with {user_dict[card_uid]['clearance']} role")
Pi_to_OLED.New_Message(f"Error adding user with {user_dict[card_uid]['clearance']} role")
time.sleep(8)
Pi_to_OLED.OLED_off(1)
else:
print("Card reading timed out")
logger.info("Card reading timed out")
Pi_to_OLED.OLED_off(5)
Pi_to_OLED.New_Message("Card Reading timed out, also Mifare NFC tags only")
time.sleep(3)
# A 30 day access member has been identified and lacks permission to add a new user
elif card_uid and is_valid and (switch == True) and (clearance == level_1):
print("Need Big M to do this")
logger.info("Need Big M to do this")
Pi_to_OLED.New_Message("Need a Big M to do this, please turn switch off")
Pi_to_OLED.OLED_off(5)
time.sleep(2)
# Card ID is read and recognized but has expired
elif card_uid and (is_valid == False):
logger.info("Access expired")
Pi_to_OLED.New_Message(f'Your 30 access expired on {user_dict[card_uid]["exp"]}')
time.sleep(1)
Pi_to_OLED.New_Message('Please renew access by talking to a big M Member')
Pi_to_OLED.OLED_off(5)
time.sleep(5)
# Card ID is read but no record of it exists
elif card_uid and (is_valid == None):
print("Access Denied")
logger.info("Access Denied")
Pi_to_OLED.New_Message(f"Access Denied: {card_uid}")
Pi_to_OLED.OLED_off(3)
time.sleep(2)
send_log(("Denied Access to " + card_uid + " at "+str(datetime.now())))
#Need to clear the variables to start our next loop fresh
is_valid = None
card_uid = None
prodigy_level = None
mentor_clearance = None
clearance = None
new_UID = None
if __name__ == "__main__":
try:
file_version = sh.git("hash-object","./olympus.py").strip()
logger.info(f"starting log: {level=}, version: {file_version} (git hash-object ./olympus.py)")
main()
except Exception:
print(traceback.format_exc())
error_message = traceback.format_exc()
error_first = error_message[40:]
error_last = error_message[-40:-1]
Pi_to_OLED.New_Message(error_first)
time.sleep(4)
Pi_to_OLED.OLED_off(5)
Pi_to_OLED.New_Message(error_last)
time.sleep(4)
Pi_to_OLED.OLED_off(5)
logger.error(traceback.format_exc())
main()