-
Notifications
You must be signed in to change notification settings - Fork 0
/
ev3deploy.py
242 lines (202 loc) · 8.46 KB
/
ev3deploy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import sys
import threading
from pathlib import Path
import fnmatch
import os
import argparse
from paramiko import SSHClient
from scp import SCPClient
from typing import List, Optional, TextIO
SIGKILL = 9
SIGTERM = 15
PATH = './'
EXECUTABLE = ['*.py', '*.sh']
PASSWORD = "maker"
HOSTNAME = "ev3dev"
USERNAME = "robot"
IGNORE_PATH = "./.ignore"
EXECUTE_FILE = None
def read_exclude(ignore_path: str) -> List[str]:
"""
Read the exclude file ('.ignore').
:param ignore_path: Path to the exclude file.
:return: A list of file patterns to ignore.
"""
if not os.path.exists(Path(ignore_path)):
ignore = open(Path(ignore_path), 'w+')
ignore.writelines(['./.ignore\n', './ev3deploy.py\n', '*/.*'])
ignore.close()
ignore = open(ignore_path, 'r')
lines = [line.strip() for line in ignore.readlines()]
return lines
def match(filename: str, patterns: List[str]) -> bool:
"""
Checks if filename matches ANY of 'patterns'.
:param filename: A path of a file.
:param patterns: A list of standard UNIX file patterns.
:return: True if filename matches ANY of 'patterns', False otherwise.
"""
for m in patterns:
if fnmatch.fnmatch(filename, m):
return True
return False
def path_join(*paths) -> Optional[Path]:
"""
Joins multiple strings to a single Path object.
:param paths: paths to join.
:return: A Path object corresponding to 'paths'. 'None' if 'paths' is empty.
"""
if len(paths) < 1:
return None
res = Path(paths[0]).joinpath(*paths[1:])
return res
def get_args() -> None:
"""
Configures command line arguments.
"""
global HOSTNAME, USERNAME, PASSWORD, PATH, IGNORE_PATH, EXECUTE_FILE
parser = argparse.ArgumentParser(description='Send Project to Ev3.')
parser.add_argument('--hostname', help="The ssh hostname (default is 'ev3dev')")
parser.add_argument('--username', help="The ssh username (default is 'robot')")
parser.add_argument('--password', help="The ssh password (default is 'maker')")
parser.add_argument('--path', help="The Directory to send (default is current directory).")
parser.add_argument('--exclude_file',
help="The file containing the list of files to ignore (default is '.ignore').")
parser.add_argument('--execute_file', help="A file to execute after transferring (local path relative to 'PATH').")
args = parser.parse_args()
if args.hostname:
HOSTNAME = args.hostname
if args.username:
USERNAME = args.username
if args.password:
PASSWORD = args.password
if args.path:
PATH = args.path
if args.exclude_file:
IGNORE_PATH = args.exclude_file
if args.execute_file:
EXECUTE_FILE = args.execute_file
def redirect_stdout_handler(st: TextIO):
"""
Copies 'st' to system stdout.
:param st: An output stream.
"""
for l in iter(st.readline, ""):
print(l, end="")
def redirect_stderr_handler(st: TextIO):
"""
Copies 'st' to system stderr.
:param st: An output stream.
"""
for l in iter(st.readline, ""):
print(l, end="", file=sys.stderr)
run_stdin = True
def redirect_stdin_handler(st: TextIO):
"""
Copies system stdin to st.
:param st: An input stream.
"""
global run_stdin
while run_stdin:
# if sys.stdin.isatty():
for line in sys.stdin:
if st.closed or sys.stdin.closed or not run_stdin:
break
print(line, end="", file=st)
def deploy(path: str = './', hostname: str = "ev3dev", username: str = "robot", password: str = "maker",
execute_file: Optional[str] = None, executable: List[str] = ('*.py',),
exclude_path: str = "./.ignore", print_console: bool = True,
redirect_stdout: bool = True, redirect_stderr: bool = True, redirect_stdin: bool = False) -> None:
"""
Send code to Ev3
:param path: The Directory to send (default is current directory).
:param hostname: The ssh hostname (default is 'ev3dev')
:param username: The ssh username (default is 'robot')
:param password: The ssh password (default is 'maker')
:param execute_file: A file to run on the ev3 when finished. 'None' to disable.
Note: this file must be marked as executable.
:param executable: A list of patterns of files that should be marked as executable (default is ['*.py']).
:param exclude_path: The file containing the list of files to ignore (default is '.ignore').
:param print_console: Should we print info to the console?
:param redirect_stdout: Should we redirect stdout form ev3 to console?
:param redirect_stderr: Should we redirect stderr form ev3 to console?
:param redirect_stdin: Should we redirect console input to ev3 stdin?
This is disabled by default as it cannot terminate without reading from stdin.
"""
# Get / Set working directory
if print_console:
print("CD", path)
os.chdir(path)
working_dir = os.getcwd()
dir_name = os.path.basename(working_dir)
exclude = read_exclude(exclude_path)
# Set up ssh
if print_console:
print("Starting ssh ...")
ssh = SSHClient()
ssh.load_system_host_keys()
if print_console:
print("Connecting to", F"{username}@{hostname} ...")
ssh.connect(hostname=hostname, username=username, password=password)
with SCPClient(ssh.get_transport()) as scp:
for subdir, dirs, files in os.walk('.'): # for every file in current working directory:
for filename in files:
filepath = subdir + '/' + filename # get full file path (relative to working directory)
if not match(filepath, exclude): # if the file path does not match any of the excluded patterns:
if print_console:
print("Sending", Path(filepath), "... ", end='')
# create the directory if it does not exist
ssh.exec_command('mkdir -p ' + path_join('~', dir_name, subdir).as_posix())
# copy files using scp
scp.put(str(path_join(working_dir, filepath)), path_join('~', dir_name, filepath).as_posix())
if print_console:
print("Sent")
if match(filepath, executable): # if file path matches any of the executable patterns:
# mark as executable
if print_console:
print(path_join('~', dir_name, filepath).as_posix(), "marked as executable.")
ssh.exec_command('chmod u+x ' + path_join('~', dir_name, filepath).as_posix())
else:
if print_console:
print('Excluding', Path(filepath), '.')
if execute_file:
if print_console:
print(F'\nExecuting {execute_file} ...\n')
# execute the file.
stdin, stdout, stderr = ssh.exec_command(path_join('~', dir_name, execute_file).as_posix(), get_pty=True)
# create the redirecting threads
if redirect_stdout:
out = threading.Thread(target=redirect_stdout_handler, args=(stdout,))
if redirect_stderr:
err = threading.Thread(target=redirect_stderr_handler, args=(stderr,))
if redirect_stdin:
child_pid = os.fork()
if child_pid == 0:
redirect_stdin_handler(stdin)
os.kill(os.getpid(), SIGTERM)
# sin = threading.Thread(target=redirect_stdin_handler, args=(stdin,))
# start them
if redirect_stdout:
out.start()
if redirect_stderr:
err.start()
# if redirect_stdin:
# sin.start()
# wait for them to terminate
if redirect_stdout:
out.join()
if redirect_stderr:
err.join()
if redirect_stdin:
global run_stdin
# tell redirect_stdin_handler to exit without sending data to stdin
run_stdin = False
# sys.stdin.close()
# sin.join()
os.kill(child_pid, SIGTERM)
if print_console:
print('\nFinished.')
if __name__ == '__main__':
get_args()
deploy(path=PATH, hostname=HOSTNAME, username=USERNAME, password=PASSWORD,
execute_file=EXECUTE_FILE, executable=EXECUTABLE, exclude_path=IGNORE_PATH)