-
Notifications
You must be signed in to change notification settings - Fork 0
/
_nod.pyx
289 lines (222 loc) · 8.73 KB
/
_nod.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import os
from enum import Enum
from typing import Tuple, Optional, Callable, List
from contextlib import contextmanager
import cython
from cython.operator cimport dereference, preincrement
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString
from libc.stdint cimport uint64_t
from libcpp cimport bool as c_bool
from libcpp.string cimport string
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from nod_wrap cimport (
optional as c_optional,
string_view as c_string_view,
ExtractionContext as c_ExtractionContext,
createProgressCallbackFunction,
getDol as _getDol,
DiscBase as c_DiscBase,
Header as c_Header,
Kind_File,
IPartReadStream as c_IPartReadStream,
OpenDiscFromImage,
DiscBuilderGCN as c_DiscBuilderGCN,
createFProgressFunction,
Node,
EBuildResult,
EBuildResult_Success,
EBuildResult_Failed,
EBuildResult_DiskFull,
_handleNativeException,
checkException,
)
import nod.types
cdef string _str_to_string(str path):
return path.encode("utf-8")
cdef str _view_to_str(c_string_view str_view):
return PyBytes_FromStringAndSize(str_view.data(), str_view.size()).decode("utf-8")
ProgressCallback = Callable[[float, str, int], None]
cdef void invoke_callback_function(object callback, const string& a, float progress) except *:
callback(a.decode("utf-8"), progress)
cdef void invoke_fprogress_function(object callback, float totalProg, const string& fileName, size_t fileBytesXfered) except *:
callback(totalProg, fileName.decode("utf-8"), fileBytesXfered)
cdef _create_dol_header(const c_Header& h):
return nod.types.DolHeader(
game_id = PyBytes_FromStringAndSize(h.m_gameID, 6),
disc_num = h.m_discNum,
disc_version = h.m_discVersion,
audio_streaming = h.m_audioStreaming,
stream_buf_sz = h.m_streamBufSz,
wii_magic = h.m_wiiMagic,
gcn_magic = h.m_gcnMagic,
game_title = PyBytes_FromStringAndSize(h.m_gameTitle, 64),
disable_hash_verification = h.m_disableHashVerification,
disable_disc_enc = h.m_disableDiscEnc,
debug_mon_off = h.m_debugMonOff,
debug_load_addr = h.m_debugLoadAddr,
dol_off = h.m_dolOff,
fst_off = h.m_fstOff,
fst_sz = h.m_fstSz,
fst_max_sz = h.m_fstMaxSz,
fst_memory_address = h.m_fstMemoryAddress,
user_position = h.m_userPosition,
user_sz = h.m_userSz,
)
cdef class ExtractionContext:
cdef c_ExtractionContext c_context
def __cinit__(self):
self.c_context = c_ExtractionContext()
@property
def force(self):
return self.c_context.force
@force.setter
def force(self, value):
self.c_context.force = value
def set_progress_callback(self, callback):
self.c_context.progressCB = createProgressCallbackFunction(callback, invoke_callback_function)
cdef class PartReadStream:
cdef unique_ptr[c_IPartReadStream] c_stream
cdef uint64_t offset
cdef uint64_t _size
@staticmethod
cdef create(unique_ptr[c_IPartReadStream] c_stream, uint64_t size):
stream = PartReadStream()
stream.c_stream = move(c_stream)
stream.offset = stream.c_stream.get().position()
stream._size = size
return stream
def read(self, length=None):
if not self.c_stream:
raise RuntimeError("already closed")
cdef uint64_t actual_length
if length is None:
actual_length = self._size - self.tell()
else:
actual_length = length
buf = PyBytes_FromStringAndSize(NULL, actual_length)
buf_as_str = PyBytes_AsString(buf)
with nogil:
self.c_stream.get().read(buf_as_str, actual_length)
return buf
def seek(self, offset, whence=0):
if not self.c_stream:
raise RuntimeError("already closed")
if whence == 0:
offset += self.offset
elif whence == 2:
offset += self._size
elif whence != 1:
raise ValueError(f"Unknown whence: {whence}")
self.c_stream.get().seek(offset, whence)
def tell(self):
if not self.c_stream:
raise RuntimeError("already closed")
return self.c_stream.get().position() - self.offset
def close(self):
self.c_stream.reset()
def size(self):
return self._size
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
cdef _files_for(Node& node, prefix: str, result: list):
cdef c_optional[Node.DirectoryIterator] f
name = _view_to_str(node.getName())
if node.getKind() == Kind_File:
result.append(prefix + name)
else:
newPrefix = prefix
if name:
newPrefix = prefix + name + "/"
f = node.begin()
while dereference(f) != node.end():
_files_for(dereference(dereference(f)), newPrefix, result)
preincrement(dereference(f))
cdef class Partition:
cdef c_DiscBase.IPartition* c_partition
cdef object discParent
def __init__(self, parent):
self.discParent = parent
@staticmethod
cdef create(c_DiscBase.IPartition* c_partition, object parent):
partition = Partition(parent)
partition.c_partition = c_partition
return partition
def get_dol(self) -> bytes:
return _getDol(self.c_partition)
def get_header(self) -> nod.types.DolHeader:
return _create_dol_header(self.c_partition.getHeader())
def extract_to_directory(self, path: str, context: ExtractionContext) -> None:
def work():
cdef c_bool extraction_successful = False
cdef string native_path = _str_to_string(path)
with nogil:
extraction_successful = self.c_partition.extractToDirectory(
native_path,
context.c_context
)
if not extraction_successful:
raise RuntimeError("Unable to extract")
return _handleNativeException(work)
def files(self) -> List[str]:
cdef Node* node = &self.c_partition.getFSTRoot()
result = []
_files_for(dereference(node), "", result)
return result
def read_file(self, path: str, offset: int = 0) -> PartReadStream:
cdef Node* node = &self.c_partition.getFSTRoot()
cdef c_optional[Node.DirectoryIterator] f
for part in path.split("/"):
f = node.find(_str_to_string(part))
if dereference(f) != node.end():
node = &dereference(dereference(f))
else:
raise FileNotFoundError(f"File {part} not found in '{_view_to_str(node.getName())}'")
return PartReadStream.create(
dereference(dereference(f)).beginReadStream(offset),
dereference(dereference(f)).size(),
)
cdef class DiscBase:
cdef unique_ptr[c_DiscBase] c_disc
def get_data_partition(self) -> Optional[Partition]:
cdef c_DiscBase.IPartition*partition = self.c_disc.get().getDataPartition()
if partition:
return Partition.create(partition, self)
else:
return None
cdef class DiscBuilderGCN:
cdef c_DiscBuilderGCN* c_builder
def __init__(self, out_path: os.PathLike, progress_callback: ProgressCallback):
pass
def __cinit__(self, out_path: os.PathLike, progress_callback: ProgressCallback):
self.c_builder = new c_DiscBuilderGCN(_str_to_string(os.fspath(out_path)),
createFProgressFunction(progress_callback, invoke_fprogress_function))
def __dealloc__(self):
del self.c_builder
def build_from_directory(self, directory_in: os.PathLike) -> None:
def work():
cdef string native_path = _str_to_string(os.fspath(directory_in))
with nogil:
self.c_builder.buildFromDirectory(native_path)
return _handleNativeException(work)
@staticmethod
def calculate_total_size_required(directory_in: os.PathLike) -> Optional[int]:
cdef string native_path = _str_to_string(os.fspath(directory_in))
cdef c_optional[uint64_t] size
with nogil:
size = c_DiscBuilderGCN.CalculateTotalSizeRequired(native_path)
if size:
return cython.operator.dereference(size)
return None
def open_disc_from_image(path: os.PathLike) -> Tuple[DiscBase, bool]:
def work():
disc = DiscBase()
cdef string native_path = _str_to_string(os.fspath(path))
cdef c_bool is_wii = True
with nogil:
disc.c_disc = OpenDiscFromImage(native_path, is_wii)
checkException()
return disc, is_wii
return _handleNativeException(work)