From d30715086fc06b746b6ba9c436ec3b695d2e2aa3 Mon Sep 17 00:00:00 2001 From: ergrelet Date: Sun, 7 Jan 2024 20:17:14 +0100 Subject: [PATCH] Implement multiple function support --- themida_unmutate/symexec.py | 549 +++++++++++++++++++-------------- themida_unmutate/unwrapping.py | 5 +- 2 files changed, 315 insertions(+), 239 deletions(-) diff --git a/themida_unmutate/symexec.py b/themida_unmutate/symexec.py index 49661f9..b63fe93 100644 --- a/themida_unmutate/symexec.py +++ b/themida_unmutate/symexec.py @@ -1,6 +1,6 @@ import itertools import sys -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from typing import Optional, Union import lief @@ -40,219 +40,231 @@ def main() -> None: - parser = ArgumentParser("Automatic deobfuscation tool powered by Miasm") - parser.add_argument("target", help="Target binary") - parser.add_argument("addr", help="Target address") - parser.add_argument("--output", - "-o", - help="Output file path", - required=True) - parser.add_argument("--architecture", "-a", help="Force architecture") - args = parser.parse_args() - target_addr = int(args.addr, 0) + # Parse command-line arguments + args = parse_arguments() + target_addrs = list(map(lambda addr: int(addr, 0), args.addresses)) - # Resolve mutated code's addr - print("Resolving mutated code portion address...") - mutated_code_addr = unwrap_function(args.target, args.architecture, - target_addr) - if mutated_code_addr == target_addr: - print("Failure") - return - - print(f"Mutated code is at 0x{mutated_code_addr:x}") + # Resolve mutated functions' addresses + mutated_func_addrs: list[int] = unwrap_functions(args.protected_binary, + target_addrs) + # Setup disassembler and lifter loc_db = LocationDB() - with open(args.target, 'rb') as target_bin: + with open(args.protected_binary, 'rb') as target_bin: cont = Container.from_stream(target_bin, loc_db) - machine = Machine(args.architecture if args.architecture else cont.arch) + machine = Machine(cont.arch) assert machine.dis_engine is not None - - # Disassemble function mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) - asmcfg = mdis.dis_multiblock(mutated_code_addr) - # Lift assembly to IR - lifter = machine.lifter(loc_db) - ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) - - # Process IR basic blocks - for loc_key, ir_block in ircfg.blocks.items(): - print(f"{loc_key}:") - asm_block = asmcfg.loc_key_to_block(loc_key) - if asm_block is None: - # Some instructions such `idiv` generate multiple IR basic blocks from a single asm instruction, so we - # skip these - continue - - relevant_assignblks = ir_block.assignblks[:-1] - relevant_blk_count = len(relevant_assignblks) - # No relevant instruction - # -> unmutated, branching instruction -> keep as is - if relevant_blk_count == 0: - print(ir_block.assignblks[0].instr) - continue - - # Only 1 or 2 relevant instructions - # -> unmutated, no junk code -> no action needed -> keep first instruction as is - if relevant_blk_count <= 2: - print(ir_block.assignblks[0].instr) - relocatable_instr = fix_rip_relative_instruction( - asmcfg, ir_block.assignblks[0].instr) - # Note(ergrelet): reset the instruction's additional info to avoid - # certain assembling issues where instruction prefixes are mixed - # in a illegal way. - relocatable_instr.additional_info = x86_arch.additional_info() - - asm_block.lines[0] = relocatable_instr - continue - - reference_sb = SymbolicExecutionEngine(lifter) - for assign_block in relevant_assignblks: - reference_sb.eval_updt_assignblk(assign_block) - # Forget dead stack slots - reference_sb.del_mem_above_stack(lifter.sp) - - # Strip FLAGS register (as these are trashed by the mutation) - strip_sym_flags(reference_sb) - - # More than 2 instructions but a single instruction replicates the symbolic state - # -> unmutated, junk code inserted -> keep the one instruction as is - block_simplified = False - for assignblk_subset in itertools.combinations(relevant_assignblks, 1): - sb = SymbolicExecutionEngine(lifter) - - for assign_block in assignblk_subset: - sb.eval_updt_assignblk(assign_block) - reference_sb.del_mem_above_stack(lifter.sp) - - # Check if instruction replicates the symbolic state - if reference_sb.get_state() == sb.get_state(): - for a in assignblk_subset: - print(a.instr) - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, a.instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - block_simplified = True - break - if block_simplified: - continue - - # More than 2 instructions but no single instruction replicates the symbolic state - # -> mutated, junk code inserted -> try to "synthetize" instruction manually - modified_variables = dict(reference_sb.modified()) - match len(modified_variables): - # No assignment block: RET, JMP - case 0: - # Keep only the last instruction - print(asm_block.lines[-1]) - asm_block.lines = [asm_block.lines[-1]] + + # Iterate through functions, disassemble and simplify them + simplified_func_asmcfgs: list[AsmCFG] = [] + for mutated_code_addr in mutated_func_addrs: + print(f"Simplifying function at address 0x{mutated_code_addr:x}") + # Disassemble function + asmcfg = mdis.dis_multiblock(mutated_code_addr) + # Lift assembly to IR + lifter = machine.lifter(loc_db) + ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) + + # Process IR basic blocks + for loc_key, ir_block in ircfg.blocks.items(): + print(f"{loc_key}:") + asm_block = asmcfg.loc_key_to_block(loc_key) + if asm_block is None: + # Some instructions such `idiv` generate multiple IR basic blocks from a single asm instruction, so we + # skip these continue - # 1 assignment block: MOV, XCHG, n-ary operators - case 1: - ir_assignment = next(iter(modified_variables.items())) - dst, value = normalize_ir_assigment(ir_assignment) - match type(value): - case m2_expr.ExprId | m2_expr.ExprMem | m2_expr.ExprInt | m2_expr.ExprSlice: - # Assignation - # -> MOV - match type(dst): - case m2_expr.ExprId | m2_expr.ExprMem | m2_expr.ExprSlice: - original_instr = handle_mov(mdis, dst, value) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [ - relocatable_instr, asm_block.lines[-1] - ] - continue - - case m2_expr.ExprOp: - # N-ary operation on native-size registers - # -> ADD/SUB/INC/DEC/AND/OR/XOR/NEG/NOT/ROL/ROR/SAR/SHL/SHR - original_instr = handle_nary_op(mdis, dst, value) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [ - relocatable_instr, asm_block.lines[-1] - ] - continue - - case m2_expr.ExprCompose: - # MOV, XCHG on single register or n-ary operation on lower-sized registers - original_instr = handle_compose(mdis, dst, value) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [ - relocatable_instr, asm_block.lines[-1] - ] - continue - - # 2 assignment blocks - # -> PUSH, POP, XCHG, `SUB RSP, X` - case 2: - modified_variables_iter = iter(modified_variables.items()) - assignblk1 = next(modified_variables_iter) - assignblk2 = next(modified_variables_iter) - - # PUSH - original_instr = handle_push(mdis, assignblk1, assignblk2) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - continue - # POP - original_instr = handle_pop(mdis, assignblk1, assignblk2) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - continue - # XCHG - original_instr = handle_xchg(mdis, assignblk1, assignblk2) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - continue + relevant_assignblks = ir_block.assignblks[:-1] + relevant_blk_count = len(relevant_assignblks) + # No relevant instruction + # -> unmutated, branching instruction -> keep as is + if relevant_blk_count == 0: + print(ir_block.assignblks[0].instr) + continue - # `SUB RSP, X` - original_instr = handle_sub_rsp(mdis, modified_variables) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - continue + # Only 1 or 2 relevant instructions + # -> unmutated, no junk code -> no action needed -> keep first instruction as is + if relevant_blk_count <= 2: + print(ir_block.assignblks[0].instr) + relocatable_instr = fix_rip_relative_instruction( + asmcfg, ir_block.assignblks[0].instr) + # Note(ergrelet): reset the instruction's additional info to avoid + # certain assembling issues where instruction prefixes are mixed + # in a illegal way. + relocatable_instr.additional_info = x86_arch.additional_info() + + asm_block.lines[0] = relocatable_instr + continue - # More than 2 assignment blocks - # -> `SUB RSP, X` - case _: - original_instr = handle_sub_rsp(mdis, modified_variables) - if original_instr is not None: - # Update block asm block - relocatable_instr = fix_rip_relative_instruction( - asmcfg, original_instr) - asm_block.lines = [relocatable_instr, asm_block.lines[-1]] - continue + reference_sb = SymbolicExecutionEngine(lifter) + for assign_block in relevant_assignblks: + reference_sb.eval_updt_assignblk(assign_block) + # Forget dead stack slots + reference_sb.del_mem_above_stack(lifter.sp) + + # Strip FLAGS register (as these are trashed by the mutation) + strip_sym_flags(reference_sb) + + # More than 2 instructions but a single instruction replicates the symbolic state + # -> unmutated, junk code inserted -> keep the one instruction as is + block_simplified = False + for assignblk_subset in itertools.combinations( + relevant_assignblks, 1): + sb = SymbolicExecutionEngine(lifter) + + for assign_block in assignblk_subset: + sb.eval_updt_assignblk(assign_block) + reference_sb.del_mem_above_stack(lifter.sp) + + # Check if instruction replicates the symbolic state + if reference_sb.get_state() == sb.get_state(): + for a in assignblk_subset: + print(a.instr) + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, a.instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + block_simplified = True + break + if block_simplified: + continue - print(modified_variables) - print("FIXME: unsupported instruction (or unmutated block?). " - "Mutated block will be kept as is.") + # More than 2 instructions but no single instruction replicates the symbolic state + # -> mutated, junk code inserted -> try to "synthetize" instruction manually + modified_variables = dict(reference_sb.modified()) + match len(modified_variables): + # No assignment block: RET, JMP + case 0: + # Keep only the last instruction + print(asm_block.lines[-1]) + asm_block.lines = [asm_block.lines[-1]] + continue - # Create a patched copy of the target - pe_obj = lief.PE.parse(args.target) + # 1 assignment block: MOV, XCHG, n-ary operators + case 1: + ir_assignment = next(iter(modified_variables.items())) + dst, value = normalize_ir_assigment(ir_assignment) + match type(value): + case m2_expr.ExprId | m2_expr.ExprMem | m2_expr.ExprInt | m2_expr.ExprSlice: + # Assignation + # -> MOV + match type(dst): + case m2_expr.ExprId | m2_expr.ExprMem | m2_expr.ExprSlice: + original_instr = handle_mov( + mdis, dst, value) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, + asm_block.lines[-1] + ] + continue + + case m2_expr.ExprOp: + # N-ary operation on native-size registers + # -> ADD/SUB/INC/DEC/AND/OR/XOR/NEG/NOT/ROL/ROR/SAR/SHL/SHR + original_instr = handle_nary_op(mdis, dst, value) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + + case m2_expr.ExprCompose: + # MOV, XCHG on single register or n-ary operation on lower-sized registers + original_instr = handle_compose(mdis, dst, value) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + + # 2 assignment blocks + # -> PUSH, POP, XCHG, `SUB RSP, X` + case 2: + modified_variables_iter = iter(modified_variables.items()) + assignblk1 = next(modified_variables_iter) + assignblk2 = next(modified_variables_iter) + + # PUSH + original_instr = handle_push(mdis, assignblk1, assignblk2) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + # POP + original_instr = handle_pop(mdis, assignblk1, assignblk2) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + # XCHG + original_instr = handle_xchg(mdis, assignblk1, assignblk2) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + + # `SUB RSP, X` + original_instr = handle_sub_rsp(mdis, modified_variables) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + + # More than 2 assignment blocks + # -> `SUB RSP, X` + case _: + original_instr = handle_sub_rsp(mdis, modified_variables) + if original_instr is not None: + # Update block asm block + relocatable_instr = fix_rip_relative_instruction( + asmcfg, original_instr) + asm_block.lines = [ + relocatable_instr, asm_block.lines[-1] + ] + continue + + print(modified_variables) + print("FIXME: unsupported instruction (or unmutated block?). " + "Mutated block will be kept as is.") + + # Simplify CFG (by merging basic blocks when possible) + asmcfg = bbl_simplifier(asmcfg) + + simplified_func_asmcfgs.append(asmcfg) + + # Open the target binary with LIEF + pe_obj = lief.PE.parse(args.protected_binary) if pe_obj is None: - print(f"Failed to parse PE '{args.target}'") + print(f"Failed to parse PE '{args.protected_binary}'") sys.exit(-1) # Create a new code section @@ -264,57 +276,85 @@ def main() -> None: pe_obj.add_section(unmut_section) unmut_section = pe_obj.get_section(NEW_SECTION_NAME) - # Simplify CFG (by merging basic blocks when possible) - asmcfg = bbl_simplifier(asmcfg) - - # Unpin blocks to be able to relocate the whole CFG image_base = pe_obj.imagebase unmut_section_base = image_base + unmut_section.virtual_address - head = asmcfg.heads()[0] - for ir_block in asmcfg.blocks: - loc_db.unset_location_offset(ir_block.loc_key) - loc_db.set_location_offset(head, unmut_section_base) - - # Generate deobfuscated assembly code - unmut_section_patches = asm_resolve_final( - mdis.arch, - asmcfg, - dst_interval=interval([ - (unmut_section_base, - unmut_section_base + unmut_section.virtual_size) - ])) + # Reassemble simplified AsmCFGs + original_to_simplified: dict[int, int] = {} + next_min_offset_for_asm = 0 + unmut_section_patches: list[tuple[int, bytes]] = [] + for i, simplified_asmcfg in enumerate(simplified_func_asmcfgs): + # Unpin blocks to be able to relocate the whole CFG + head = simplified_asmcfg.heads()[0] + for ir_block in simplified_asmcfg.blocks: + loc_db.unset_location_offset(ir_block.loc_key) + + # Relocate the function's entry block + loc_db.set_location_offset( + head, unmut_section_base + next_min_offset_for_asm) + + # Generate the simplified machine code + new_section_patches = asm_resolve_final( + mdis.arch, + simplified_asmcfg, + dst_interval=interval([ + (unmut_section_base + next_min_offset_for_asm, + unmut_section_base + unmut_section.virtual_size - + next_min_offset_for_asm) + ])) + + # Merge patches into the patch list + for patch in new_section_patches.items(): + unmut_section_patches.append(patch) + + # Associate original addr to simplified addr + original_to_simplified[target_addrs[i]] = min( + new_section_patches.keys()) + next_min_offset_for_asm = max( + new_section_patches.keys()) - unmut_section_base + 15 # Overwrite the section's content - new_section_size = max( - map(lambda a: a - unmut_section_base, - unmut_section_patches.keys())) + 15 + new_section_size = next_min_offset_for_asm new_content = bytearray([0] * new_section_size) - for addr, data in unmut_section_patches.items(): + for addr, data in unmut_section_patches: offset = addr - unmut_section_base new_content[offset:offset + len(data)] = data unmut_section.content = memoryview(new_content) - # Redirect function to its simplified version - # TODO: use function address when multi-function support is added - umut_loc_str = f"loc_{target_addr:x}" - jmp_unmut_instr_str = f"{umut_loc_str}:\nJMP 0x{unmut_section_base:x}" - jmp_unmut_asmcfg = parse_asm.parse_txt(mdis.arch, mdis.attrib, - jmp_unmut_instr_str, mdis.loc_db) - # Set loc_key's offset - loc_db.set_location_offset(loc_db.get_name_location(umut_loc_str), - target_addr) - unmut_jmp_patches = asm_resolve_final(mdis.arch, jmp_unmut_asmcfg) - - # Find the section containing the virtual address we want to modify - target_rva = target_addr - image_base + # Find the section containing the virtual addresses we want to modify + target_rva = target_addrs[0] - image_base text_section = section_from_virtual_address(pe_obj, target_rva) assert text_section is not None + # Redirect functions to their simplified versions + unmut_jmp_patches: list[tuple[int, bytes]] = [] + for target_addr in target_addrs: + # Generate a single-block AsmCFG with a JMP to the simplified version + simplified_func_addr = original_to_simplified[target_addr] + original_loc_str = f"loc_{target_addr:x}" + jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}" + jmp_unmut_asmcfg = parse_asm.parse_txt(mdis.arch, mdis.attrib, + jmp_unmut_instr_str, + mdis.loc_db) + + # Unpin loc_key if it's pinned + original_loc = loc_db.get_offset_location(target_addr) + if original_loc is not None: + loc_db.unset_location_offset(original_loc) + + # Relocate the newly created block and generate machine code + original_loc = loc_db.get_name_location(original_loc_str) + loc_db.set_location_offset(original_loc, target_addr) + new_jmp_patches = asm_resolve_final(mdis.arch, jmp_unmut_asmcfg) + + # Merge patches into the patch list + for patch in new_jmp_patches.items(): + unmut_jmp_patches.append(patch) + # Apply patches text_section_base = image_base + text_section.virtual_address text_section_bytes = bytearray(text_section.content) - for addr, data in unmut_jmp_patches.items(): + for addr, data in unmut_jmp_patches: offset = addr - text_section_base text_section_bytes[offset:offset + len(data)] = data text_section.content = memoryview(text_section_bytes) @@ -326,6 +366,43 @@ def main() -> None: builder.write(args.output) +def parse_arguments() -> Namespace: + """ + Parse command-line arguments. + """ + parser = ArgumentParser("Automatic deobfuscation tool powered by Miasm") + parser.add_argument("protected_binary", help="Target binary") + parser.add_argument("-a", + "--addresses", + nargs='+', + help="Addresses of the functions to deobfuscate", + required=True) + parser.add_argument("-o", + "--output", + help="Output file path", + required=True) + + return parser.parse_args() + + +def unwrap_functions(target_binary_path: str, + target_function_addrs: list[int]) -> list[int]: + """ + Resolve mutated function's addresses from original function addresses. + """ + mutated_func_addrs: list[int] = [] + for addr in target_function_addrs: + print(f"Resolving mutated code portion address for 0x{addr:x}...") + mutated_code_addr = unwrap_function(target_binary_path, addr) + if mutated_code_addr == addr: + raise Exception("Failure to unwrap function") + + print(f"Mutated code is at 0x{mutated_code_addr:x}") + mutated_func_addrs.append(mutated_code_addr) + + return mutated_func_addrs + + def strip_sym_flags(symex: SymbolicExecutionEngine) -> None: symex.apply_change(m2_expr.ExprId("zf", 1), m2_expr.ExprId("zf", 1)) symex.apply_change(m2_expr.ExprId("nf", 1), m2_expr.ExprId("nf", 1)) diff --git a/themida_unmutate/unwrapping.py b/themida_unmutate/unwrapping.py index c6d2162..8acdb34 100644 --- a/themida_unmutate/unwrapping.py +++ b/themida_unmutate/unwrapping.py @@ -8,12 +8,11 @@ from .miasm_utils import expr_int_to_int -def unwrap_function(target_bin_path: str, target_arch: str, - target_addr: int) -> int: +def unwrap_function(target_bin_path: str, target_addr: int) -> int: loc_db = LocationDB() with open(target_bin_path, 'rb') as target_bin: cont = Container.from_stream(target_bin, loc_db) - machine = Machine(target_arch if target_arch else cont.arch) + machine = Machine(cont.arch) assert machine.dis_engine is not None # Disassemble