Skip to content

Commit

Permalink
Some changes to the parser:
Browse files Browse the repository at this point in the history
* Pull out reading replay events to another method
* Make sure to fallback to the previous position in case we failed to parse header data
* More failsafes in the parser
  • Loading branch information
donadigo committed May 12, 2021
1 parent 3091026 commit 7def4b9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 44 deletions.
11 changes: 11 additions & 0 deletions pygbx/bytereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import struct
from io import IOBase
from pygbx.headers import Vector3
from os import SEEK_END

class PositionInfo(object):
"""
Expand Down Expand Up @@ -46,8 +47,12 @@ def __init__(self, obj):
self.data = obj
if isinstance(obj, IOBase):
self.get_bytes = self.__get_bytes_file
self.data.seek(0, SEEK_END)
self.size = self.data.tell()
self.data.seek(0)
else:
self.get_bytes = self.__get_bytes_generic
self.size = len(self.data)

self.pos = 0
self.seen_loopback = False
Expand Down Expand Up @@ -93,6 +98,12 @@ def read(self, num_bytes, typestr=None):
logging.error(e)
return 0

def size(self):
if isinstance(self.data, IOBase):
return self.data.tell()
else:
return len(self.data)

def __get_bytes_file(self, num_bytes):
self.data.seek(self.pos)
return self.data.read(num_bytes)
Expand Down
108 changes: 64 additions & 44 deletions pygbx/gbx.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def __init__(self, obj):
cdata = self.root_parser.read(compressed_data_size)
self.data = bytearray(lzo.decompress(cdata, False, data_size))

bp = ByteReader(self.data[:])
bp = ByteReader(self.data)
self._read_node(self.class_id, -1, bp)

def __read_sub_folder(self):
Expand All @@ -141,19 +141,19 @@ def __read_sub_folder(self):
self.root_parser.read_string()
self.__read_sub_folder()

def find_raw_chunk_id(self, chunk_id):
"""Finds a raw chunk ID in the file, skipping through any data that does not match the chunk ID provided.
"""Finds a raw chunk ID in the file, skipping through any data that does not match the chunk ID provided.
It is not guaranteed that the chunk found is indeed the desired data, as it could be other unrelated
chunk that bytes happened to form the chunk ID provided.
It is not guaranteed that the chunk found is indeed the desired data, as it could be other unrelated
chunk that bytes happened to form the chunk ID provided.
Args:
chunk_id (int): the chunk ID to search for
Args:
chunk_id (int): the chunk ID to search for
Returns:
ByteParser with the current position set right after the chunk ID, or None
if no specified chunk ID was found
"""
Returns:
ByteParser with the current position set right after the chunk ID, or None
if no specified chunk ID was found
"""
def find_raw_chunk_id(self, chunk_id):
bp = ByteReader(self.data[:])
for i in range(len(self.data) - 4):
bp.pos = i
Expand Down Expand Up @@ -209,6 +209,10 @@ def _read_user_data(self):
user_data_pos = self.root_parser.pos
num_chunks = self.root_parser.read_uint32()
for _ in range(num_chunks):
if self.root_parser.pos >= self.root_parser.size - 1:
self.root_parser.pos = user_data_pos + self.user_data_size
return

cid = self.root_parser.read_uint32()
self.root_parser.push_info()
size = self.root_parser.read_uint32()
Expand Down Expand Up @@ -671,15 +675,21 @@ def _read_node(self, class_id, depth, bp, add=True):
# For TM2
if ('version' in self.__replay_header_info
and self.__replay_header_info['version'] >= 8):
game_class.login = bp.read_string()
pos = bp.pos
try:
game_class.login = bp.read_string()
except:
bp.pos = pos

elif cid == 0x309200F or cid == 0x2401B00F:
game_class.login = bp.read_string()
elif cid == 0x3092010 or cid == 0x2401B010:
bp.read_string_lookback()
elif cid == 0x3092012 or cid == 0x2401B012:
bp.skip(4 + 16)
# import binascii
# bp.read(4)
# # bp.read(4)
# print(bp.read_uint32())
# print(f'{binascii.hexlify(bp.read(16))}')
# print()
elif cid == 0x3092013 or cid == 0x2401B013:
Expand All @@ -692,35 +702,8 @@ def _read_node(self, class_id, depth, bp, add=True):
bp.read_string_lookback()
bp.read_string_lookback()
bp.read_string_lookback()
elif cid == 0x3092019 or cid == 0x03092025 or cid == 0x2401B019:
if cid == 0x03092025:
bp.skip(4)

game_class.events_duration = bp.read_uint32()
bp.skip(4)

num_control_names = bp.read_uint32()
game_class.control_names = []
for _ in range(num_control_names):
name = bp.read_string_lookback()
if name != '':
game_class.control_names.append(name)

if len(game_class.control_names) == 0:
continue

num_control_entries = bp.read_uint32()
bp.skip(4)
for _ in range(num_control_entries):
time = bp.read_uint32() - 100000
name = game_class.control_names[bp.read_byte()]
entry = headers.ControlEntry(time, name, bp.read_uint16(), bp.read_uint16())
game_class.control_entries.append(entry)

game_class.game_version = bp.read_string()
bp.skip(3 * 4)
bp.read_string()
bp.skip(4)
elif cid == 0x3092019 or cid == 0x03092025 or cid == 0x2401B019 or cid == 0x2401B011:
Gbx.read_ghost_events(game_class, bp, cid)
elif cid == 0x309201c:
bp.skip(32)
elif cid == 0x03093004 or cid == 0x2403f004:
Expand All @@ -731,6 +714,39 @@ def _read_node(self, class_id, depth, bp, add=True):
else:
return

@staticmethod
def read_ghost_events(game_class, bp, cid):
if cid == 0x03092025:
game_class.is_maniaplanet = True
bp.skip(4)

game_class.events_duration = bp.read_uint32()
if game_class.events_duration != 0:
bp.skip(4)

num_control_names = bp.read_uint32()
game_class.control_names = []
for _ in range(num_control_names):
name = bp.read_string_lookback()
if name != '':
game_class.control_names.append(name)

if len(game_class.control_names) == 0:
return

num_control_entries = bp.read_uint32()
bp.skip(4)
for _ in range(num_control_entries):
time = bp.read_uint32() - 100000
name = game_class.control_names[bp.read_byte()]
entry = headers.ControlEntry(time, name, bp.read_uint16(), bp.read_uint16())
game_class.control_entries.append(entry)

game_class.game_version = bp.read_string()
bp.skip(3 * 4)
bp.read_string()
bp.skip(4)

@staticmethod
def read_ghost(game_class, bp):
uncomp_sz = bp.read_uint32()
Expand Down Expand Up @@ -771,8 +787,12 @@ def read_ghost(game_class, bp):
gr.read_int16(), gr.read_int16(),
gr.read_int8(), gr.read_int8())

if len(sample_sizes) == 1:
sample_sz = sample_sizes[0]
len_sizes = len(sample_sizes)
if i >= len_sizes:
if len_sizes >= 1:
sample_sz = sample_sizes[0]
else:
sample_sz = 0
else:
sample_sz = sample_sizes[i]

Expand Down
1 change: 1 addition & 0 deletions pygbx/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def __init__(self, id):
self.game_version = ''
self.control_names = []
self.events_duration = 0
self.is_maniaplanet = False
super(CGameCtnGhost, self).__init__(id)

class ControlEntry(object):
Expand Down

0 comments on commit 7def4b9

Please sign in to comment.