This repository has been archived by the owner on Sep 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpi.py
47 lines (38 loc) · 1.51 KB
/
rpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from datadog_checks.checks import AgentCheck
import os
class RpiCheck(AgentCheck):
    """Agent check that reports Raspberry Pi readings obtained from ``vcgencmd``.

    Each run submits one temperature gauge, one gauge per readable clock
    source, and a single ``rpi.check`` service check summarising whether
    both measurements could be collected.
    """

    # Clock sources queried with ``vcgencmd measure_clock <name>``.
    CLOCK_SOURCES = (
        "arm", "core", "h264", "isp", "v3d", "uart",
        "pwm", "emmc", "pixel", "vec", "hdmi", "dpi",
    )

    @staticmethod
    def _vcgencmd(arguments):
        """Run ``vcgencmd <arguments>`` and return the first line it prints.

        Returns an empty string when the command produces no output.
        """
        # The context manager closes the pipe as soon as the line is read,
        # instead of leaving the child process to be cleaned up by the
        # garbage collector.
        with os.popen(f"vcgencmd {arguments}") as pipe:
            return pipe.readline()

    def measure_temp(self):
        """Return the temperature printed by ``vcgencmd measure_temp``.

        The ``temp=`` prefix and ``'C`` suffix are stripped from the output
        line and the remainder is converted to a number.

        Returns:
            float: the parsed temperature value.

        Raises:
            Exception: if the output line cannot be parsed as a number
                (for example when the command printed nothing).
        """
        raw = self._vcgencmd("measure_temp")
        try:
            return float(raw.replace("temp=", "").replace("'C", ""))
        except (ValueError, TypeError) as err:
            # Chain the parsing error so the root cause stays in the traceback.
            raise Exception(f"Invalid temperature reading: {raw} ({err})") from err

    def measure_clock(self):
        """Return the clock readings for every entry of ``CLOCK_SOURCES``.

        Returns:
            dict: clock source name mapped to the float parsed from the text
            that follows the first ``=`` in the command output. Sources
            whose output cannot be parsed are left out.
        """
        clocks = {}
        for name in self.CLOCK_SOURCES:
            raw = self._vcgencmd(f"measure_clock {name}")
            try:
                # When there is no "=", find() returns -1 and the whole line
                # is parsed, which preserves the original lenient behaviour.
                clocks[name] = float(raw[raw.find("=") + 1:])
            except ValueError:
                # Best effort: an unreadable clock is skipped, not fatal.
                continue
        return clocks

    def check(self, instance):
        """Collect the readings and submit them as gauges plus a service check.

        Args:
            instance: the check instance configuration; not read by this
                check.

        A failure in either measurement is recorded and reported through the
        ``rpi.check`` service check (CRITICAL with the collected messages)
        rather than raised, so one failing measurement does not prevent the
        other from being submitted.
        """
        errors = []

        try:
            self.gauge(
                "rpi.temperature.soc",
                self.measure_temp(),
                tags=["cpu", "temperature"],
            )
        except Exception as err:
            errors.append(f"measure_temp:{err}")

        try:
            for name, value in self.measure_clock().items():
                self.gauge(f"rpi.clock.{name}", value, tags=["cpu", "frequency"])
        except Exception as err:
            errors.append(f"measure_clock:{err}")

        if errors:
            status = AgentCheck.CRITICAL
            msg = ','.join(errors)
        else:
            status = AgentCheck.OK
            msg = 'Ok'
        self.service_check('rpi.check', status, message=msg)