From 51ccb372e98461531b3299db0b30c4e1e2dd780b Mon Sep 17 00:00:00 2001 From: ergrelet Date: Sat, 6 Apr 2024 23:15:20 +0200 Subject: [PATCH] Implement in-place binary rewriting Add a CLI option to enable in-place rewriting --- themida_unmutate/main.py | 183 ++++++++++++++++-- themida_unmutate/miasm_utils.py | 3 + .../symbolic_execution/__init__.py | 6 +- .../symbolic_execution/x86/__init__.py | 25 ++- 4 files changed, 188 insertions(+), 29 deletions(-) diff --git a/themida_unmutate/main.py b/themida_unmutate/main.py index b5e1f95..14f21c0 100644 --- a/themida_unmutate/main.py +++ b/themida_unmutate/main.py @@ -3,11 +3,11 @@ import lief from miasm.core import parse_asm -from miasm.core.asmblock import AsmCFG, asm_resolve_final +from miasm.core.asmblock import AsmCFG, asm_resolve_final, bbl_simplifier from miasm.core.interval import interval from themida_unmutate.logging import setup_logger, LOGGER -from themida_unmutate.miasm_utils import MiasmContext +from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions from themida_unmutate.unwrapping import unwrap_function @@ -49,7 +49,8 @@ def entry_point() -> None: # Rewrite the protected binary with simplified functions LOGGER.info("Rebuilding binary file...") rebuild_simplified_binary(miasm_ctx, func_addr_to_simplified_cfg, - args.protected_binary, args.output) + args.protected_binary, args.output, + args.reassemble_in_place) LOGGER.info("Done! You can find your deobfuscated binary at '%s'" % args.output) @@ -75,6 +76,10 @@ def parse_arguments() -> Namespace: parser.add_argument("--no-trampoline", action='store_true', help="Disable function unwrapping") + parser.add_argument("--reassemble-in-place", + action='store_true', + help="Rewrite simplified code over the mutated code" + "rather than in a new code section") parser.add_argument("-v", "--verbose", action='store_true', @@ -105,9 +110,11 @@ def unwrap_functions(target_binary_path: str, def rebuild_simplified_binary( miasm_ctx: MiasmContext, - func_addr_to_simplified_cfg: dict[int, AsmCFG], + func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG, + MiasmFunctionInterval]], input_binary_path: str, output_binary_path: str, + reassemble_in_place: bool, ) -> None: """ Reassemble functions' `AsmCFG` and rewrite the input binary with simplified @@ -116,6 +123,28 @@ def rebuild_simplified_binary( if len(func_addr_to_simplified_cfg) == 0: raise ValueError("`protected_function_addrs` cannot be empty") + if reassemble_in_place: + __rebuild_simplified_binary_in_place(miasm_ctx, + func_addr_to_simplified_cfg, + input_binary_path, + output_binary_path) + else: + __rebuild_simplified_binary_in_new_section( + miasm_ctx, func_addr_to_simplified_cfg, input_binary_path, + output_binary_path) + + +def __rebuild_simplified_binary_in_new_section( + miasm_ctx: MiasmContext, + func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG, + MiasmFunctionInterval]], + input_binary_path: str, + output_binary_path: str, +) -> None: + """ + Reassemble functions' `AsmCFG` and rewrite the input binary with simplified + machine code in a new code section. + """ # Open the target binary with LIEF pe_obj = lief.PE.parse(input_binary_path) if pe_obj is None: @@ -129,16 +158,18 @@ def rebuild_simplified_binary( | lief.PE.SECTION_CHARACTERISTICS.MEM_EXECUTE.value) pe_obj.add_section(unmut_section) unmut_section = pe_obj.get_section(NEW_SECTION_NAME) - - image_base = pe_obj.imagebase - unmut_section_base = image_base + unmut_section.virtual_address + unmut_section_base = pe_obj.imagebase + unmut_section.virtual_address # Reassemble simplified AsmCFGs original_to_simplified: dict[int, int] = {} next_min_offset_for_asm = 0 unmut_section_patches: list[tuple[int, bytes]] = [] - for protected_func_addr, simplified_asmcfg in \ + for protected_func_addr, val in \ func_addr_to_simplified_cfg.items(): + simplified_asmcfg, _ = val + # Simplify CFG further (by merging basic blocks when possible) + simplified_asmcfg = bbl_simplifier(simplified_asmcfg) + # Unpin blocks to be able to relocate the whole CFG head = simplified_asmcfg.heads()[0] for ir_block in simplified_asmcfg.blocks: @@ -168,7 +199,7 @@ def rebuild_simplified_binary( next_min_offset_for_asm = max( new_section_patches.keys()) - unmut_section_base + 15 - # Overwrite the section's content + # Overwrite the new section's content new_section_size = next_min_offset_for_asm new_content = bytearray([0] * new_section_size) for addr, data in unmut_section_patches: @@ -176,10 +207,124 @@ def rebuild_simplified_binary( new_content[offset:offset + len(data)] = data unmut_section.content = memoryview(new_content) - # Find the section containing the virtual addresses we want to modify + # Find the section containing the original function + protected_function_addrs = func_addr_to_simplified_cfg.keys() + text_section = __section_from_virtual_address( + pe_obj, next(iter(protected_function_addrs))) + assert text_section is not None + + # Redirect functions to their simplified versions + unmut_jmp_patches: list[tuple[int, bytes]] = [] + for target_addr in protected_function_addrs: + # Generate a single-block AsmCFG with a JMP to the simplified version + simplified_func_addr = original_to_simplified[target_addr] + original_loc_str = f"loc_{target_addr:x}" + jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}" + jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, + miasm_ctx.mdis.attrib, + jmp_unmut_instr_str, + miasm_ctx.mdis.loc_db) + + # Unpin loc_key if it's pinned + original_loc = miasm_ctx.loc_db.get_offset_location(target_addr) + if original_loc is not None: + miasm_ctx.loc_db.unset_location_offset(original_loc) + + # Relocate the newly created block and generate machine code + original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str) + miasm_ctx.loc_db.set_location_offset(original_loc, target_addr) + new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, + jmp_unmut_asmcfg) + + # Merge patches into the patch list + for patch in new_jmp_patches.items(): + unmut_jmp_patches.append(patch) + + # Apply patches + text_section_base = pe_obj.imagebase + text_section.virtual_address + text_section_bytes = bytearray(text_section.content) + for addr, data in unmut_jmp_patches: + offset = addr - text_section_base + text_section_bytes[offset:offset + len(data)] = data + text_section.content = memoryview(text_section_bytes) + + # Invoke the builder + builder = lief.PE.Builder(pe_obj) + builder.build() + + # Save the result + builder.write(output_binary_path) + + +def __rebuild_simplified_binary_in_place( + miasm_ctx: MiasmContext, + func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG, + MiasmFunctionInterval]], + input_binary_path: str, + output_binary_path: str, +) -> None: + """ + Reassemble functions' `AsmCFG` and rewrite the input binary with simplified + machine code by overwriting the mutated code. + """ + # Open the target binary with LIEF + pe_obj = lief.PE.parse(input_binary_path) + if pe_obj is None: + raise Exception(f"Failed to parse PE '{input_binary_path}'") + + # Reassemble simplified AsmCFGs + original_to_simplified: dict[int, int] = {} + unmut_patches: list[tuple[int, bytes]] = [] + for protected_func_addr, val in \ + func_addr_to_simplified_cfg.items(): + simplified_asmcfg, orignal_asmcfg_interval = val + + # Unpin blocks to be able to relocate the CFG + head = simplified_asmcfg.heads()[0] + for asm_block in simplified_asmcfg.blocks: + miasm_ctx.loc_db.unset_location_offset(asm_block.loc_key) + + # Start rewriting at the first part of the interval (i.e., at the start + # of the mutated code) + target_addr: int = orignal_asmcfg_interval.intervals[0][0] + # Unpin loc_key if it's pinned + original_loc = miasm_ctx.loc_db.get_offset_location(target_addr) + if original_loc is not None: + miasm_ctx.loc_db.unset_location_offset(original_loc) + + # Relocate the function's entry block + miasm_ctx.loc_db.set_location_offset(head, target_addr) + + # Generate the simplified machine code + new_section_patches = asm_resolve_final( + miasm_ctx.mdis.arch, + simplified_asmcfg, + dst_interval=orignal_asmcfg_interval) + + # Merge patches into the patch list + for patch in new_section_patches.items(): + unmut_patches.append(patch) + + # Associate original addr to simplified addr + original_to_simplified[protected_func_addr] = min( + new_section_patches.keys()) + + # Find Themida's section + themida_section = __section_from_virtual_address(pe_obj, target_addr) + assert themida_section is not None + + # Overwrite Themida's section content + themida_section_base = pe_obj.imagebase + themida_section.virtual_address + new_content = bytearray(themida_section.content) + for addr, data in unmut_patches: + offset = addr - themida_section_base + new_content[offset:offset + len(data)] = data + themida_section.content = memoryview(new_content) + + # Find the section containing the original function protected_function_addrs = func_addr_to_simplified_cfg.keys() - target_rva = next(iter(protected_function_addrs)) - image_base - text_section = section_from_virtual_address(pe_obj, target_rva) + text_section = __section_from_virtual_address( + pe_obj, next(iter(protected_function_addrs))) assert text_section is not None # Redirect functions to their simplified versions @@ -210,7 +355,7 @@ def rebuild_simplified_binary( unmut_jmp_patches.append(patch) # Apply patches - text_section_base = image_base + text_section.virtual_address + text_section_base = pe_obj.imagebase + text_section.virtual_address text_section_bytes = bytearray(text_section.content) for addr, data in unmut_jmp_patches: offset = addr - text_section_base @@ -225,10 +370,16 @@ def rebuild_simplified_binary( builder.write(output_binary_path) -def section_from_virtual_address(lief_bin: lief.Binary, - virtual_addr: int) -> Optional[lief.Section]: +def __section_from_virtual_address( + lief_bin: lief.Binary, virtual_addr: int) -> Optional[lief.Section]: + rva = virtual_addr - lief_bin.imagebase + return __section_from_rva(lief_bin, rva) + + +def __section_from_rva(lief_bin: lief.Binary, + rva: int) -> Optional[lief.Section]: for s in lief_bin.sections: - if s.virtual_address <= virtual_addr < s.virtual_address + s.size: + if s.virtual_address <= rva < s.virtual_address + s.size: assert isinstance(s, lief.Section) return s diff --git a/themida_unmutate/miasm_utils.py b/themida_unmutate/miasm_utils.py index 7980d02..42fc55b 100644 --- a/themida_unmutate/miasm_utils.py +++ b/themida_unmutate/miasm_utils.py @@ -5,9 +5,12 @@ from miasm.analysis.binary import Container from miasm.analysis.machine import Machine from miasm.core.asmblock import disasmEngine +from miasm.core.interval import interval from miasm.core.locationdb import LocationDB from miasm.ir.ir import Lifter +MiasmFunctionInterval = interval + @dataclass class MiasmContext: diff --git a/themida_unmutate/symbolic_execution/__init__.py b/themida_unmutate/symbolic_execution/__init__.py index b8de460..acdcd9f 100644 --- a/themida_unmutate/symbolic_execution/__init__.py +++ b/themida_unmutate/symbolic_execution/__init__.py @@ -1,12 +1,12 @@ from miasm.core.asmblock import AsmCFG import themida_unmutate.symbolic_execution.x86 as symex_x86 -from themida_unmutate.miasm_utils import MiasmContext +from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval def disassemble_and_simplify_functions( - miasm_ctx: MiasmContext, - mutated_func_addrs: list[int]) -> list[AsmCFG]: + miasm_ctx: MiasmContext, mutated_func_addrs: list[int] +) -> list[tuple[AsmCFG, MiasmFunctionInterval]]: """ Disassemble mutated functions, simplify their `AsmCFG` and return them. """ diff --git a/themida_unmutate/symbolic_execution/x86/__init__.py b/themida_unmutate/symbolic_execution/x86/__init__.py index 5c98f25..c388268 100644 --- a/themida_unmutate/symbolic_execution/x86/__init__.py +++ b/themida_unmutate/symbolic_execution/x86/__init__.py @@ -3,12 +3,13 @@ import miasm.arch.x86.arch as x86_arch import miasm.expression.expression as m2_expr -from miasm.core.asmblock import AsmCFG, disasmEngine, bbl_simplifier +from miasm.core.asmblock import AsmCFG, disasmEngine from miasm.core.cpu import instruction +from miasm.core.interval import interval from miasm.ir.symbexec import SymbolicExecutionEngine from themida_unmutate.logging import LOGGER -from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int +from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, expr_int_to_int AMD64_PTR_SIZE = 64 X86_BINARY_OPS_MAPPING = { @@ -30,18 +31,23 @@ def disassemble_and_simplify_functions( - miasm_ctx: MiasmContext, - mutated_func_addrs: list[int]) -> list[AsmCFG]: + miasm_ctx: MiasmContext, mutated_func_addrs: list[int] +) -> list[tuple[AsmCFG, MiasmFunctionInterval]]: """ Disassemble mutated functions, simplify their `AsmCFG` and return them. """ # Iterate through functions, disassemble and simplify them - simplified_func_asmcfgs: list[AsmCFG] = [] + simplified_func_asmcfgs: list[tuple[AsmCFG, MiasmFunctionInterval]] = [] for mutated_code_addr in mutated_func_addrs: LOGGER.info("Simplifying function at 0x%x..." % mutated_code_addr) # Disassemble function asm_cfg = miasm_ctx.mdis.dis_multiblock(mutated_code_addr) + # Compute function's interval (this is needed when rewriting the binary + # in-place) + original_func_interval: MiasmFunctionInterval = interval( + blk.get_range() for blk in asm_cfg.blocks) + # Lift assembly to IR ir_cfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asm_cfg) @@ -70,8 +76,10 @@ def disassemble_and_simplify_functions( asm_cfg, ir_block.assignblks[0].instr) # Note(ergrelet): reset the instruction's additional info to avoid # certain assembling issues where instruction prefixes are mixed - # in a illegal way. + # in an illegal way. relocatable_instr.additional_info = x86_arch.additional_info() + relocatable_instr.additional_info.g1.value = 0 # type: ignore + relocatable_instr.additional_info.g2.value = 0 # type: ignore asm_block.lines[0] = relocatable_instr continue @@ -242,10 +250,7 @@ def disassemble_and_simplify_functions( LOGGER.warning("Unsupported instruction or unmutated block found. " "Block will be kept as is.") - # Simplify CFG (by merging basic blocks when possible) - asm_cfg = bbl_simplifier(asm_cfg) - - simplified_func_asmcfgs.append(asm_cfg) + simplified_func_asmcfgs.append((asm_cfg, original_func_interval)) return simplified_func_asmcfgs