-
Notifications
You must be signed in to change notification settings - Fork 0
/
py_openshowvar.py
152 lines (126 loc) · 4.09 KB
/
py_openshowvar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
'''
A Python port of the KUKA VarProxy client (OpenShowVar).
From https://github.com/linuxsand/py_openshowvar
'''
from __future__ import print_function
import sys
import struct
import random
import socket
# Codec used to turn variable names and values into bytes on Python 3.
ENCODING = 'UTF-8'
# Release number of this module.
__version__ = '1.1.5'
# True when the running interpreter is Python 2.
PY2 = sys.version_info[0] == 2
if PY2:
    # On Python 2, make ``input`` refer to ``raw_input`` for the prompts below.
    input = raw_input
class openshowvar(object):
    '''TCP client that reads and writes variables through a VarProxy server.

    Requests are packed big-endian as: message id (uint16), length of the
    rest of the message (uint16), flag (uint8, 0 = read, 1 = write),
    variable name length (uint16) and the variable name; write requests
    append the value length (uint16) and the value.

    Responses are parsed as: message id (uint16), body length (uint16),
    flag (uint8), value length (uint16), the value, and a 3-byte tail whose
    last byte is ``\\x01`` when the request succeeded.
    '''

    def __init__(self, ip, port):
        '''Create the client and try to connect to ``(ip, port)``.

        A failed connection attempt does not raise: callers are expected to
        check ``can_connect``.  The error, if any, is kept in
        ``connect_error`` so that it can still be inspected.
        '''
        self.ip = ip
        self.port = port
        self.msg_id = random.randint(1, 100)
        self.rsp = None
        self.connect_error = None
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((self.ip, self.port))
        except socket.error as exc:
            # Deliberately best-effort; remember why instead of discarding.
            self.connect_error = exc

    def test_connection(self):
        '''Return True if a fresh TCP connection to the server succeeds.

        A separate, short-lived socket is used, so the state of ``self.sock``
        is neither examined nor changed.
        '''
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            return probe.connect_ex((self.ip, self.port)) == 0
        except socket.error:
            print('socket error')
            return False
        finally:
            # Always release the probe socket so repeated checks do not
            # leak file descriptors.
            probe.close()

    can_connect = property(test_connection)

    def read(self, var, debug=True):
        '''Read variable ``var`` and return its raw value.

        Returns the value as received (bytes on Python 3), or None when the
        response is not a successful answer to this request.  When ``debug``
        is true the parsed response and the value are printed.

        Raises TypeError if ``var`` is not a ``str``.
        '''
        if not isinstance(var, str):
            raise TypeError('Var name is a string')
        self.varname = var if PY2 else var.encode(ENCODING)
        return self._read_var(debug)

    def write(self, var, value, debug=True):
        '''Write ``value`` to variable ``var`` and return the echoed value.

        Returns the value field of the response, or None when the response
        is not a successful answer to this request.  When ``debug`` is true
        the parsed response and the value are printed.

        Raises TypeError if ``var`` or ``value`` is not a ``str``.
        '''
        if not (isinstance(var, str) and isinstance(value, str)):
            raise TypeError('Var name and its value should be string')
        self.varname = var if PY2 else var.encode(ENCODING)
        self.value = value if PY2 else value.encode(ENCODING)
        return self._write_var(debug)

    def _read_var(self, debug):
        '''Send a read request for ``self.varname`` and return the value.'''
        return self._exchange(self._pack_read_req(), debug)

    def _write_var(self, debug):
        '''Send a write request for ``self.varname`` and return the value.'''
        return self._exchange(self._pack_write_req(), debug)

    def _exchange(self, req, debug):
        '''Send ``req``, parse the response and optionally print the value.'''
        self._send_req(req)
        _value = self._read_rsp(debug)
        if debug:
            print(_value)
        return _value

    def _recv_exact(self, size):
        '''Receive up to ``size`` bytes, stopping early only if the peer closes.'''
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                # Connection closed before the full amount arrived.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _send_req(self, req):
        '''Send ``req`` and store the complete raw response in ``self.rsp``.

        The response is read as a 4-byte header followed by the number of
        bytes announced in the header's length field, so long values and
        responses split over several TCP segments are received in full.
        '''
        self.rsp = None
        self.sock.sendall(req)
        header = self._recv_exact(4)
        if len(header) < 4:
            # Peer closed early; keep what arrived so _read_rsp reports None.
            self.rsp = header
            return
        # NOTE(review): assumes the response length field counts the bytes
        # after the 4-byte header, mirroring how requests are framed here.
        body_len = struct.unpack('!HH', header)[1]
        self.rsp = header + self._recv_exact(body_len)

    def _pack_read_req(self):
        '''Return the packed read request for ``self.varname``.'''
        var_name_len = len(self.varname)
        flag = 0
        # flag (1 byte) + name length field (2 bytes) + name
        req_len = var_name_len + 3
        return struct.pack(
            '!HHBH' + str(var_name_len) + 's',
            self.msg_id,
            req_len,
            flag,
            var_name_len,
            self.varname
        )

    def _pack_write_req(self):
        '''Return the packed write request for ``self.varname``/``self.value``.'''
        var_name_len = len(self.varname)
        flag = 1
        value_len = len(self.value)
        # flag (1) + name length field (2) + name + value length field (2)
        # + value
        req_len = var_name_len + 3 + 2 + value_len
        return struct.pack(
            '!HHBH' + str(var_name_len) + 's' + 'H' + str(value_len) + 's',
            self.msg_id,
            req_len,
            flag,
            var_name_len,
            self.varname,
            value_len,
            self.value
        )

    def _read_rsp(self, debug=False):
        '''Parse ``self.rsp`` and return the value it carries.

        Returns None when there is no response, when it is too short to hold
        the fixed fields, when its tail does not end in ``\\x01`` or when its
        message id differs from the one that was sent.  On success the
        message id is advanced for the next request.
        '''
        if self.rsp is None:
            return None
        var_value_len = len(self.rsp) - struct.calcsize('!HHBH') - 3
        if var_value_len < 0:
            # Empty or truncated response: nothing that can be unpacked.
            return None
        result = struct.unpack(
            '!HHBH' + str(var_value_len) + 's' + '3s', self.rsp)
        _msg_id, body_len, flag, var_value_len, var_value, isok = result
        if debug:
            print('[DEBUG]', result)
        if isok.endswith(b'\x01') and _msg_id == self.msg_id:
            # The id is packed as an unsigned 16-bit field, so wrap around
            # instead of growing past what struct.pack('!H') accepts.
            self.msg_id = (self.msg_id + 1) % 65536
            return var_value
        return None

    def close(self):
        '''Close the client socket.'''
        self.sock.close()
############### test ###############
def run_shell(ip, port):
    '''Run an interactive read/write prompt against the server at ip:port.

    Exits the process with status -1 when the server cannot be reached.
    Otherwise prints the value of ``$ROBNAME[]`` and then loops: a line of
    the form ``name`` reads that variable, ``name, value`` writes it, and
    ``q`` (or end of input) quits.  The client socket is closed on every
    way out of the function.
    '''
    client = openshowvar(ip, port)
    if not client.can_connect:
        print('Connection error')
        client.close()
        sys.exit(-1)
    try:
        print('\nConnected KRC Name: ', end=' ')
        # read() with debug off prints nothing, so show the result here.
        robot_name = client.read('$ROBNAME[]', False)
        if robot_name is None:
            robot_name = '<unknown>'
        elif not isinstance(robot_name, str):
            # Python 3 returns bytes; decode them for display.
            robot_name = robot_name.decode(ENCODING, 'replace')
        print(robot_name)
        while True:
            try:
                data = input('\nInput var_name [, var_value]\n(`q` for quit): ')
            except EOFError:
                # End of input is treated like an explicit quit.
                data = 'q'
            command = data.strip()
            if command.lower() == 'q':
                print('Bye')
                break
            if not command:
                # Nothing typed: do not send a request for an empty name.
                continue
            # Split on the first comma only, so values that themselves
            # contain commas are passed through intact.
            var_name, separator, var_value = command.partition(',')
            if separator:
                client.write(var_name.strip(), var_value.lstrip(), True)
            else:
                client.read(command, True)
    finally:
        client.close()
if __name__ == '__main__':
    # Script entry point: ask for the server address first, then the port,
    # and hand both to the interactive shell.
    ip, port = input('IP Address: '), input('Port: ')
    run_shell(ip, int(port))