diff --git a/binja_plugin/__init__.py b/binja_plugin/__init__.py
new file mode 100644
index 0000000..2d590b8
--- /dev/null
+++ b/binja_plugin/__init__.py
@@ -0,0 +1,169 @@
+from binaryninja import *  # type:ignore
+
+from miasm.analysis.binary import Container
+from miasm.analysis.machine import Machine
+from miasm.core import parse_asm
+from miasm.core.asmblock import AsmCFG, asm_resolve_final
+from miasm.core.locationdb import LocationDB
+from themida_unmutate.main import unwrap_functions
+
+from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions
+from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval
+
+PLUGIN_NAME = "themida-unmutate-bn"
+SUPPORTED_ARCHS = ["x86_64"]
+
+
+def deobfuscate_at_address(bv: BinaryView, address: int) -> None:
+    """
+    Deobfuscate the mutated function reachable from `address` and patch the
+    simplified code back into `bv`.
+    """
+    if bv.platform is None:
+        print("No platform is set for this view, cannot deobfuscate")
+        return
+
+    arch = str(bv.platform.arch)
+    if arch not in SUPPORTED_ARCHS:
+        print(f"Unsupported architecture '{arch}', cannot deobfuscate")
+        return
+
+    protected_func_addrs = [address]
+    binary_data = get_binary_data(bv)
+    miasm_ctx = create_miasm_context(arch, binary_data, bv.original_base)
+
+    print("Resolving mutated's functions' addresses...")
+    mutated_func_addrs = unwrap_functions(miasm_ctx, protected_func_addrs)
+
+    # Disassemble mutated functions and simplify them
+    print("Deobfuscating mutated functions...")
+    simplified_func_asmcfgs = disassemble_and_simplify_functions(miasm_ctx, mutated_func_addrs)
+
+    # Map protected functions' addresses to their corresponding simplified `AsmCFG`
+    func_addr_to_simplified_cfg = dict(zip(protected_func_addrs, simplified_func_asmcfgs))
+
+    # Rewrite the protected binary with the simplified function
+    print("Patching binary file...")
+    rebuild_simplified_binary(miasm_ctx, func_addr_to_simplified_cfg, bv)
+
+    # Relaunch analysis to take our changes into account
+    bv.update_analysis()
+
+
+def get_binary_data(bv: BinaryView) -> bytearray:
+    """
+    Retrieve the sections of `bv` as a single flat `bytearray` that starts at
+    `bv.original_base`.
+
+    Gaps between sections and unreadable sections are filled with zeroes, so
+    that each byte stays at offset `address - bv.original_base`.
+    """
+    # Sort sections by start address
+    sorted_sections = sorted(bv.sections.values(), key=lambda s: s.start)
+
+    br = BinaryReader(bv)
+    last_section_address = bv.original_base
+    exe_data = bytearray()
+    for section in sorted_sections:
+        section_end = section.start + section.length
+        if section_end <= last_section_address:
+            # Already fully covered by a previous (overlapping) section
+            continue
+
+        # Pad with zeroes (no-op if the section touches or overlaps the last one)
+        exe_data += b"\x00" * (section.start - last_section_address)
+
+        # Only read what hasn't been emitted yet to keep offsets consistent
+        read_address = max(section.start, last_section_address)
+        read_size = section_end - read_address
+        # `BinaryReader.read` returns `None` on failure, use zeroes instead
+        section_data = br.read(read_size, read_address) or b""
+        exe_data += section_data.ljust(read_size, b"\x00")
+        last_section_address = section_end
+
+    return exe_data
+
+
+def create_miasm_context(arch: str, binary_data: bytearray, binary_base_address: int) -> MiasmContext:
+    """
+    Build a `MiasmContext` for `arch` from raw `binary_data`, using
+    `binary_base_address` as the data's base address.
+    """
+    loc_db = LocationDB()
+    machine = Machine(arch)
+    assert machine.dis_engine is not None
+    container = Container.from_string(binary_data, loc_db, addr=binary_base_address)
+    mdis = machine.dis_engine(container.bin_stream, loc_db=loc_db)
+    lifter = machine.lifter(loc_db)
+
+    return MiasmContext(loc_db, container, machine, mdis, lifter)
+
+
+def rebuild_simplified_binary(
+    miasm_ctx: MiasmContext,
+    func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG, MiasmFunctionInterval]],
+    bv: BinaryView,
+) -> None:
+    """
+    Reassemble the simplified functions over their mutated code in `bv`, then
+    patch each protected function's address with a JMP to its simplified code.
+    """
+    bw = BinaryWriter(bv)
+
+    # Reassemble simplified AsmCFGs
+    original_to_simplified: dict[int, int] = {}
+    for protected_func_addr, val in func_addr_to_simplified_cfg.items():
+        simplified_asmcfg, original_asmcfg_interval = val
+
+        # Unpin blocks to be able to relocate the CFG
+        head = simplified_asmcfg.heads()[0]
+        for asm_block in simplified_asmcfg.blocks:
+            miasm_ctx.loc_db.unset_location_offset(asm_block.loc_key)
+
+        # Start rewriting at the first part of the interval (i.e., at the start
+        # of the mutated code)
+        target_addr: int = original_asmcfg_interval.intervals[0][0]
+        # Unpin loc_key if it's pinned
+        original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
+        if original_loc is not None:
+            miasm_ctx.loc_db.unset_location_offset(original_loc)
+
+        # Relocate the function's entry block
+        miasm_ctx.loc_db.set_location_offset(head, target_addr)
+
+        # Generate the simplified machine code
+        new_section_patches = asm_resolve_final(miasm_ctx.mdis.arch,
+                                                simplified_asmcfg,
+                                                dst_interval=original_asmcfg_interval)
+
+        # Apply patches
+        for address, data in new_section_patches.items():
+            bw.write(bytes(data), address)
+
+        # Associate original addr to simplified addr
+        original_to_simplified[protected_func_addr] = min(new_section_patches.keys())
+
+    # Redirect functions to their simplified versions
+    for target_addr, simplified_func_addr in original_to_simplified.items():
+        # Generate a single-block AsmCFG with a JMP to the simplified version
+        original_loc_str = f"loc_{target_addr:x}"
+        jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
+        jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
+                                               miasm_ctx.mdis.loc_db)
+
+        # Unpin loc_key if it's pinned
+        original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
+        if original_loc is not None:
+            miasm_ctx.loc_db.unset_location_offset(original_loc)
+
+        # Relocate the newly created block and generate machine code
+        original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
+        miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
+        new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)
+
+        # Apply patches
+        for address, data in new_jmp_patches.items():
+            bw.write(bytes(data), address)
+
+
+PluginCommand.register_for_address(PLUGIN_NAME, "Deobfuscate mutated code from this address", deobfuscate_at_address)
diff --git a/plugin.json b/plugin.json
new file mode 100644
index 0000000..ce658d6
--- /dev/null
+++ b/plugin.json
@@ -0,0 +1,24 @@
+{
+    "pluginmetadataversion": 2,
+    "name": "themida-unmutate-bn",
+    "type": ["core"],
+    "api": ["python3"],
+    "description": "Static deobfuscator for Themida's mutation-based obfuscation.",
+    "longdescription": "",
+    "license": {
+        "name": "GPL-3.0-or-later",
+        "text": ""
+    },
+    "platforms": ["Darwin", "Linux", "Windows"],
+    "installinstructions": {
+        "Darwin": "",
+        "Linux": "",
+        "Windows": ""
+    },
+    "dependencies": {
+        "pip": ["miasm", "themida-unmutate"]
+    },
+    "version": "0.1.0",
+    "author": "Erwan Grelet",
+    "minimumbinaryninjaversion": 3164
+}
diff --git a/themida_unmutate/main.py b/themida_unmutate/main.py
index 8b4380e..223d5ee 100644
--- a/themida_unmutate/main.py
+++ b/themida_unmutate/main.py
@@ -23,13 +23,13 @@ def entry_point() -> None:
     setup_logger(args.verbose)
 
     # Setup disassembler and lifter
-    miasm_ctx = MiasmContext(args.protected_binary)
+    miasm_ctx = MiasmContext.from_binary_file(args.protected_binary)
 
     # Resolve mutated functions' addresses if needed
     protected_func_addrs = list(map(lambda addr: int(addr, 0), args.addresses))
     if not args.no_trampoline:
         LOGGER.info("Resolving mutated's functions' addresses...")
-        mutated_func_addrs = unwrap_functions(args.protected_binary, protected_func_addrs)
+        mutated_func_addrs = unwrap_functions(miasm_ctx, protected_func_addrs)
     else:
         # No trampolines to take care of, use target addresses directly
         mutated_func_addrs = protected_func_addrs
@@ -70,14 +70,14 @@ def parse_arguments() -> Namespace:
     return parser.parse_args()
 
 
-def unwrap_functions(target_binary_path: str, target_function_addrs: list[int]) -> list[int]:
+def unwrap_functions(miasm_ctx: MiasmContext, target_function_addrs: list[int]) -> list[int]:
     """
     Resolve mutated function's addresses from original function addresses.
     """
     mutated_func_addrs: list[int] = []
     for addr in target_function_addrs:
         LOGGER.debug("Resolving mutated code portion address for 0x%x..." % addr)
-        mutated_code_addr = unwrap_function(target_binary_path, addr)
+        mutated_code_addr = unwrap_function(miasm_ctx, addr)
         if mutated_code_addr == addr:
             raise Exception("Failure to unwrap function")
 
diff --git a/themida_unmutate/miasm_utils.py b/themida_unmutate/miasm_utils.py
index 73e8104..cbd228b 100644
--- a/themida_unmutate/miasm_utils.py
+++ b/themida_unmutate/miasm_utils.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from typing import Self
 
 import miasm.expression.expression as m2_expr
 
@@ -20,22 +21,25 @@ class MiasmContext:
     mdis: disasmEngine
     lifter: Lifter
 
-    def __init__(self, target_binary_path: str) -> None:
+    @classmethod
+    def from_binary_file(cls, target_binary_path: str) -> Self:
         """
-        Initialize our Miasm context, targeted at x86_64 binaries.
+        Initialize our Miasm context from a binary file.
         """
-        self.loc_db = LocationDB()
+        loc_db = LocationDB()
         with open(target_binary_path, 'rb') as target_binary:
-            self.container = Container.from_stream(target_binary, self.loc_db)
-        self.machine = Machine(self.container.arch)
-        assert self.machine.dis_engine is not None
+            container = Container.from_stream(target_binary, loc_db)
+        machine = Machine(container.arch)
+        assert machine.dis_engine is not None
 
-        self.mdis = self.machine.dis_engine(self.container.bin_stream, loc_db=self.loc_db)
-        self.lifter = self.machine.lifter(self.loc_db)
+        mdis = machine.dis_engine(container.bin_stream, loc_db=loc_db)
+        lifter = machine.lifter(loc_db)
+
+        return cls(loc_db, container, machine, mdis, lifter)
 
     @property
     def arch(self) -> str:
-        return str(self.container.arch)
+        return str(self.machine.name)
 
 
 def expr_int_to_int(expr: m2_expr.ExprInt) -> int:
diff --git a/themida_unmutate/unwrapping.py b/themida_unmutate/unwrapping.py
index 389701a..0c8180e 100644
--- a/themida_unmutate/unwrapping.py
+++ b/themida_unmutate/unwrapping.py
@@ -6,13 +6,16 @@
 from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int
 
 
-def unwrap_function(target_bin_path: str, target_addr: int) -> int:
-    # Setup disassembler and lifter
-    miasm_ctx = MiasmContext(target_bin_path)
-
-    # Disassemble trampoline
+def unwrap_function(miasm_ctx: MiasmContext, target_addr: int) -> int:
+    # Save `follow_call` value and set it to `True`
+    saved_follow_call = miasm_ctx.mdis.follow_call
     miasm_ctx.mdis.follow_call = True
-    asmcfg = miasm_ctx.mdis.dis_multiblock(target_addr)
+    try:
+        # Disassemble trampoline
+        asmcfg = miasm_ctx.mdis.dis_multiblock(target_addr)
+    finally:
+        # Restore `follow_call` value, even if disassembly fails
+        miasm_ctx.mdis.follow_call = saved_follow_call
 
     # Lift ASM to IR
     ircfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asmcfg)