-
Notifications
You must be signed in to change notification settings - Fork 1
/
installer.py
232 lines (194 loc) · 7.94 KB
/
installer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import logging
import pathlib
import patoolib
import re
import shutil
import sys
import threading
import time
from helper.config_operations import get_library_path, get_debug_mode
from helper.file_operations import create_temp_folder, delete_temp_folder, create_logger
from patoolib.util import PatoolError
import content_database
import patches
# Module-wide logger used by every installation step
logger = create_logger()

# Scratch directory that archives are unpacked into
TEMP_FOLDER = pathlib.Path("temp")

# Folder names that mark the root of installable content inside an archive
TARGET_FOLDERS = [
    "aniBlocks",
    "data",
    "Environments",
    "Light Presets",
    "People",
    "Props",
    "ReadMe's",
    "Render Presets",
    "Render Settings",
    "Runtime",
    "Scenes",
    "Scripts",
    "Shader Presets",
    "Cameras",
    "Documentation",
]

# Resolve bundled resources relative to the unpacked bundle directory when
# running as a frozen executable (sys.frozen / sys._MEIPASS, as set by
# PyInstaller), otherwise relative to this source file.
BASE_PATH = (
    pathlib.Path(sys._MEIPASS)
    if getattr(sys, "frozen", False)
    else pathlib.Path(__file__).parent
)

# Location of the bundled 7-Zip executable handed to patool for extraction
SEVEN_ZIP_PATH = BASE_PATH / "7z/7z.exe"

# Serialises installations that are started from different threads
lock = threading.Lock()

# Set to True by add_to_database() when an archive is already recorded in the
# content database; read by traverse_directory() to stop walking early.
archive_exists = False
def get_relative_path(full_path: str) -> str:
    """
    Get the relative path based on target folders.

    If one of the TARGET_FOLDERS appears in the path as a complete path
    component, return the sub-path starting at the first such component.
    Otherwise return the path unchanged.

    Args:
        full_path (str): Path string using "/" or "\\" as separators.

    Returns:
        str: The sub-path starting at the target folder, or ``full_path``
        itself when no target folder is present.
    """
    alternatives = "|".join(re.escape(folder) for folder in TARGET_FOLDERS)
    # Anchor the folder name between separators (or the string ends) so that
    # a target such as "data" is not found inside a longer name like
    # "metadata" or "Props" inside "MyProps".
    pattern = rf"(?:^|[\\/])({alternatives})(?=[\\/]|$)"
    match = re.search(pattern, full_path)
    # Group 1 is the folder name itself, without the leading separator.
    return full_path[match.start(1):] if match else full_path
def clean_temp_folder() -> None:
    """
    Reset the temporary extraction folder to an empty directory.

    An existing folder is removed together with everything inside it, and
    an empty folder is then created in its place.
    """
    scratch_dir = TEMP_FOLDER
    if scratch_dir.exists():
        shutil.rmtree(scratch_dir)
    # exist_ok keeps this call harmless if the folder is already present
    scratch_dir.mkdir(parents=True, exist_ok=True)
def extract_archive(item_path: pathlib.Path, is_debug_mode: bool) -> bool:
    """
    Unpack a supported archive into the temporary folder.

    Args:
        item_path (pathlib.Path): Archive to extract.
        is_debug_mode (bool): When True, patool runs with verbosity 2;
            otherwise with verbosity -1.

    Returns:
        bool: True when extraction succeeded. False when the file name does
        not end in .zip/.rar/.7z/.tar or when patool reported an error.
    """
    archive_name = item_path.name
    supported_suffixes = ('.zip', '.rar', '.7z', '.tar')
    if not archive_name.lower().endswith(supported_suffixes):
        return False

    logger.info(f"Extracting {archive_name}")
    if is_debug_mode:
        verbosity = 2
    else:
        verbosity = -1

    try:
        patoolib.extract_archive(
            str(item_path),
            outdir=str(TEMP_FOLDER),
            verbosity=verbosity,
            interactive=False,
            program=str(SEVEN_ZIP_PATH),
        )
        # Short pause after extraction; presumably lets the extractor
        # release its file handles - TODO confirm
        time.sleep(1)
    except PatoolError as e:
        logger.error(f"Failed to extract archive {archive_name}: {e}")
        return False
    else:
        return True
def clean_folder(folder_path: pathlib.Path) -> None:
    """
    Delete the files that sit directly inside a folder.

    Sub-directories and their contents are left untouched.
    """
    for entry in filter(pathlib.Path.is_file, folder_path.iterdir()):
        entry.unlink()
def add_to_database(root_path: pathlib.Path, item: pathlib.Path) -> bool:
    """
    Record an extracted archive and its files in the content database.

    Args:
        root_path (pathlib.Path): Folder whose files (searched recursively)
            belong to the archive.
        item (pathlib.Path): The archive being installed; the part of its
            file name before the first "." is used as the archive name.

    Returns:
        bool: True when the archive is already in the database (nothing is
        added and the module flag ``archive_exists`` is set), False when it
        was newly added.
    """
    global archive_exists

    archive_name = item.stem.split(".")[0]

    file_list = []
    for file_path in root_path.rglob("*"):
        if file_path.is_file():
            file_list.append(get_relative_path(str(file_path)))

    already_recorded = content_database.does_archive_exist(archive_name, file_list)
    if not already_recorded:
        logger.info(f"Adding archive '{archive_name}' with {len(file_list)} files to the database.")
        content_database.add_archive(archive_name, file_list)
        time.sleep(1)
        return False

    logger.info(f"Archive '{archive_name}' already exists in the database.")
    archive_exists = True
    return True
def handle_nested_archives(root_path, files, is_debug_mode):
    """
    Extract every archive found among the files of one directory.

    Each nested archive is unpacked into ``root_path`` and deleted once it
    has been extracted. Archives that fail to extract are logged and left
    in place.

    Args:
        root_path: Directory that contains ``files``.
        files: File names (not full paths) located in ``root_path``.
        is_debug_mode: When true, patool runs with verbosity 2, else -1.

    Returns:
        bool: True if at least one nested archive was extracted.
    """
    nested_suffixes = ('.zip', '.rar', '.7z', '.tar')
    verbosity = 2 if is_debug_mode else -1
    extracted_any = False

    for name in files:
        if not name.lower().endswith(nested_suffixes):
            continue

        nested_archive = root_path / name
        logger.info(f"Extracting nested archive: {name}")
        try:
            patoolib.extract_archive(
                str(nested_archive),
                outdir=str(root_path),
                verbosity=verbosity,
                interactive=False,
                program=str(SEVEN_ZIP_PATH),
            )
            time.sleep(0.5)
            # Remove the nested archive so it is not extracted again
            nested_archive.unlink()
            extracted_any = True
        except PatoolError as e:
            logger.error(f"Failed to extract nested archive {name}: {e}")

    return extracted_any
def process_manifest_and_target_folders(root_path, dirs, files, progressbar, current_item):
    """
    Check for manifest files and target folders, and process them accordingly.

    Two layouts are recognised in the directory ``root_path``:

    * a file ending in "manifest.dsx" next to a folder whose name starts
      with "content" - that folder is installed;
    * a folder named like one of TARGET_FOLDERS - ``root_path`` itself is
      installed.

    Folder names are compared case-insensitively in both the initial gate
    and the per-folder checks.

    Args:
        root_path: Directory currently being inspected.
        dirs: Names of the sub-directories of ``root_path``.
        files: Names of the files in ``root_path``.
        progressbar: Object exposing ``get()`` and ``set(value)``.
        current_item: Path of the archive being installed.

    Returns:
        bool: True when content was copied into the library. False when the
        directory holds nothing installable or the archive is already
        recorded in the database.
    """
    target_names = {target.lower() for target in TARGET_FOLDERS}
    manifest_exists = any(file.lower().endswith("manifest.dsx") for file in files)
    # The gate uses the same case-insensitive comparison as the loop below;
    # otherwise a folder such as "runtime" could never reach that check
    # unless a manifest happened to be present.
    has_target_folder = any(folder.lower() in target_names for folder in dirs)
    if not (manifest_exists or has_target_folder):
        return False

    for folder in dirs:
        folder_key = folder.lower()

        if manifest_exists and folder_key.startswith("content"):
            content_path = root_path / folder
            # Drop loose files that sit directly inside the content folder
            clean_folder(content_path)
            progressbar.set(progressbar.get() + 0.1)
            if add_to_database(content_path, current_item):
                progressbar.set(progressbar.get() + 0.1)
                return False
            shutil.copytree(content_path, get_library_path(), dirs_exist_ok=True)
            return True

        if folder_key in target_names:
            # Drop loose files that sit next to the target folders
            clean_folder(root_path)
            progressbar.set(progressbar.get() + 0.1)
            if add_to_database(root_path, current_item):
                return False
            shutil.copytree(root_path, get_library_path(), dirs_exist_ok=True)
            return True

    return False
def traverse_directory(folder_path: pathlib.Path, current_item: pathlib.Path, progressbar, is_debug_mode: bool):
    """
    Walk the extracted tree, unpacking nested archives and installing content.

    Whenever a directory yields freshly extracted nested archives, the walk
    is started again from ``folder_path`` so the new files are seen.

    Returns:
        bool: True when content was installed. False when the archive is
        already known or no installable layout was found.
    """
    while True:
        for current_dir, subdirs, filenames in folder_path.walk():
            if handle_nested_archives(current_dir, filenames, is_debug_mode):
                progressbar.set(progressbar.get() + 0.1)
                # New files appeared on disk: restart the walk from the top
                break

            installed = process_manifest_and_target_folders(
                current_dir, subdirs, filenames, progressbar, current_item
            )
            if installed:
                progressbar.set(progressbar.get() + 0.1)
                return True

            if archive_exists:
                return False
        else:
            # Walk finished without a restart and without installing anything
            progressbar.set(progressbar.get() + 0.1)
            return False
def start_installer_gui(file_path: str, progressbar, is_delete_archive: bool = False) -> bool:
    """
    Main function to handle the installation process via the GUI.

    The whole run is performed while holding the module-level lock.

    Args:
        file_path (str): The path of the archive to process as a string.
        progressbar: A GUI progress bar object to track progress.
        is_delete_archive (bool): Whether to delete the archive after installation.

    Returns:
        bool: True if the archive was successfully imported, False otherwise.
    """
    global archive_exists

    is_archive_imported = False
    file_path = pathlib.Path(file_path)

    with lock:
        # The flag is module-level state set by add_to_database(). Without a
        # reset, one "already exists" result would make traverse_directory()
        # bail out early on every later installation.
        archive_exists = False

        logger.info(f"Installing {file_path}")
        create_temp_folder()
        clean_temp_folder()
        progressbar.set(0.1)

        if not extract_archive(file_path, get_debug_mode()):
            clean_temp_folder()
            return is_archive_imported

        progressbar.set(0.4)

        try:
            if traverse_directory(TEMP_FOLDER, file_path, progressbar, get_debug_mode()):
                is_archive_imported = True
                logger.info(f"Successfully imported: {file_path}")
            else:
                is_archive_imported = False
                logger.warning(f"Failed to import {file_path}. Invalid folder structure or asset already exists.")
        finally:
            # Remove the extracted files even when traversal raised
            clean_temp_folder()
            delete_temp_folder()

        # NOTE(review): the archive is deleted whether or not the import
        # succeeded - confirm this is intended.
        if is_delete_archive:
            try:
                file_path.unlink()
                logger.info(f"Deleted archive: {file_path}")
            except OSError as e:
                logger.error(f"Failed to delete archive {file_path}: {e}")

    return is_archive_imported