forked from kcbam/dbus-huaweisun2000-pvinverter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connector_modbus.py
126 lines (105 loc) · 5.11 KB
/
connector_modbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from sun2000_modbus import inverter
from sun2000_modbus import registers
from dbus.mainloop.glib import DBusGMainLoop
from settings import HuaweiSUN2000Settings
state1Readable = {
1: "standby",
2: "grid connected",
4: "grid connected normaly",
8: "grid connection with derating due to power rationing",
16: "grid connection with derating due to internal causes of the solar inverter",
32: "normal stop",
64: "stop due to faults",
128: "stop due to power",
256: "shutdown",
512: "spot check"
}
state2Readable = {
1: "locking status (0:locked;1:unlocked)",
2: "pv connection stauts (0:disconnected;1:conncted)",
4: "grid connected normaly",
8: "grid connection with derating due to power rationing",
16: "grid connection with derating due to internal causes of the solar inverter",
32: "normal stop",
64: "stop due to faults",
128: "stop due to power",
256: "shutdown",
512: "spot check"
}
state3Readable = {
1: "off-grid(0:on-grid;1:off-grid",
2: "off-grid-switch(0:disable;1:enable)"
}
alert1Readable = {
1: "",
2: ""
}
class ModbusDataCollector2000Delux:
def __init__(self, host='192.168.200.1', port=6607, modbus_unit=0, power_correction_factor=0.995):
self.invSun2000 = inverter.Sun2000(host=host, port=port, modbus_unit=modbus_unit)
self.power_correction_factor = power_correction_factor
def getData(self):
# the connect() method internally checks whether there's already a connection
if not self.invSun2000.connect():
print("Connection error Modbus TCP")
return None
data = {}
dbuspath = {
'/Ac/Power': {'initial': 0, "sun2000": registers.InverterEquipmentRegister.ActivePower},
'/Ac/L1/Current': {'initial': 0, "sun2000": registers.InverterEquipmentRegister.PhaseACurrent},
'/Ac/L1/Voltage': {'initial': 0, "sun2000": registers.InverterEquipmentRegister.LineVoltageBetweenPhasesAAndB},
'/Dc/Power': {'initial': 0, "sun2000": registers.InverterEquipmentRegister.InputPower},
'/Ac/MaxPower': {'initial': 0, "sun2000": registers.InverterEquipmentRegister.MaximumActivePower},
}
for k, v in dbuspath.items():
s = v.get("sun2000")
data[k] = self.invSun2000.read(s)
state1 = self.invSun2000.read(registers.InverterEquipmentRegister.State1)
state1_string = ";".join([val for key, val in state1Readable.items() if int(state1)&key>0])
data['/Status'] = state1_string
# data['/Ac/StatusCode'] = statuscode
energy_forward = self.invSun2000.read(registers.InverterEquipmentRegister.AccumulatedEnergyYield)
data['/Ac/Energy/Forward'] = energy_forward
# There is no Modbus register for the phases
data['/Ac/L1/Energy/Forward'] = round(energy_forward / 3.0, 2)
data['/Ac/L2/Energy/Forward'] = round(energy_forward / 3.0, 2)
data['/Ac/L3/Energy/Forward'] = round(energy_forward / 3.0, 2)
freq = self.invSun2000.read(registers.InverterEquipmentRegister.GridFrequency)
data['/Ac/L1/Frequency'] = freq
cosphi = float(self.invSun2000.read((registers.InverterEquipmentRegister.PowerFactor)))
data['/Ac/L1/Power'] = cosphi * float(data['/Ac/L1/Voltage']) * float(
data['/Ac/L1/Current']) * self.power_correction_factor
return data
def getStaticData(self):
# the connect() method internally checks whether there's already a connection
if not self.invSun2000.connect():
print("Connection error Modbus TCP")
return None
try:
data = {}
data['SN'] = self.invSun2000.read(registers.InverterEquipmentRegister.SN)
data['ModelID'] = self.invSun2000.read(registers.InverterEquipmentRegister.ModelID)
data['Model'] = str(self.invSun2000.read_formatted(registers.InverterEquipmentRegister.Model)).replace('\0',
'')
data['NumberOfPVStrings'] = self.invSun2000.read(registers.InverterEquipmentRegister.NumberOfPVStrings)
data['NumberOfMPPTrackers'] = self.invSun2000.read(registers.InverterEquipmentRegister.NumberOfMPPTrackers)
return data
except:
print("Problem while getting static data modbus TCP")
return None
## Just for testing ##
if __name__ == "__main__":
DBusGMainLoop(set_as_default=True)
settings = HuaweiSUN2000Settings()
inverter = inverter.Sun2000(host=settings.get("modbus_host"), port=settings.get("modbus_port"),
modbus_unit=settings.get("modbus_unit"))
inverter.connect()
if inverter.isConnected():
attrs = (getattr(registers.InverterEquipmentRegister, name) for name in
dir(registers.InverterEquipmentRegister))
datata = dict()
for f in attrs:
if isinstance(f, registers.InverterEquipmentRegister):
datata[f.name] = inverter.read_formatted(f)
for k, v in datata.items():
print(f"{k}: {v}")