-
Notifications
You must be signed in to change notification settings - Fork 1
/
manage_homelab.py
477 lines (383 loc) · 22.4 KB
/
manage_homelab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
import os
import sys
import base64
import pathlib
import argparse
import threading
import subprocess
# Import PyVmrun, seeding the VMware registry values on a first run.
# If the import raises FileNotFoundError, the default VMware Workstation
# locations are written under HKEY_LOCAL_MACHINE and the import is retried.
# NOTE(review): presumably pyvmrun reads these registry values at import
# time; writing to HKLM normally requires an elevated prompt -- confirm.
try:
    from pyvmrun import PyVmrun
except FileNotFoundError:
    import winreg as reg

    print("[+] Setting VMRUN path in registry for first run...")
    # The key handles are context managers, so they are closed even when
    # SetValueEx raises (for example on a permission error).
    with reg.CreateKey(reg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\VMware, Inc.\\VMware Workstation') as key:
        reg.SetValueEx(key, 'InstallPath', 0, reg.REG_SZ, '"C:\\Program Files (x86)\\VMware\\VMware Workstation\\vmrun.exe"')
    with reg.CreateKey(reg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\VMware, Inc.\\VMware Workstation\\OVFTool') as key:
        # The stored value is pre-quoted because convert_vmx_to_ova pastes it
        # straight into a shell command string.
        reg.SetValueEx(key, 'InstallPath', 0, reg.REG_SZ, 'C:\\"Program Files (x86)\\VMware\\VMware Workstation\\OVFTool\\ovftool.exe"')
    from pyvmrun import PyVmrun
def get_zip_path():
    """Return the 7-Zip command-line executable to invoke.

    On Windows the default install location is returned, already quoted so
    callers can paste it into a shell command string. On other platforms
    ``7z`` is looked up on PATH.

    Returns:
        str: The 7-Zip executable path.

    Raises:
        SystemExit: On non-Windows platforms when no ``7z`` executable is
            found on PATH.
    """
    if sys.platform == "win32":
        # Windows
        return r'C:\\"Program Files\\7-Zip\\7z.exe"'

    # Unix. shutil.which returns None when 7z is missing, whereas running
    # `which 7z` through check_output raised CalledProcessError and the
    # friendly message below was never reached.
    import shutil

    zip_path = shutil.which('7z')
    if not zip_path:
        sys.exit("[!] Please install the 7zip package and try running again!")
    return zip_path
def compress_vm_files(path, target_running, recursive=False):
    """Compress each targeted VM's files into a 7z archive next to its VMX.

    For every targeted VMX, the ``.vmx``, ``.vmdk``, ``.vmxf``, ``.nvram`` and
    ``.vmsd`` files in the VMX's directory are added to ``<vmx stem>.7z`` in
    that same directory using 7-Zip (LZMA2, ``-mx=9``).

    Args:
        path: Directory searched for ``.vmx`` files when ``target_running`` is
            false; also forwarded to ``stop_vms``.
        target_running: When true, operate on the VMX paths returned by
            ``list_running_vms()`` instead of searching ``path``.
        recursive: Forwarded to ``get_valid_vmxs`` and ``stop_vms``.
    """
    if target_running:
        valid_vmx_paths = list(list_running_vms())
    else:
        # Gets a list of all valid VMX files starting from the top of the path
        valid_vmx_paths = get_valid_vmxs(path=path, recursive=recursive)

    valid_vm_extensions = ('.vmx', '.vmdk', '.vmxf', '.nvram', '.vmsd')

    for current_path in valid_vmx_paths:
        vm_dir = os.path.split(current_path)[0]
        vm_name = pathlib.Path(current_path).stem
        archive_path = os.path.join(vm_dir, vm_name) + '.7z'

        print(f"\n[*] Checking if VM at {current_path} is currently running...")
        if vm_name in str(list_running_vms()):
            print(f"[!] {vm_name} is currently running, attempting to stop now...")
            # NOTE(review): this stops every targeted VM, not only the one
            # being compressed -- kept as-is, confirm that is intended.
            stop_vms(vmx_paths=path, target_running=target_running, recursive=recursive)

        print(f"[+] Attempting to compress {vm_name} to 7z with LZMA2 max compression.")
        # No `cd` is issued: a `cd` run through os.system only affects a
        # throwaway subshell, and every path below is built from vm_dir.
        valid_vm_files = [
            os.path.join(vm_dir, f)
            for f in os.listdir(vm_dir)
            if os.path.splitext(f)[-1] in valid_vm_extensions
        ]
        joined_files = '"' + '" "'.join(valid_vm_files) + '"'
        zip_path = get_zip_path()
        # The command stays a shell string because get_zip_path() returns a
        # pre-quoted path on Windows.
        command = [zip_path, 'a', '-t7z', '-m0=lzma2', '-mx=9', '-aoa', '-y', '"' + archive_path + '"', joined_files]
        command_line = ' '.join(command)
        print("CMD: " + command_line)
        os.system(command_line)
        print(f"Created 7z archive at {archive_path}")
def extract_vm(zip_file, dest_path, prompt):
    """Extract a 7z archive into a folder named after it under ``dest_path``.

    Args:
        zip_file: Archive to extract; it is inserted into the shell command
            verbatim, so the caller is responsible for any quoting.
        dest_path: Directory to change into before extracting. If changing
            directory fails, the error is printed and extraction proceeds in
            the current working directory.
        prompt: When false, ``-aos`` and ``-y`` are appended to the 7-Zip
            command; when true, no extra switches are passed.
    """
    seven_zip = get_zip_path()
    overwrite_switch, assume_yes_switch = ('', '') if prompt else ('-aos', '-y')

    try:
        os.chdir(dest_path)
    except Exception as err:
        # Best effort: report the problem and carry on from wherever we are.
        print(err)

    working_dir = os.getcwd()
    print(f"[+] Extracting {zip_file} into {working_dir} now...")

    # Output folder is <cwd>/<archive stem>, quoted for the shell.
    output_dir = os.path.join(working_dir, pathlib.Path(zip_file).stem)
    command_line = ' '.join(
        [seven_zip, 'x', zip_file, '-o' + '"' + output_dir + '"', overwrite_switch, assume_yes_switch]
    )
    print(command_line)
    os.system(command_line)
def download_homelab_onedrive(lab_file, dest_path):
    """Download every OneDrive share link listed in ``lab_file`` with curl.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    link is turned into a OneDrive ``shares`` API URL and saved in
    ``dest_path`` as ``VM_<index>.7z``.

    Args:
        lab_file: Text file with one OneDrive share link per line, resolved
            against the current working directory.
        dest_path: Directory to change into and download the archives to.

    Raises:
        SystemExit: If the curl executable cannot be started.
    """
    # Resolve the link file before changing directory.
    abs_path_lab_file = os.path.join(os.getcwd(), lab_file)
    print(f"Downloading VMs into {dest_path} with links from {abs_path_lab_file}")
    os.chdir(dest_path)

    # Put links to OneDrive VMs in a list, dropping tabs, surrounding
    # whitespace, blank lines and comments.
    with open(abs_path_lab_file, 'r') as f:
        cleaned_lines = [line.replace('\t', '').strip() for line in f]
    share_links = [link for link in cleaned_lines if link and not link.startswith("#")]

    # https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/shares_get?view=odsp-graph-online#encoding-sharing-urls
    # The sharing token is "u!" + unpadded base64url of the share link, so
    # the trailing "=" padding is stripped.
    onedrive_links = []
    for link in share_links:
        token = base64.urlsafe_b64encode(link.encode("UTF-8")).decode("ascii").rstrip("=")
        onedrive_links.append(f"https://api.onedrive.com/v1.0/shares/u!{token}/root/content")

    # Download the VMs
    if sys.platform == "win32":
        # Windows
        curl_command = r'C:\Windows\System32\curl.exe'
    else:
        # Unix
        curl_command = 'curl'

    for num, api_url in enumerate(onedrive_links):
        output_name = f"VM_{num}.7z"
        print(f'\nCMD: {curl_command} -L "{api_url}" -o {output_name}')
        # Argument list instead of a shell string: text from the link file
        # can never be interpreted by the shell.
        try:
            status = subprocess.call([curl_command, '-L', api_url, '-o', output_name])
        except FileNotFoundError:
            sys.exit(f"[!] Could not run {curl_command}, please install curl and try running again!")
        if status != 0:
            print(f"[!] curl exited with status {status} while downloading {output_name}")
def download_homelab(lab_file, dest_path):
    """Download Google Drive VMs listed in ``lab_file`` using the goodls tool.

    The goodls v2.0.1 binary is fetched into the temp directory if it is not
    already there (curl on Windows, wget elsewhere). It is then run from
    ``dest_path`` with the link file supplied on standard input.

    Args:
        lab_file: Text file of Google Drive share links, resolved against the
            current working directory.
        dest_path: Directory to change into before running the downloader.

    Raises:
        SystemExit: If the downloader tool cannot be retrieved.
    """
    # Resolve the link file before any directory change.
    abs_path_lab_file = os.path.join(os.getcwd(), lab_file)
    release_url = 'https://github.com/tanaikech/goodls/releases/download/v2.0.1/'

    if sys.platform == "win32":
        # Windows
        # So we can download the file in the temp directory
        os.chdir(os.environ['temp'])
        tool_name = 'goodls_windows_amd64.exe'
        fetch_command = ['curl', '-L', release_url + tool_name, '-o', tool_name]
    else:
        os.chdir('/tmp')
        tool_name = 'goodls_linux_amd64'
        fetch_command = ['wget', release_url + tool_name]
    gd_downloader = os.path.join(os.getcwd(), tool_name)

    # Only download the Google Drive downloader if it doesn't already exist
    if tool_name not in os.listdir():
        print(f"Retrieving google drive downloader tool, and placing it in temp.")
        try:
            fetch_status = subprocess.call(fetch_command)
        except FileNotFoundError:
            sys.exit(f"[!] Please install {fetch_command[0]} and try running again!")
        if fetch_status != 0:
            # Stop here instead of trying to run a tool that was never fetched.
            sys.exit("[!] Failed to retrieve the google drive downloader tool")
        if sys.platform != "win32":
            subprocess.call(['chmod', '+x', gd_downloader])

    print(f"Downloading VMs into {dest_path} with links from {abs_path_lab_file}")
    os.chdir(dest_path)

    # Feed the link file to the tool on stdin. Passing a file handle replaces
    # the `tool < file` shell string, which broke on paths containing spaces.
    with open(abs_path_lab_file, 'rb') as links:
        subprocess.call([gd_downloader], stdin=links)
def convert_vmx_to_ova(path, target_running, recursive=False):
    """Convert each targeted VMX to an OVA file beside it using ovftool.

    Args:
        path: Directory searched for ``.vmx`` files when ``target_running`` is
            false; also forwarded to ``stop_vms``.
        target_running: When true, operate on the VMX paths returned by
            ``list_running_vms()`` instead of searching ``path``.
        recursive: Forwarded to ``get_valid_vmxs`` and ``stop_vms``.

    Raises:
        SystemExit: On non-Windows platforms when ``ovftool`` is not on PATH.
    """
    if sys.platform == "win32":
        # Windows: read the OVFTool location from the registry. The subkey is
        # spelled exactly like the key the first-run patch creates.
        import winreg as reg

        with reg.OpenKeyEx(reg.HKEY_LOCAL_MACHINE, 'SOFTWARE\\VMware, Inc.\\VMware Workstation\\OVFTool') as key:
            ovftool = reg.QueryValueEx(key, 'InstallPath')[0]
    else:
        # Unix: the options used below (--targetType=OVA and friends) belong
        # to ovftool, so that is the executable to locate, not vmrun.
        import shutil

        ovftool = shutil.which('ovftool')
        if not ovftool:
            sys.exit("[!] Please ensure that ovftool is installed and try running again!")

    if target_running:
        valid_vmx_paths = list(list_running_vms())
    else:
        # Gets a list of all valid VMX files starting from the top of the path
        valid_vmx_paths = get_valid_vmxs(path=path, recursive=recursive)

    for current_path in valid_vmx_paths:
        vm_name = pathlib.Path(current_path).stem
        print(f"\n[*] Checking if VM at {current_path} is currently running...")
        if vm_name in str(list_running_vms()):
            print(f"[!] {vm_name} is currently running, attempting to stop now...")
            # NOTE(review): this stops every targeted VM, not only the one
            # being converted -- kept as-is, confirm that is intended.
            stop_vms(vmx_paths=path, target_running=target_running, recursive=recursive)

        print(f"[+] Attempting to convert {pathlib.Path(current_path).name} to OVA format.")
        # Shell string on purpose: the Windows registry value is pre-quoted.
        command = (
            f'{ovftool} --overwrite --compress=9 --noImageFiles --skipManifestCheck '
            f'--targetType=OVA "{current_path}" "{os.path.splitext(current_path)[0]}.ova"'
        )
        print(f'[*] CMD: {command}\n')
        os.system(command)
def get_valid_vmxs(path, recursive=False):
    """Return the paths of ``.vmx`` files found in ``path``.

    Args:
        path: Directory to search.
        recursive: When true, walk the whole tree below ``path``; otherwise
            look only at the entries directly inside it.

    Returns:
        list: Each match joined onto the directory it was found in.
    """
    def _is_vmx(filename):
        # Exact, case-sensitive extension match.
        return os.path.splitext(filename)[1] == ".vmx"

    if not recursive:
        return [os.path.join(path, entry) for entry in os.listdir(path) if _is_vmx(entry)]

    matches = []
    for folder, _subdirs, filenames in os.walk(path):
        for filename in filenames:
            if _is_vmx(filename):
                matches.append(os.path.join(folder, filename))
    return matches
def start_vms(vmx_paths, recursive=False):
    """Start every VM whose ``.vmx`` file is found under ``vmx_paths``.

    Args:
        vmx_paths: Directory handed to ``get_valid_vmxs``.
        recursive: Whether the VMX search descends into subdirectories.
    """
    for vmx_file in get_valid_vmxs(path=vmx_paths, recursive=recursive):
        machine = PyVmrun(vmx_file)
        print(f"[+] Starting VM at {vmx_file}")
        machine.start()
def stop_vms(vmx_paths, target_running, recursive=False):
    """Request a soft stop of every targeted VM.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        recursive: Whether the VMX search descends into subdirectories.
    """
    if target_running:
        targets = list(list_running_vms())
    else:
        # Search the directory for VMX files.
        targets = get_valid_vmxs(path=vmx_paths, recursive=recursive)

    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to stop VM at {vmx_file}")
        machine.stop(mode='soft')
def suspend_vms(vmx_paths, target_running, recursive=False):
    """Suspend every targeted VM.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        recursive: Whether the VMX search descends into subdirectories.
    """
    targets = (
        list(list_running_vms())
        if target_running
        else get_valid_vmxs(path=vmx_paths, recursive=recursive)
    )
    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to suspend VM at {vmx_file}")
        machine.suspend()
def delete_vms(vmx_paths, target_running, recursive=False):
    """Delete every targeted VM through its VMX file.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        recursive: Whether the VMX search descends into subdirectories.
    """
    if target_running:
        doomed = list(list_running_vms())
    else:
        doomed = get_valid_vmxs(path=vmx_paths, recursive=recursive)

    for vmx_file in doomed:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to delete VM at {vmx_file}")
        machine.deleteVM()
def create_snapshot(vmx_paths, target_running, snapshot_name, recursive=False):
    """Create a snapshot called ``snapshot_name`` on every targeted VM.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        snapshot_name: Name given to the new snapshot.
        recursive: Whether the VMX search descends into subdirectories.
    """
    targets = (
        list(list_running_vms())
        if target_running
        else get_valid_vmxs(path=vmx_paths, recursive=recursive)
    )
    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to create snapshot {snapshot_name} for VM at {vmx_file}")
        machine.snapshot(name=snapshot_name)
def delete_snapshot(vmx_paths, target_running, snapshot_name, recursive=False):
    """Delete the snapshot called ``snapshot_name`` from every targeted VM.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        snapshot_name: Name of the snapshot to delete.
        recursive: Whether the VMX search descends into subdirectories.
    """
    if target_running:
        targets = list(list_running_vms())
    else:
        targets = get_valid_vmxs(path=vmx_paths, recursive=recursive)

    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to delete snapshot {snapshot_name} for VM at {vmx_file}")
        machine.deleteSnapshot(name=snapshot_name)
def restore_snapshot(vmx_paths, target_running, snapshot_name, recursive=False):
    """Revert every targeted VM to the snapshot called ``snapshot_name``.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        snapshot_name: Name of the snapshot to revert to.
        recursive: Whether the VMX search descends into subdirectories.
    """
    targets = (
        list(list_running_vms())
        if target_running
        else get_valid_vmxs(path=vmx_paths, recursive=recursive)
    )
    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"[+] Attempting to restore a snapshot for VM at {vmx_file}")
        machine.revertToSnapshot(name=snapshot_name)
def list_snapshots(vmx_paths, target_running, recursive=False):
    """Print and collect the snapshot listing of every targeted VM.

    Args:
        vmx_paths: Directory searched for VMX files when ``target_running``
            is false.
        target_running: When true, target the VMX paths returned by
            ``list_running_vms()`` instead.
        recursive: Whether the VMX search descends into subdirectories.

    Returns:
        list: One ``listSnapshots()`` result per targeted VM, in target order.
    """
    if target_running:
        targets = list(list_running_vms())
    else:
        targets = get_valid_vmxs(path=vmx_paths, recursive=recursive)

    collected = []
    for vmx_file in targets:
        machine = PyVmrun(vmx_file)
        print(f"\n[+] Checking for snapshots for VM at {vmx_file}")
        listing = machine.listSnapshots()
        collected.append(listing)
        # Entries are printed without an added newline.
        for entry in listing:
            print(entry, end='')
    return collected
def list_running_vms():
    """Return the keys of PyVmrun's ``list()`` result as a list.

    Callers in this file treat each returned entry as a VMX path.
    """
    # An empty vmx is passed because the listing is not tied to one VM.
    vmrun_handle = PyVmrun(vmx="")
    listing = vmrun_handle.list()
    return list(listing.keys())
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run each requested
    # action against the VM directory (or against the running VMs with -t).
    parser = argparse.ArgumentParser(description='Download and control a homelab all from the comfort of your terminal.')
    # -t and -r cannot be combined; neither can any two of the actions.
    exclusive_group = parser.add_mutually_exclusive_group(required=False)
    group = parser.add_mutually_exclusive_group(required=False)
    parser.add_argument("-d", "--vm_directory_path", help="The path where your VMs are/will be stored.", default=os.getcwd(), type=pathlib.Path)
    parser.add_argument('-x', '--extract', help="Extract 7z file(s) from a current or specified directory, when used with --download_homelab VM(s) will be extracted automatically", default=False, action="store_true")
    exclusive_group.add_argument("-t", "--target_running", help="Set all other commands to target only running VMs", action="store_true")
    exclusive_group.add_argument("-r", "--recursive", help="Set the search for VMXs to be recursive from the specified directory", action="store_true")
    group.add_argument("--delete_vms", help="Delete the targeted VMs", action="store_true")
    group.add_argument("--start_vms", help="Start the targeted VMs", action="store_true")
    group.add_argument("--stop_vms", help="Stop the targeted VMs", action="store_true")
    group.add_argument("--suspend_vms", help="Suspend the targeted VMs", action="store_true")
    group.add_argument("--create_snapshot", metavar="SNAPSHOT_NAME", help="Create a snapshot for all targeted VMs, requires a snapshot name", type=str)
    group.add_argument("--restore_snapshot",metavar="SNAPSHOT_NAME", help="Attempt to restore a snapshot for all targeted VMs, requires a snapshot name", type=str)
    group.add_argument("--delete_snapshot", metavar="SNAPSHOT_NAME", help="Attempt to delete a snapshot for all targeted VMs, requires a snapshot name", type=str)
    group.add_argument("--list_snapshots", help="Output a list of snapshots for all targeted VMs", action="store_true")
    group.add_argument("-l", "--list_running_vms", help="List all running VMs", action="store_true")
    group.add_argument("--convert_to_ova", help="Attempt to stop a VM, and convert it to OVA format", action="store_true")
    group.add_argument('-c', '--compress', help="Compress the VM files with LZMA2 compression", action="store_true")
    group.add_argument("--download_homelab", metavar="FILENAME.TXT", help="Download one or more VMs from a shared Google Drive by specifying a file containing one or multiple shared links. Used with -d", action="store", type=pathlib.Path)
    group.add_argument("--download_homelab_onedrive", metavar="FILENAME.TXT", help="Download one or more VMs from a shared OneDrive by specifying a file containing one or multiple shared links. Used with -d", action="store", type=pathlib.Path)
    args = parser.parse_args()

    # Work with an absolute VM directory from here on, since the download
    # and extract helpers change the working directory.
    args.vm_directory_path = os.path.abspath(os.path.join(os.getcwd(), args.vm_directory_path))

    # If no arguments, print help
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit()

    if args.target_running:
        print("[*] Tool search mode set to target only running VMs")
    else:
        print(f"[*] Tool search directory set to {args.vm_directory_path}")
    if args.recursive:
        print("[*] Tool search mode set to recursive")
    if not list_running_vms():
        print("[*] No running VMs found")

    if args.start_vms:
        start_vms(vmx_paths=args.vm_directory_path, recursive=args.recursive)
    if args.delete_vms:
        # Stop the VMs before deleting them.
        stop_vms(vmx_paths=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
        delete_vms(vmx_paths=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.stop_vms:
        stop_vms(vmx_paths=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.suspend_vms:
        suspend_vms(vmx_paths=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.list_running_vms:
        running_vms = sorted(list_running_vms())
        print(f"[+] Currently {len(running_vms)} VMs are running")
        for running_vm in running_vms:
            print(running_vm)
    if args.compress:
        compress_vm_files(path=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.convert_to_ova:
        convert_vmx_to_ova(path=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.list_snapshots:
        list_snapshots(vmx_paths=args.vm_directory_path, target_running=args.target_running, recursive=args.recursive)
    if args.create_snapshot:
        create_snapshot(vmx_paths=args.vm_directory_path, target_running=args.target_running, snapshot_name=args.create_snapshot, recursive=args.recursive)
    if args.restore_snapshot:
        restore_snapshot(vmx_paths=args.vm_directory_path, target_running=args.target_running, snapshot_name=args.restore_snapshot, recursive=args.recursive)
    if args.delete_snapshot:
        delete_snapshot(vmx_paths=args.vm_directory_path, target_running=args.target_running, snapshot_name=args.delete_snapshot, recursive=args.recursive)

    if args.download_homelab:
        if args.extract:
            print("[*] Extract flag is set, will automatically attempt to extract downloaded VMs")
        else:
            print("[!] Extract flag is not set, please run with -x if you'd like VMs extracted after download completion")
        path = pathlib.Path(args.download_homelab)
        dest_path = pathlib.Path(args.vm_directory_path)
        if not path.is_file():
            sys.exit("[!] Please specify a valid file containing Google Drive links")
        if not dest_path.is_dir():
            sys.exit("[!] Please specify a valid directory path to store the VM(s) in with -d")
        print(f"[+] Downloading VMs to {args.vm_directory_path} now...")
        download_homelab(lab_file=args.download_homelab, dest_path=args.vm_directory_path)

    if args.download_homelab_onedrive:
        if args.extract:
            print("[*] Extract flag is set, will automatically attempt to extract downloaded VMs")
        else:
            print("[!] Extract flag is not set, please run with -x if you'd like VMs extracted after download completion")
        path = pathlib.Path(args.download_homelab_onedrive)
        dest_path = pathlib.Path(args.vm_directory_path)
        if not path.is_file():
            # This branch reads OneDrive links, so the message names OneDrive.
            sys.exit("[!] Please specify a valid file containing OneDrive links")
        if not dest_path.is_dir():
            sys.exit("[!] Please specify a valid directory path to store the VM(s) in with -d")
        print(f"[+] Downloading VMs to {args.vm_directory_path} now...")
        download_homelab_onedrive(lab_file=args.download_homelab_onedrive, dest_path=args.vm_directory_path)

    if args.extract:
        print(f"[*] Checking {args.vm_directory_path} for *.7z files to extract")
        # Match the ".7z" extension (with the dot) and quote each path for the
        # shell command that extract_vm builds.
        zip_files = ['"' + os.path.join(args.vm_directory_path, name) + '"' for name in os.listdir(args.vm_directory_path) if name.endswith('.7z')]
        if not zip_files:
            sys.exit("[-] Could not find any files to extract")
        print("[+] Found files to extract, will prompt if overwriting is needed: ")
        for zip_file in zip_files:
            print(zip_file)
        for zip_file in zip_files:
            extract_vm(zip_file=zip_file, dest_path=args.vm_directory_path, prompt=True)