-
Notifications
You must be signed in to change notification settings - Fork 7
/
connector_modbus.py
138 lines (115 loc) · 5.88 KB
/
connector_modbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from sun2000_modbus import inverter
from sun2000_modbus import registers
from dbus.mainloop.glib import DBusGMainLoop
from settings import HuaweiSUN2000Settings
# Human-readable labels for the bit flags of the inverter's "State1" register,
# keyed by bit mask. Labels are listed in bit order (bit 0 first); the mask for
# each label is derived from its position in the tuple.
state1Readable = {
    1 << bit: label
    for bit, label in enumerate((
        "standby",
        "grid connected",
        "grid connected normaly",
        "grid connection with derating due to power rationing",
        "grid connection with derating due to internal causes of the solar inverter",
        "normal stop",
        "stop due to faults",
        "stop due to power",
        "shutdown",
        "spot check",
    ))
}
# Bit-mask -> label table, presumably for a second inverter status register
# (it is not referenced by the code visible in this file).
# NOTE(review): the labels for masks 4..512 are identical to those in
# state1Readable and look copy-pasted - confirm against the register
# documentation before relying on them.
state2Readable = dict(
    zip(
        (1, 2, 4, 8, 16, 32, 64, 128, 256, 512),
        (
            "locking status (0:locked;1:unlocked)",
            "pv connection stauts (0:disconnected;1:conncted)",
            "grid connected normaly",
            "grid connection with derating due to power rationing",
            "grid connection with derating due to internal causes of the solar inverter",
            "normal stop",
            "stop due to faults",
            "stop due to power",
            "shutdown",
            "spot check",
        ),
    )
)
# Bit-mask -> label table with two flags (masks 1 and 2), both about off-grid
# operation according to their labels.
# NOTE(review): the first label is missing its closing parenthesis; it is kept
# unchanged because the text is runtime data.
state3Readable = dict(
    zip(
        (0b01, 0b10),
        (
            "off-grid(0:on-grid;1:off-grid",
            "off-grid-switch(0:disable;1:enable)",
        ),
    )
)
# Placeholder table: masks 1 and 2 are present but their labels are still
# empty strings.
alert1Readable = dict.fromkeys((1, 2), "")
class ModbusDataCollector2000Delux:
    """Collect live and static values from a SUN2000 inverter over Modbus TCP.

    Results are returned as plain dictionaries. Live data is keyed by
    D-Bus style paths such as ``/Ac/Power``; static data uses short names
    such as ``SN`` or ``Model``.
    """

    def __init__(self, host='192.168.200.1', port=6607, modbus_unit=0, power_correction_factor=0.995):
        """Create the inverter client.

        Args:
            host: Address of the inverter's Modbus TCP endpoint.
            port: TCP port of the Modbus endpoint.
            modbus_unit: Modbus unit id passed through to the client.
            power_correction_factor: Factor applied to the per-phase power
                that getData() derives from voltage, current and power factor.
        """
        self.invSun2000 = inverter.Sun2000(host=host, port=port, modbus_unit=modbus_unit)
        self.power_correction_factor = power_correction_factor

    @staticmethod
    def _decode_bitfield(value, labels):
        """Return the labels whose bit mask is set in *value*, joined by ';'."""
        bits = int(value)
        return ";".join(label for mask, label in labels.items() if bits & mask)

    def getData(self):
        """Read the current measurements from the inverter.

        Returns:
            A dict mapping D-Bus style paths to values, or ``None`` when the
            connection cannot be established or reading/converting a value
            fails (same contract as getStaticData()).
        """
        # the connect() method internally checks whether there's already a connection
        if not self.invSun2000.connect():
            print("Connection error Modbus TCP")
            return None

        try:
            reg = registers.InverterEquipmentRegister
            # Paths whose value is taken from a single register unchanged.
            direct_registers = {
                '/Ac/Power': reg.ActivePower,
                '/Ac/L1/Current': reg.PhaseACurrent,
                '/Ac/L1/Voltage': reg.PhaseAVoltage,
                '/Ac/L2/Current': reg.PhaseBCurrent,
                '/Ac/L2/Voltage': reg.PhaseBVoltage,
                '/Ac/L3/Current': reg.PhaseCCurrent,
                '/Ac/L3/Voltage': reg.PhaseCVoltage,
                '/Dc/Power': reg.InputPower,
                '/Ac/MaxPower': reg.MaximumActivePower,
            }
            data = {
                path: self.invSun2000.read(register)
                for path, register in direct_registers.items()
            }

            state1 = self.invSun2000.read(reg.State1)
            data['/Status'] = self._decode_bitfield(state1, state1Readable)

            energy_forward = self.invSun2000.read(reg.AccumulatedEnergyYield)
            data['/Ac/Energy/Forward'] = energy_forward
            # There is no Modbus register for the phases, so the total yield
            # is split evenly across the three phases.
            phase_energy = round(energy_forward / 3.0, 2)
            phases = ('L1', 'L2', 'L3')
            for phase in phases:
                data[f'/Ac/{phase}/Energy/Forward'] = phase_energy

            # A single grid frequency reading is reported for every phase.
            freq = self.invSun2000.read(reg.GridFrequency)
            for phase in phases:
                data[f'/Ac/{phase}/Frequency'] = freq

            # Per-phase power is derived: power factor * voltage * current,
            # scaled by the configured correction factor.
            cosphi = float(self.invSun2000.read(reg.PowerFactor))
            for phase in phases:
                voltage = float(data[f'/Ac/{phase}/Voltage'])
                current = float(data[f'/Ac/{phase}/Current'])
                data[f'/Ac/{phase}/Power'] = cosphi * voltage * current * self.power_correction_factor
        except Exception as err:
            # A failed read or an unusable value must not escape as an exception;
            # signal it like a connection error so callers can retry later.
            print(f"Problem while getting data modbus TCP: {err}")
            return None
        return data

    def getStaticData(self):
        """Read values that identify the inverter (serial, model, string counts).

        Returns:
            A dict with the keys ``SN``, ``ModelID``, ``Model``,
            ``NumberOfPVStrings`` and ``NumberOfMPPTrackers``, or ``None``
            when the connection or any read fails.
        """
        # the connect() method internally checks whether there's already a connection
        if not self.invSun2000.connect():
            print("Connection error Modbus TCP")
            return None

        try:
            reg = registers.InverterEquipmentRegister
            data = {
                'SN': self.invSun2000.read(reg.SN),
                'ModelID': self.invSun2000.read(reg.ModelID),
                # Strip NUL characters from the formatted model string.
                'Model': str(self.invSun2000.read_formatted(reg.Model)).replace('\0', ''),
                'NumberOfPVStrings': self.invSun2000.read(reg.NumberOfPVStrings),
                'NumberOfMPPTrackers': self.invSun2000.read(reg.NumberOfMPPTrackers),
            }
        except Exception as err:
            # ``Exception`` instead of a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            print(f"Problem while getting static data modbus TCP: {err}")
            return None
        return data
## Just for testing ##
if __name__ == "__main__":
    # Manual smoke test: connect with the stored settings and print every
    # register of the inverter in formatted form.
    DBusGMainLoop(set_as_default=True)
    settings = HuaweiSUN2000Settings()
    # Use a distinct name here: rebinding ``inverter`` would replace the
    # imported module that ModbusDataCollector2000Delux relies on.
    sun2000 = inverter.Sun2000(
        host=settings.get("modbus_host"),
        port=settings.get("modbus_port"),
        modbus_unit=settings.get("modbus_unit"),
    )
    sun2000.connect()
    if sun2000.isConnected():
        register_cls = registers.InverterEquipmentRegister
        # dir() also lists attributes that are not registers, so keep only
        # instances of the register class itself.
        candidates = (getattr(register_cls, attr) for attr in dir(register_cls))
        formatted = {
            candidate.name: sun2000.read_formatted(candidate)
            for candidate in candidates
            if isinstance(candidate, register_cls)
        }
        for name, value in formatted.items():
            print(f"{name}: {value}")
    else:
        print("Connection error Modbus TCP")