-
Notifications
You must be signed in to change notification settings - Fork 1
/
tianshuo_test.py
109 lines (69 loc) · 2.17 KB
/
tianshuo_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import socket
import time
import select
from utils import calc, str2hexstr
timeout_in_seconds = 5
rs485_socket = socket.socket()
rs485_socket.connect(("192.168.0.7", 26))
rs485_socket.setblocking(0)
address = chr(0x01)
function_code = chr(0x01)
start_at_reg = chr(0x00) + chr(0x13)
num_of_reg = chr(0x00) + chr(0x13)
read_device = address + function_code + start_at_reg + num_of_reg
print(type(read_device))
print(read_device)
crc = calc(read_device)
crc_hi = crc/256
crc_lo = crc & 0xFF
print "meter add: " + str(ord(address))
print "crc_lo: " + str(hex(crc_lo))
print "crc_hi: " + str(hex(crc_hi))
read_switch_str = '010600000001'
bytearray_switch = bytearray.fromhex(read_switch_str)
hexstr_switch = str(bytearray_switch)
print(hex(calc(hexstr_switch)))
while True:
time.sleep(10)
# s_open = '010f00100004010fbf51'
# s_close = '010f001000040100ff55'
# s_open = '0101001300138c02'
power_1_close = '010500100000CC0F'
power_1_open = '01050010FF008DFF'
power_2_close = '020500100000CC3C'
power_2_open = '02050010FF008DCC'
power_1_close_hex = str2hexstr(power_1_close)
power_1_open_hex = str2hexstr(power_1_open)
power_2_close_hex = str2hexstr(power_2_close)
power_2_open_hex = str2hexstr(power_2_open)
inp = 'input'
rs485_socket.sendall(power_1_close_hex)
ready = select.select([rs485_socket], [], [], timeout_in_seconds)
if ready[0]:
print('have open data!')
data = rs485_socket.recv(1024)
ret_str = str(data)
print(ret_str)
time.sleep(10)
# rs485_socket.sendall(power_1_open_hex)
# ready1 = select.select([rs485_socket], [], [], timeout_in_seconds)
# if ready1[0]:
# data1 = rs485_socket.recv(1024)
# print('have close data!')
# ret_str = str(data1)
# print(ret_str)
rs485_socket.sendall(power_2_close_hex)
ready = select.select([rs485_socket], [], [], timeout_in_seconds)
if ready[0]:
print('have open data!')
data = rs485_socket.recv(1024)
ret_str = str(data)
print(ret_str)
time.sleep(10)
# rs485_socket.sendall(power_2_open_hex)
# ready1 = select.select([rs485_socket], [], [], timeout_in_seconds)
# if ready1[0]:
# data1 = rs485_socket.recv(1024)
# print('have close data!')
# ret_str = str(data1)
# print(ret_str)