Skip to content

Commit

Permalink
Update binary.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Captian-obvious authored Oct 23, 2023
1 parent 777ac40 commit bdf16fb
Showing 1 changed file with 37 additions and 154 deletions.
191 changes: 37 additions & 154 deletions pyrbxm/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,69 +4,67 @@
from io import BytesIO
from dataclasses import dataclass
import numpy as np

from .tree import PropertyType, Instance

"""
using RobloxFiles.Enums;
using RobloxFiles.DataTypes;
using RobloxFiles.Utility;
"""


# https://blog.roblox.com/2013/05/condense-and-compress-our-custom-binary-file-format/
def decode_int(i):
return (i >> 1) ^ (-(i & 1))
##end


def encode_int(i):
return (i << 1) ^ (i >> 31)
##end


# http://stackoverflow.com/questions/442188/readint-readbyte-readstring-etc-in-python
class BinaryStream:
UINT32 = np.dtype(">i4")
F32 = np.dtype(">f4")
def __init__(self, base_stream):
self.base_stream = base_stream
##end
def read_bytes(self, length) -> bytes:
return self.base_stream.read(length)
##end
def write_bytes(self, value):
self.base_stream.write(value)
##end
def unpack(self, fmt):
return struct.unpack(fmt, self.read_bytes(struct.calcsize(fmt)))
##end
def pack(self, fmt, *data):
return self.write_bytes(struct.pack(fmt, *data))
##end
def read_string(self):
(length,) = self.unpack("<I")
return self.read_bytes(length).decode("utf8")
##end
def write_string(self, s):
self.pack("<I", len(s))
self.write_bytes(s.encode("utf8"))
##end
# https://blog.roblox.com/2013/05/condense-and-compress-our-custom-binary-file-format/
def read_interleaved(self, count: int, size: int = 4):
return (
np.frombuffer(self.read_bytes(count * size), np.uint8)
.reshape(size, count)
.T.flatten()
)
##end

def read_ints(self, count):
return decode_int(self.read_interleaved(count).view(self.UINT32))
##end
def write_ints(self, values):
self.pack(f"<{len(values)}f", *values)
##end
def read_floats(self, count):
return self.read_ints(count).view(self.F32)
##end
def write_floats(self, values):
self.pack(f"<{len(values)}f", *values)
##end
def read_instance_ids(self, count):
"""Reads and accumulates an interleaved buffer of integers."""
return self.read_ints(count).cumsum()
##end
def write_instance_ids(self, values):
"""Accumulatively writes an interleaved array of integers."""
self.write_ints(np.ediff1d(np.asarray(values), to_begin=values[0]))
##end
# http://stackoverflow.com/questions/32774910/clean-way-to-read-a-null-terminated-c-style-string-from-a-file
def readCString(self):
buf = bytearray()
Expand All @@ -76,14 +74,9 @@ def readCString(self):
return buf
else:
buf.extend(b)
##end
##end
##end
def writeCString(self, string):
self.write_bytes(string)
self.write_bytes(b"\0")
##end
##end
class META:
def __init__(self):
self.Data = {}
Expand Down Expand Up @@ -119,16 +112,17 @@ def __init__(self):
self.RootedServices = []
self.NumInstances = 0
self.InstanceIds = []
##end

def __str__(self):
return f"{self.ClassIndex}: {self.ClassName}x{self.NumInstances}"
##end

def deserialize(self, stream: BinaryStream, file: BinaryRobloxFile):
(self.ClassIndex,) = stream.unpack("<i")
self.ClassName = stream.read_string()
self.IsService, self.NumInstances = stream.unpack("<bi")
self.InstanceIds = stream.read_instance_ids(self.NumInstances)
file.Classes[self.ClassIndex] = self

# Type instType = Type.GetType($"RobloxFiles.{ClassName}");
# if instType is None:
# RobloxFile.LogError($"INST - Unknown class: {ClassName} while reading INST chunk.");
Expand All @@ -139,8 +133,7 @@ def deserialize(self, stream: BinaryStream, file: BinaryRobloxFile):
for i in range(self.NumInstances):
isRooted = stream.unpack("<b")
self.RootedServices.append(isRooted)
##endif
##endif

for i in range(self.NumInstances):
instId = self.InstanceIds[i]
# inst = Activator.CreateInstance(instType) as Instance;
Expand All @@ -150,9 +143,8 @@ def deserialize(self, stream: BinaryStream, file: BinaryRobloxFile):
if self.IsService:
isRooted = self.RootedServices[i]
inst.Parent = file if isRooted else None
##endif
file.Instances[instId] = inst
##end

def serialize(self, stream: BinaryStream, file: BinaryRobloxFile):
stream.pack("<i", self.ClassIndex)
stream.write_string(self.ClassName)
Expand All @@ -163,39 +155,37 @@ def serialize(self, stream: BinaryStream, file: BinaryRobloxFile):
# Instance service = file.Instances[instId];
# writer.Write(service.Parent == file);
stream.pack("<b", False)
##end
##endif
##end

def dump(self):
print(f"- ClassIndex: {self.ClassIndex}")
print(f"- ClassName: {self.ClassName}")
print(f"- IsService: {self.IsService}")

if self.IsService and self.RootedServices is not None:
print(f"- RootedServices: `{', '.join(self.RootedServices)}`")
##endif

print(f"- NumInstances: {self.NumInstances}")
print(f"- InstanceIds: `{', '.join(self.InstanceIds)}`")
##end
##end
##end


@dataclass
class PROP:
File: BinaryRobloxFile = None
Name: str = ""
ClassIndex: int = -1
Type: PropertyType = PropertyType.Unknown

@property
def Class(self):
return self.File.Classes[self.ClassIndex]
##end

@property
def ClassName(self):
return self.Class.ClassName if self.Class else "UnknownClass"
##end

def __str__(self):
return f"{self.Type} {self.ClassName}.{self.Name}"
##end

def deserialize(self, stream: BinaryStream, file: RobloxBinaryFile):
self.File = file
(self.ClassIndex,) = stream.unpack("<i")
Expand All @@ -209,8 +199,7 @@ def deserialize(self, stream: BinaryStream, file: RobloxBinaryFile):
), f"Unknown class index {self.ClassIndex} (@ {self})!"
ids = self.Class.InstanceIds
instCount = self.Class.NumInstances
##end
##end


class BinaryRobloxFileChunk:
"""
Expand Down Expand Up @@ -253,114 +242,10 @@ def deserialize(self, stream: BinaryStream):
self.Data = stream.read_bytes(self.Size)


"""
public
BinaryRobloxFileChunk(BinaryRobloxFileWriter
writer, bool
compress = True)
{
if (!writer.WritingChunk)
throw
new
Exception(
"BinaryRobloxFileChunk: Supplied writer must have WritingChunk set to True.");
Stream
stream = writer.BaseStream;
using(BinaryReader
reader = new
BinaryReader(stream, Encoding.UTF8, True))
{
long
length = (stream.Position - writer.ChunkStart);
stream.Position = writer.ChunkStart;
Size = (int)
length;
Data = reader.ReadBytes(Size);
}
CompressedData = LZ4Codec.Encode(Data, 0, Size);
CompressedSize = CompressedData.Length;
if (!compress | | CompressedSize > Size)
{
CompressedSize = 0;
CompressedData = Array.Empty < byte > ();
}
ChunkType = writer.ChunkType;
Reserved = 0;
}
public
void
WriteChunk(BinaryRobloxFileWriter
writer)
{
// Record
where
we
are
when
we
start
writing.
var
stream = writer.BaseStream;
long
startPos = stream.Position;
// Write
the
chunk
's data.
writer.WriteString(ChunkType, True);
writer.Write(CompressedSize);
writer.Write(Size);
writer.Write(Reserved);
if (CompressedSize > 0)
writer.Write(CompressedData);
else
writer.Write(Data);
// Capture
the
data
we
wrote
into
a
byte[]
array.
long
endPos = stream.Position;
int
length = (int)(endPos - startPos);
using(MemoryStream
buffer = new
MemoryStream())
{
stream.Position = startPos;
stream.CopyTo(buffer, length);
WriteBuffer = buffer.ToArray();
HasWriteBuffer = True;
}
}
}
}
"""


class BinaryRobloxFile(Instance): # (RobloxFile):
# Header Specific
MAGIC_HEADER = b"<roblox!\x89\xff\x0d\x0a\x1a\x0a"

def __init__(self):
# Header Specific
self.Version = 0
Expand All @@ -381,35 +266,35 @@ def __init__(self):
self.Name = "Bin:"
self.Referent = "-1"
self.ParentLocked = True
##end

@property
def Chunks(self):
return self.ChunksImpl
##end

@property
def HasMetadata(self):
return self.META is not None
##end

@property
def Metadata(self):
return self.META.Data if self.META else {}
##end

@property
def HasSharedStrings(self):
return self.SSTR is not None
##end

@property
def SharedStrings(self):
return self.SSTR.Strings if self.SSTR else {}
##end

@property
def HasSignatures(self):
return self.SIGN is not None
##end

@property
def Signatures(self):
return self.SIGN.Signatures if self.SIGN else []
##end

def deserialize(self, file):
stream = BinaryStream(file)
# Verify the signature of the file.
Expand Down Expand Up @@ -455,10 +340,8 @@ def deserialize(self, file):
self.LogError(
f"BinaryRobloxFile - Unhandled chunk-type: {chunk.ChunkType}!"
)
##end
if handler:
chunk_stream = BinaryStream(BytesIO(chunk.Data))
chunk.Handler = handler
handler.deserialize(chunk_stream, self)
##end
self.ChunksImpl.append(chunk)

0 comments on commit bdf16fb

Please sign in to comment.