From c99e943880b08e10a208e3e80d0c52a6cdf71fcd Mon Sep 17 00:00:00 2001 From: ergrelet Date: Tue, 14 May 2024 00:09:34 +0200 Subject: [PATCH] Reduce code duplication a bit --- binja_plugin/common.py | 27 +++----------- themida_unmutate/main.py | 63 ++++++++------------------------- themida_unmutate/miasm_utils.py | 33 +++++++++++++++-- themida_unmutate/unwrapping.py | 5 ++- 4 files changed, 51 insertions(+), 77 deletions(-) diff --git a/binja_plugin/common.py b/binja_plugin/common.py index a875237..95423cd 100644 --- a/binja_plugin/common.py +++ b/binja_plugin/common.py @@ -2,10 +2,9 @@ from miasm.analysis.binary import Container from miasm.analysis.machine import Machine -from miasm.core import parse_asm from miasm.core.asmblock import AsmCFG, asm_resolve_final from miasm.core.locationdb import LocationDB -from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval +from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, generate_code_redirect_patch def get_binary_data(bv: BinaryView) -> bytearray: @@ -88,25 +87,7 @@ def rebuild_simplified_binary( original_to_simplified[protected_func_addr] = min(new_section_patches.keys()) # Redirect functions to their simplified versions - protected_function_addrs = func_addr_to_simplified_cfg.keys() - for target_addr in protected_function_addrs: - # Generate a single-block AsmCFG with a JMP to the simplified version + for target_addr in func_addr_to_simplified_cfg.keys(): simplified_func_addr = original_to_simplified[target_addr] - original_loc_str = f"loc_{target_addr:x}" - jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}" - jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str, - miasm_ctx.mdis.loc_db) - - # Unpin loc_key if it's pinned - original_loc = miasm_ctx.loc_db.get_offset_location(target_addr) - if original_loc is not None: - miasm_ctx.loc_db.unset_location_offset(original_loc) - - # Relocate the newly created block and generate machine code - original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str) - miasm_ctx.loc_db.set_location_offset(original_loc, target_addr) - new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg) - - # Apply patches - for address, data in new_jmp_patches.items(): - bw.write(bytes(data), address) + address, data = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr) + bw.write(data, address) diff --git a/themida_unmutate/main.py b/themida_unmutate/main.py index 223d5ee..e3192ee 100644 --- a/themida_unmutate/main.py +++ b/themida_unmutate/main.py @@ -2,14 +2,13 @@ from typing import Optional import lief -from miasm.core import parse_asm from miasm.core.asmblock import AsmCFG, asm_resolve_final, bbl_simplifier from miasm.core.interval import interval from themida_unmutate.logging import setup_logger, LOGGER -from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval +from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, generate_code_redirect_patch from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions -from themida_unmutate.unwrapping import unwrap_function +from themida_unmutate.unwrapping import resolve_mutated_code_address NEW_SECTION_NAME = ".unmut" NEW_SECTION_MAX_SIZE = 2**16 @@ -77,7 +76,7 @@ def unwrap_functions(miasm_ctx: MiasmContext, target_function_addrs: list[int]) mutated_func_addrs: list[int] = [] for addr in target_function_addrs: LOGGER.debug("Resolving mutated code portion address for 0x%x..." % addr) - mutated_code_addr = unwrap_function(miasm_ctx, addr) + mutated_code_addr = resolve_mutated_code_address(miasm_ctx, addr) if mutated_code_addr == addr: raise Exception("Failure to unwrap function") @@ -174,34 +173,18 @@ def __rebuild_simplified_binary_in_new_section( new_content[offset:offset + len(data)] = data unmut_section.content = memoryview(new_content) - # Find the section containing the original function - protected_function_addrs = func_addr_to_simplified_cfg.keys() - text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs))) - assert text_section is not None - # Redirect functions to their simplified versions + protected_function_addrs = func_addr_to_simplified_cfg.keys() unmut_jmp_patches: list[tuple[int, bytes]] = [] for target_addr in protected_function_addrs: # Generate a single-block AsmCFG with a JMP to the simplified version simplified_func_addr = original_to_simplified[target_addr] - original_loc_str = f"loc_{target_addr:x}" - jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}" - jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str, - miasm_ctx.mdis.loc_db) - - # Unpin loc_key if it's pinned - original_loc = miasm_ctx.loc_db.get_offset_location(target_addr) - if original_loc is not None: - miasm_ctx.loc_db.unset_location_offset(original_loc) - - # Relocate the newly created block and generate machine code - original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str) - miasm_ctx.loc_db.set_location_offset(original_loc, target_addr) - new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg) + unmut_jmp_patch = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr) + unmut_jmp_patches.append(unmut_jmp_patch) - # Merge patches into the patch list - for patch in new_jmp_patches.items(): - unmut_jmp_patches.append(patch) + # Find the section containing the original function + text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs))) + assert text_section is not None # Apply patches text_section_base = pe_obj.imagebase + text_section.virtual_address @@ -281,34 +264,18 @@ def __rebuild_simplified_binary_in_place( new_content[offset:offset + len(data)] = data themida_section.content = memoryview(new_content) - # Find the section containing the original function - protected_function_addrs = func_addr_to_simplified_cfg.keys() - text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs))) - assert text_section is not None - # Redirect functions to their simplified versions + protected_function_addrs = func_addr_to_simplified_cfg.keys() unmut_jmp_patches: list[tuple[int, bytes]] = [] for target_addr in protected_function_addrs: # Generate a single-block AsmCFG with a JMP to the simplified version simplified_func_addr = original_to_simplified[target_addr] - original_loc_str = f"loc_{target_addr:x}" - jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}" - jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str, - miasm_ctx.mdis.loc_db) - - # Unpin loc_key if it's pinned - original_loc = miasm_ctx.loc_db.get_offset_location(target_addr) - if original_loc is not None: - miasm_ctx.loc_db.unset_location_offset(original_loc) - - # Relocate the newly created block and generate machine code - original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str) - miasm_ctx.loc_db.set_location_offset(original_loc, target_addr) - new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg) + unmut_jmp_patch = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr) + unmut_jmp_patches.append(unmut_jmp_patch) - # Merge patches into the patch list - for patch in new_jmp_patches.items(): - unmut_jmp_patches.append(patch) + # Find the section containing the original function + text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs))) + assert text_section is not None # Apply patches text_section_base = pe_obj.imagebase + text_section.virtual_address diff --git a/themida_unmutate/miasm_utils.py b/themida_unmutate/miasm_utils.py index cbd228b..3ee21ca 100644 --- a/themida_unmutate/miasm_utils.py +++ b/themida_unmutate/miasm_utils.py @@ -1,11 +1,11 @@ from dataclasses import dataclass -from typing import Self +from typing import Optional, Self import miasm.expression.expression as m2_expr - from miasm.analysis.binary import Container from miasm.analysis.machine import Machine -from miasm.core.asmblock import disasmEngine +from miasm.core import parse_asm +from miasm.core.asmblock import AsmCFG, disasmEngine, asm_resolve_final from miasm.core.interval import interval from miasm.core.locationdb import LocationDB from miasm.ir.ir import Lifter @@ -51,3 +51,30 @@ def expr_int_to_int(expr: m2_expr.ExprInt) -> int: result = expr.arg return int(result) + + +def generate_code_redirect_patch(miasm_ctx: MiasmContext, src_addr: int, dst_addr: int) -> tuple[int, bytes]: + """ + Generate a single-block AsmCFG with a JMP from `src_addr` to `dst_addr` and return the result patch. + """ + X86_JMP_INSTRUCTION = "JMP" + + # Generate a single-block AsmCFG with a JMP to the simplified version + original_loc_str = f"loc_{src_addr:x}" + jmp_unmut_instr_str = f"{original_loc_str}:\n{X86_JMP_INSTRUCTION} 0x{dst_addr:x}" + jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str, + miasm_ctx.mdis.loc_db) + + # Unpin loc_key if it's pinned + original_loc = miasm_ctx.loc_db.get_offset_location(src_addr) + if original_loc is not None: + miasm_ctx.loc_db.unset_location_offset(original_loc) + + # Relocate the newly created block and generate machine code + original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str) + miasm_ctx.loc_db.set_location_offset(original_loc, src_addr) + jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg) + jmp_patch: Optional[tuple[int, bytes]] = next(iter(jmp_patches.items()), None) + assert jmp_patch is not None + + return jmp_patch diff --git a/themida_unmutate/unwrapping.py b/themida_unmutate/unwrapping.py index 0c8180e..f4659e4 100644 --- a/themida_unmutate/unwrapping.py +++ b/themida_unmutate/unwrapping.py @@ -1,12 +1,12 @@ import miasm.expression.expression as m2_expr from miasm.ir.ir import IRCFG, Lifter from miasm.ir.symbexec import SymbolicExecutionEngine -from themida_unmutate.logging import LOGGER +from themida_unmutate.logging import LOGGER from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int -def unwrap_function(miasm_ctx: MiasmContext, target_addr: int) -> int: +def resolve_mutated_code_address(miasm_ctx: MiasmContext, target_addr: int) -> int: # Save `follow_call` value and set it to `True` saved_follow_call = miasm_ctx.mdis.follow_call miasm_ctx.mdis.follow_call = True @@ -14,7 +14,6 @@ def unwrap_function(miasm_ctx: MiasmContext, target_addr: int) -> int: asmcfg = miasm_ctx.mdis.dis_multiblock(target_addr) # Restore `follow_call` value miasm_ctx.mdis.follow_call = saved_follow_call - # Lift ASM to IR ircfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asmcfg)