-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclm5ip_exporter.py
executable file
·121 lines (104 loc) · 4.26 KB
/
clm5ip_exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/python3
import socket
import sys
import time
from prometheus_client import start_http_server
from prometheus_client.core import GaugeMetricFamily, REGISTRY
class Clm5ipCollector(object):
    """Prometheus custom collector that polls a CLM5IP device over TCP.

    Every command opens a fresh TCP connection to the device, sends one
    CRLF-terminated command line and reads back a single reply line.
    The human-readable names of all outputs and inputs are queried once at
    construction time and reused as label values on every scrape.
    """

    # Layout of the "get data" reply: FIELDS_PER_OUTPUT values for each of
    # the OUTPUT_COUNT outputs, followed by the temperature, analog and
    # digital inputs.  The remaining fields up to EXPECTED_FIELDS are not
    # used by this collector.
    OUTPUT_COUNT = 5
    FIELDS_PER_OUTPUT = 6
    TEMPERATURE_COUNT = 2
    ANALOG_IN_COUNT = 2
    DIGITAL_IN_COUNT = 4
    EXPECTED_FIELDS = 40

    def __init__(self, target, port=10001, timeout=5.0):
        """Identify the device and cache the names of its outputs and inputs.

        Args:
            target: Host name or IPv4 address of the device.
            port: TCP port of the device's command interface.
            timeout: Socket timeout in seconds for connect/send/receive;
                ``None`` blocks indefinitely.

        Raises:
            SystemExit: With code 3 when the reply to the "i" command does
                not start with "CLM5IP" (this includes the case where the
                device could not be reached at all).
        """
        self._target = target
        self._port = port
        self._timeout = timeout

        info = self.executeCommand("i").split(";")
        if info[0] != "CLM5IP":
            sys.stderr.write("Device did not report as CLM5IP, but instead as '" + info[0] + "'\n")
            sys.exit(3)

        self._outputNames = self._moduleNames("o", self.OUTPUT_COUNT)
        self._temperatureNames = self._moduleNames("t", self.TEMPERATURE_COUNT)
        self._analogInNames = self._moduleNames("ain", self.ANALOG_IN_COUNT)
        self._digitalInNames = self._moduleNames("din", self.DIGITAL_IN_COUNT)

    def _moduleNames(self, prefix, count):
        """Return the names of modules ``<prefix>1`` .. ``<prefix><count>``."""
        return [self.getName(prefix + str(number)) for number in range(1, count + 1)]

    def collect(self):
        """Yield one gauge family per measured quantity for a scrape.

        Nothing is yielded when the device returns no data, an unexpected
        number of fields, or a field that cannot be parsed as a number; a
        message is written to stderr in each of those cases.
        """
        data = self._parseData(self.executeCommand("get data"))
        if data is None:
            return

        activePower = GaugeMetricFamily('clm5ip_active_power_watts', 'Active power P', labels=["output"])
        apparentPower = GaugeMetricFamily('clm5ip_apparent_power_va', 'Apparent power S', labels=["output"])
        reactivePower = GaugeMetricFamily('clm5ip_reactive_power_var', 'Reactive power Q', labels=["output"])
        voltage = GaugeMetricFamily('clm5ip_voltage_volts', 'Voltage V', labels=["output"])
        current = GaugeMetricFamily('clm5ip_current_amperes', 'Current I', labels=["output"])
        powerState = GaugeMetricFamily('clm5ip_power_state', 'Power status (1 = On)', labels=["output"])
        temperature = GaugeMetricFamily('clm5ip_temperature_celsius', 'Temperature', labels=["input"])
        analogIn = GaugeMetricFamily('clm5ip_analog_in_ratio', 'Analog input', labels=["input"])
        digitalIn = GaugeMetricFamily('clm5ip_digital_in_status', 'Digital input (1 = On)', labels=["input"])

        # Offsets of the input groups that follow the per-output values.
        temperatureBase = self.OUTPUT_COUNT * self.FIELDS_PER_OUTPUT
        analogBase = temperatureBase + self.TEMPERATURE_COUNT
        digitalBase = analogBase + self.ANALOG_IN_COUNT

        try:
            for index, name in enumerate(self._outputNames):
                base = index * self.FIELDS_PER_OUTPUT
                activePower.add_metric([name], float(data[base + 0]))
                apparentPower.add_metric([name], float(data[base + 1]))
                reactivePower.add_metric([name], float(data[base + 2]))
                voltage.add_metric([name], float(data[base + 3]))
                current.add_metric([name], float(data[base + 4]))
                powerState.add_metric([name], int(data[base + 5]))
            for index, name in enumerate(self._temperatureNames):
                temperature.add_metric([name], float(data[temperatureBase + index]))
            for index, name in enumerate(self._analogInNames):
                # The raw value is divided by 100 to expose it as a ratio.
                analogIn.add_metric([name], float(data[analogBase + index]) / 100.0)
            for index, name in enumerate(self._digitalInNames):
                digitalIn.add_metric([name], int(data[digitalBase + index]))
        except ValueError as err:
            # A malformed field must not abort the whole scrape with a
            # traceback; report it and expose no samples this time.
            sys.stderr.write("Received invalid data, could not parse value: {0}\n".format(err))
            return

        yield activePower
        yield apparentPower
        yield reactivePower
        yield voltage
        yield current
        yield powerState
        yield temperature
        yield analogIn
        yield digitalIn

    @classmethod
    def _parseData(cls, raw):
        """Split a "get data" reply into its fields.

        Decimal commas are converted to decimal points so the fields can be
        passed to ``float()``.

        Returns:
            The list of field strings, or ``None`` (after writing a message
            to stderr) when the reply is empty or does not contain exactly
            ``EXPECTED_FIELDS`` fields.
        """
        if raw == "":
            sys.stderr.write("Received no data, device in use by other client?\n")
            return None
        fields = raw.replace(",", ".").split(";")
        if len(fields) != cls.EXPECTED_FIELDS:
            sys.stderr.write("Received invalid data, expected 40 fields but got " + str(len(fields)) + "\n")
            return None
        return fields

    def getName(self, module):
        """Return the configured name of ``module`` (e.g. "o1", "din3").

        Falls back to the module identifier itself when the device reports
        no name, so that distinct modules never share an empty label value.
        """
        name = self.executeCommand("gn " + module).split(";")[0]
        return name or module

    @staticmethod
    def _firstLine(buffer):
        """Decode the first CRLF-terminated line of a raw reply.

        When the reply contains no CRLF (for example because the peer
        closed the connection early), the whole buffer is used with any
        trailing CR/LF removed instead of silently dropping its last byte.
        """
        end = buffer.find(b'\r\n')
        line = buffer[:end] if end >= 0 else buffer.rstrip(b'\r\n')
        return line.decode()

    def executeCommand(self, command):
        """Send ``command`` to the device and return the first reply line.

        Returns:
            The reply line without its line terminator, or ``""`` when the
            device cannot be reached, the exchange fails or times out, the
            reply is empty, or the reply contains "unknown".  A message is
            written to stderr in every failure case.
        """
        buffer = bytearray()
        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sck:
                sck.settimeout(self._timeout)
                try:
                    sck.connect((self._target, self._port))
                except OSError as err:
                    sys.stderr.write("Could not connect to CLM5IP {0}: {1}\n".format(self._target, err))
                    return ""
                # sendall() retries until the whole command has been written.
                sck.sendall((command + "\r\n").encode())
                while True:
                    chunk = sck.recv(4096)
                    buffer.extend(chunk)
                    # Stop at the end of the first line or when the peer closes.
                    if not chunk or b'\n' in chunk:
                        break
                try:
                    sck.shutdown(socket.SHUT_RDWR)
                except OSError:
                    # Best effort: the peer may already have closed the connection.
                    pass
        except OSError as err:
            sys.stderr.write("Communication with CLM5IP {0} failed: {1}\n".format(self._target, err))
            return ""

        # Short pause between consecutive connections; presumably the device
        # needs time to accept the next client - TODO confirm.
        time.sleep(0.1)

        firstline = self._firstLine(buffer)
        if firstline == "" or "unknown" in firstline:
            sys.stderr.write("Command '" + command + "' could not be executed\n")
            return ""
        return firstline
def main():
    """Run the exporter: register the collector and serve metrics on port 9820.

    The device address is taken from the first command-line argument.

    Raises:
        SystemExit: With code 1 when no device address was given, and with
            code 0 when the process is interrupted with Ctrl-C.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("Usage: clm5ip_exporter.py 192.168.0.162\n")
        # sys.exit() rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under "python -S").
        sys.exit(1)

    try:
        REGISTRY.register(Clm5ipCollector(sys.argv[1]))
        start_http_server(9820)
        # The HTTP server runs in a background thread; keep the main
        # thread alive until interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print(" Interrupted")
        sys.exit(0)
# Start the exporter only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()