-
Notifications
You must be signed in to change notification settings - Fork 2
/
rpi-radiation-monitor.py
executable file
·129 lines (116 loc) · 4.13 KB
/
rpi-radiation-monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/python3
import RPi.GPIO as GPIO
import json
import sys
import time
import logging
import base64
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
# Load the JSON configuration named by the first command-line argument.
# The context manager closes the file handle as soon as parsing is done.
with open(sys.argv[1]) as cf:
    config = json.load(cf)

version = "1.0.0"
# Divisor that turns the pulse count into uSv/h (see add_and_calculate).
tube_constant = 151
monitor_pin = config["monitor-pin"]
hostname = config["host"]
port = config["port"]
# Length of the sliding counting window, in seconds.
interval = 60

# Shared state: written by the counting loop, read by the HTTP handler.
measurements = []  # pulse timestamps (ms) still inside the window
radiation = 0
cpm = 0
counter = 0  # total pulses since start
uptime = 0
started = 0
timestamp = time.time() * 1000  # milliseconds since the epoch

GPIO.setmode(GPIO.BOARD)
# One setup call is enough: input with the pull-up enabled. The earlier
# plain-input setup of the same pin was immediately overridden by this one.
GPIO.setup(monitor_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

logging.basicConfig(filename='rpi-radiation-monitor.log', level=logging.INFO)
# Console alternative to the file log above:
#logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
logging.info('RPi Radiation Monitor %s', version)
class ServerHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the monitor's read-only JSON API.

    Every GET must carry HTTP Basic credentials accepted by
    ``is_authorized`` against ``config['credentials']``; otherwise a 401
    with authentication challenges is returned. Two endpoints are served:

    * ``/api/v1/system/info`` -- device id, type, version, name, current
      time in seconds and uptime.
    * ``/api/v1/system/measurements`` -- the latest radiation value, pulse
      counters, measurement timestamp and uptime, read from the module
      globals.

    Any other path yields a 404.
    """

    def _send_json(self, payload):
        """Send *payload* as a 200 response with a JSON body."""
        # Serialise first so a failure cannot leave a half-written response.
        response_body = json.dumps(payload)
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(bytes(response_body, "utf-8"))

    def do_GET(self):
        """Authenticate the request and dispatch on the request path."""
        authorization = self.headers['authorization']
        if not is_authorized(authorization, config['credentials']):
            self.send_response(401)
            self.send_header('WWW-Authenticate', 'Basic realm="Access to RPi camera"')
            self.send_header('Proxy-Authenticate', 'Basic realm="Access to RPi camera"')
            self.end_headers()
        elif self.path == '/api/v1/system/info':
            self._send_json({
                "id": config['id'],
                "type": "rpi-radiation-monitor",
                "version": version,
                "name": config['name'],
                "timestamp": int(time.time()),
                "uptime": int(uptime),
            })
        elif self.path == '/api/v1/system/measurements':
            self._send_json({
                "radiation": {
                    "value": radiation,
                    "unit": "uSv/h",
                },
                "counter": {
                    "cpm": cpm,
                    "total": counter,
                },
                "timestamp": timestamp,
                "uptime": uptime,
            })
        else:
            # send_error() already terminates the headers and writes the
            # error body; calling end_headers() again would append a stray
            # blank line after that body.
            self.send_error(404)
def is_authorized(authorization, credentials):
    """Check an HTTP ``Authorization`` header against the known credentials.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.
        credentials: Mapping of user name to expected password.

    Returns:
        ``True`` only when the header carries well-formed Basic credentials
        whose user exists in *credentials* with a matching password;
        ``False`` otherwise. Malformed headers are rejected, never raised.
    """
    import hmac  # local import: only needed for the constant-time comparison

    if authorization is None:
        logging.error("authorization header is missing !")
        return False
    if not authorization.startswith('Basic '):
        logging.error("authorization Basic is expected !")
        return False

    parts = authorization.split()
    if len(parts) < 2:
        # "Basic " with no token after it.
        logging.error("authorization failed !")
        return False

    try:
        decoded = str(base64.b64decode(parts[1]), 'utf-8')
    except ValueError:
        # Covers invalid base64 (binascii.Error) and bytes that are not
        # valid UTF-8 (UnicodeDecodeError); both derive from ValueError.
        logging.error("authorization failed !")
        return False

    # Split on the first colon only, so passwords may themselves contain ':'.
    user, separator, supplied = decoded.partition(":")
    if separator and user in credentials:
        expected = credentials[user]
        # Compare as bytes in constant time to avoid leaking how much of
        # the password matched.
        if isinstance(expected, str) and hmac.compare_digest(
            expected.encode("utf-8"), supplied.encode("utf-8")
        ):
            logging.info("user and password match !")
            return True

    logging.error("authorization failed !")
    return False
def start_web_server(hostname, port):
    """Serve the HTTP API on ``hostname:port``, blocking while it runs.

    Requests are handled by ``ServerHandler``. The ``HTTPServer`` instance
    is returned once ``serve_forever`` has returned.
    """
    bind_address = (hostname, port)
    httpd = HTTPServer(bind_address, ServerHandler)
    logging.info('Web server started %s:%s!', hostname, port)
    httpd.serve_forever()
    return httpd
def add_and_calculate(timestamp, measurements, interval, cpm_per_usvh=None):
    """Record one pulse and recompute the windowed count and dose rate.

    Args:
        timestamp: Time of the new pulse, in milliseconds.
        measurements: Timestamps (ms) of earlier pulses. Not modified.
        interval: Window length in seconds; pulses at or before
            ``timestamp - interval * 1000`` are dropped.
        cpm_per_usvh: Divisor converting the count into uSv/h. Defaults to
            the module-level ``tube_constant`` when omitted.

    Returns:
        A tuple ``(window, cpm, radiation)``: the new list of timestamps
        inside the window (including *timestamp*), the number of pulses in
        it, and that number divided by the conversion factor.

    Note:
        ``cpm`` is the pulse count within *interval* seconds, so it is a
        true counts-per-minute figure only when *interval* is 60.
    """
    if cpm_per_usvh is None:
        cpm_per_usvh = tube_constant
    eviction_time = timestamp - (interval * 1000)
    window = [ts for ts in measurements if ts > eviction_time]
    window.append(timestamp)
    cpm = len(window)
    radiation = cpm / cpm_per_usvh
    return window, cpm, radiation
# Serve the HTTP API in the background. The thread is a daemon so it cannot
# keep the process alive once the counting loop below has exited.
webThread = threading.Thread(
    target=start_web_server, args=(hostname, port), daemon=True
)
webThread.start()
started = time.time()
try:
    while True:
        # Block until a falling edge is seen on the monitor pin.
        GPIO.wait_for_edge(monitor_pin, GPIO.FALLING)
        # Take the time stamps only after the wait returns, so the pulse is
        # recorded with its arrival time rather than the time the wait began.
        timestamp = time.time() * 1000
        uptime = time.time() - started
        counter = counter + 1
        measurements, cpm, radiation = add_and_calculate(timestamp, measurements, interval)
        logging.info('Counter: %s, CPM: %s, radiation: %s uSv/h', counter, cpm, radiation)
except KeyboardInterrupt:
    logging.info('Interrupted, shutting down')
finally:
    # threading.Thread has no stop() method; the daemon web thread ends with
    # the process. Always release the GPIO pins on the way out.
    GPIO.cleanup()