-
Notifications
You must be signed in to change notification settings - Fork 2
/
DosUtils.py
78 lines (78 loc) · 3.48 KB
/
DosUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import struct
class DosUtils(object):
    """Host-side helpers for file transfer and DOS packets on a remote target.

    Every operation is carried out through the helper objects passed to the
    constructor: ``execlib`` and ``doslib`` expose the target's library calls,
    ``debugger`` gives raw 32-bit memory access and ``snippets`` provides
    string placement plus verified memory transfers.
    """

    # sendpacket() makes one allocation that holds a message followed by a
    # packet; the packet starts at this byte offset inside the allocation.
    # NOTE(review): 20/68 presumably mirror the sizes of the exec Message and
    # the dos StandardPacket structures -- confirm against the target headers.
    _PKT_OFFSET = 20
    _MSG_PKT_SIZE = 68

    def __init__(self, **kwargs):
        """Store the helper objects used to reach the target.

        Required keyword arguments (a missing one raises ``KeyError``):
            debugger: object providing ``peek32``/``poke32``.
            execlib: wrapper providing ``AllocMem``, ``FreeMem``, ``PutMsg``,
                ``WaitPort``, ``GetMsg`` and message-port helpers.
            doslib: wrapper providing ``Open``, ``Close``, ``Read``,
                ``Write``, ``Inhibit`` and ``DeviceProc``.
            snippets: helper providing ``getaddrstr``, ``verifiedreadmem``
                and ``verifiedwritemem``.
        """
        self.amiga = kwargs["debugger"]
        self.execlib = kwargs["execlib"]
        self.doslib = kwargs["doslib"]
        self.snip = kwargs["snippets"]
        # Upper bound, in bytes, of the transfer buffer allocated per call.
        self.bufsize = 0x10000

    def readfile(self, filepath):
        """Read a whole file from the target and return its contents.

        Args:
            filepath: path of the file on the target.

        Returns:
            The file contents as ``bytes``.

        Raises:
            ValueError: if the transfer buffer cannot be allocated, the file
                cannot be opened, or ``Read`` reports an impossible length.
        """
        # Snapshot the size so FreeMem is called with the value AllocMem got,
        # even if self.bufsize is reassigned while the transfer is running.
        bufsize = self.bufsize
        bufaddr = self.execlib.AllocMem(bufsize, self.execlib.MEMF_PUBLIC)
        if not bufaddr:
            raise ValueError(f"AllocMem of {bufsize} bytes failed")
        try:
            print(f"buffer addr: {hex(bufaddr)} size: {self.bufsize}")
            filepathaddr = self.snip.getaddrstr(filepath)
            fh = self.doslib.Open(filepathaddr, self.doslib.MODE_OLDFILE)
            if not fh:
                raise ValueError(f"could not open {filepath!r} for reading")
            data = bytearray()
            try:
                while True:
                    actualLength = self.doslib.Read(fh, bufaddr, bufsize)
                    # A valid result is 0..bufsize. Anything else (a negative
                    # value, or its unsigned 32-bit form) is an error return
                    # and must not be used as a length for the memory read.
                    if actualLength < 0 or actualLength > bufsize:
                        raise ValueError(
                            f"Read failed on {filepath!r} "
                            f"(returned {actualLength})"
                        )
                    if not actualLength:
                        break
                    data += self.snip.verifiedreadmem(bufaddr, actualLength)
                    # A short read means the end of the file was reached.
                    if actualLength < bufsize:
                        break
            finally:
                self.doslib.Close(fh)
        finally:
            # The original never released the buffer; free it on every path.
            self.execlib.FreeMem(bufaddr, bufsize)
        return bytes(data)

    def writefile(self, filepath, data):
        """Create (or overwrite) a file on the target with ``data``.

        The data is pushed through a target-side buffer of at most
        ``self.bufsize`` bytes, one chunk per ``Write`` call. Empty ``data``
        produces an empty file without allocating a buffer.

        Args:
            filepath: path of the file on the target.
            data: bytes-like object holding the new file contents.

        Raises:
            ValueError: if ``self.bufsize`` is not positive while there is
                data to send, the buffer cannot be allocated, or the file
                cannot be opened.
        """
        total = len(data)
        bufsize = min(self.bufsize, total)
        if total and bufsize <= 0:
            raise ValueError("bufsize must be positive to transfer data")

        bufaddr = 0
        if bufsize:
            bufaddr = self.execlib.AllocMem(bufsize, self.execlib.MEMF_PUBLIC)
            if not bufaddr:
                raise ValueError(f"AllocMem of {bufsize} bytes failed")
            print(f"buffer addr: {hex(bufaddr)} size: {hex(bufsize)}")
        try:
            filepathaddr = self.snip.getaddrstr(filepath)
            fh = self.doslib.Open(filepathaddr, self.doslib.MODE_NEWFILE)
            if not fh:
                raise ValueError(f"could not open {filepath!r} for writing")
            try:
                # range() rejects a zero step, so empty data gets no chunks.
                offsets = range(0, total, bufsize) if total else ()
                for offset in offsets:
                    remaining = total - offset
                    stepsize = min(remaining, bufsize)
                    print(f"transferring offset {hex(offset)} remaining {hex(remaining)}")
                    self.snip.verifiedwritemem(bufaddr, data[offset:offset + stepsize])
                    # NOTE(review): the result is only reported, as before;
                    # a short or failed write does not abort the transfer.
                    returnedLength = self.doslib.Write(fh, bufaddr, stepsize)
                    print(f"returnedLength: {hex(returnedLength)}")
            finally:
                print("Closing file.")
                success = self.doslib.Close(fh)
                print(f"Success: {success}")
        finally:
            if bufaddr:
                self.execlib.FreeMem(bufaddr, bufsize)

    def sendpacket(self, replyport, handlerport, action, arg1):
        """Send a one-argument DOS packet to a handler and return its result.

        One block is allocated (requested with ``MEMF_CLEAR``) that holds the
        message followed by the packet. The two are linked to each other, the
        packet is filled in, posted to ``handlerport`` and the reply is
        awaited on ``replyport``.

        Args:
            replyport: address of the port the handler replies to.
            handlerport: address of the handler's message port.
            action: value stored in the packet's ``dp_Type`` field.
            arg1: value stored in the packet's ``dp_Arg1`` field.

        Returns:
            The packet's ``dp_Res1`` value after the reply, or
            ``doslib.DOSFALSE`` if the packet memory could not be allocated.
        """
        size = self._MSG_PKT_SIZE
        msg = self.execlib.AllocMem(size, self.execlib.MEMF_PUBLIC | self.execlib.MEMF_CLEAR)
        if not msg:
            # Poking at offsets from address 0 would corrupt target memory.
            print("AllocMem failed.")
            return self.doslib.DOSFALSE
        pkt = msg + self._PKT_OFFSET
        # Cross-link the pair: the message names the packet, and the packet
        # points back at its message.
        self.amiga.poke32(msg + self.execlib.ln_Name, pkt)
        self.amiga.poke32(pkt + self.doslib.dp_Link, msg)
        self.amiga.poke32(pkt + self.doslib.dp_Port, replyport)
        self.amiga.poke32(pkt + self.doslib.dp_Type, action)
        self.amiga.poke32(pkt + self.doslib.dp_Arg1, arg1)
        print(f"PutMsg handler {hex(handlerport)} msg {hex(msg)}")
        self.execlib.PutMsg(handlerport, msg)
        self.execlib.WaitPort(replyport)
        self.execlib.GetMsg(replyport)
        res1 = self.amiga.peek32(pkt + self.doslib.dp_Res1)
        # Deliberately not in a ``finally``: once posted, the block must stay
        # allocated until the reply has been collected from the reply port.
        self.execlib.FreeMem(msg, size)
        return res1

    def inhibit(self, filesystem, flag):
        """Switch filesystem inhibition on or off for a device.

        When ``execlib.version`` is 36 or newer, ``doslib.Inhibit`` is called
        directly. Otherwise the handler port is looked up with ``DeviceProc``
        and an ``ACTION_INHIBIT`` packet is sent through a temporary port.

        Args:
            filesystem: device name as expected by ``Inhibit``/``DeviceProc``.
            flag: value passed through as the inhibit argument.

        Returns:
            The result of ``Inhibit`` or of the packet (``dp_Res1``);
            ``doslib.DOSFALSE`` if the handler or the port is unavailable.
        """
        filesystemaddr = self.snip.getaddrstr(filesystem)
        if self.execlib.version >= 36:
            return self.doslib.Inhibit(filesystemaddr, flag)

        handlerport = self.doslib.DeviceProc(filesystemaddr)
        if not handlerport:
            print("DeviceProc failed.")
            return self.doslib.DOSFALSE
        msgport = self.execlib.createmsgport()
        if not msgport:
            print("createmsgport() failed.")
            return self.doslib.DOSFALSE
        print(f"msgport: {hex(msgport)}")
        success = self.sendpacket(msgport, handlerport, self.doslib.ACTION_INHIBIT, flag)
        self.execlib.deletemsgport(msgport)
        return success