Skip to content

Commit

Permalink
Reduce code duplication a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
ergrelet committed May 13, 2024
1 parent c47aac7 commit e1e2c6d
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 77 deletions.
27 changes: 4 additions & 23 deletions binja_plugin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core import parse_asm
from miasm.core.asmblock import AsmCFG, asm_resolve_final
from miasm.core.locationdb import LocationDB
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, generate_code_redirect_patch


def get_binary_data(bv: BinaryView) -> bytearray:
Expand Down Expand Up @@ -88,25 +87,7 @@ def rebuild_simplified_binary(
original_to_simplified[protected_func_addr] = min(new_section_patches.keys())

# Redirect functions to their simplified versions
protected_function_addrs = func_addr_to_simplified_cfg.keys()
for target_addr in protected_function_addrs:
# Generate a single-block AsmCFG with a JMP to the simplified version
for target_addr in func_addr_to_simplified_cfg.keys():
simplified_func_addr = original_to_simplified[target_addr]
original_loc_str = f"loc_{target_addr:x}"
jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
miasm_ctx.mdis.loc_db)

# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the newly created block and generate machine code
original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)

# Apply patches
for address, data in new_jmp_patches.items():
bw.write(bytes(data), address)
address, data = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr)
bw.write(data, address)
63 changes: 15 additions & 48 deletions themida_unmutate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
from typing import Optional

import lief
from miasm.core import parse_asm
from miasm.core.asmblock import AsmCFG, asm_resolve_final, bbl_simplifier
from miasm.core.interval import interval

from themida_unmutate.logging import setup_logger, LOGGER
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, generate_code_redirect_patch
from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions
from themida_unmutate.unwrapping import unwrap_function
from themida_unmutate.unwrapping import resolve_mutated_code_address

NEW_SECTION_NAME = ".unmut"
NEW_SECTION_MAX_SIZE = 2**16
Expand Down Expand Up @@ -77,7 +76,7 @@ def unwrap_functions(miasm_ctx: MiasmContext, target_function_addrs: list[int])
mutated_func_addrs: list[int] = []
for addr in target_function_addrs:
LOGGER.debug("Resolving mutated code portion address for 0x%x..." % addr)
mutated_code_addr = unwrap_function(miasm_ctx, addr)
mutated_code_addr = resolve_mutated_code_address(miasm_ctx, addr)
if mutated_code_addr == addr:
raise Exception("Failure to unwrap function")

Expand Down Expand Up @@ -174,34 +173,18 @@ def __rebuild_simplified_binary_in_new_section(
new_content[offset:offset + len(data)] = data
unmut_section.content = memoryview(new_content)

# Find the section containing the original function
protected_function_addrs = func_addr_to_simplified_cfg.keys()
text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Redirect functions to their simplified versions
protected_function_addrs = func_addr_to_simplified_cfg.keys()
unmut_jmp_patches: list[tuple[int, bytes]] = []
for target_addr in protected_function_addrs:
# Generate a single-block AsmCFG with a JMP to the simplified version
simplified_func_addr = original_to_simplified[target_addr]
original_loc_str = f"loc_{target_addr:x}"
jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
miasm_ctx.mdis.loc_db)

# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the newly created block and generate machine code
original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)
unmut_jmp_patch = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr)
unmut_jmp_patches.append(unmut_jmp_patch)

# Merge patches into the patch list
for patch in new_jmp_patches.items():
unmut_jmp_patches.append(patch)
# Find the section containing the original function
text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Apply patches
text_section_base = pe_obj.imagebase + text_section.virtual_address
Expand Down Expand Up @@ -281,34 +264,18 @@ def __rebuild_simplified_binary_in_place(
new_content[offset:offset + len(data)] = data
themida_section.content = memoryview(new_content)

# Find the section containing the original function
protected_function_addrs = func_addr_to_simplified_cfg.keys()
text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Redirect functions to their simplified versions
protected_function_addrs = func_addr_to_simplified_cfg.keys()
unmut_jmp_patches: list[tuple[int, bytes]] = []
for target_addr in protected_function_addrs:
# Generate a single-block AsmCFG with a JMP to the simplified version
simplified_func_addr = original_to_simplified[target_addr]
original_loc_str = f"loc_{target_addr:x}"
jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
miasm_ctx.mdis.loc_db)

# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the newly created block and generate machine code
original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)
unmut_jmp_patch = generate_code_redirect_patch(miasm_ctx, target_addr, simplified_func_addr)
unmut_jmp_patches.append(unmut_jmp_patch)

# Merge patches into the patch list
for patch in new_jmp_patches.items():
unmut_jmp_patches.append(patch)
# Find the section containing the original function
text_section = __section_from_virtual_address(pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Apply patches
text_section_base = pe_obj.imagebase + text_section.virtual_address
Expand Down
33 changes: 30 additions & 3 deletions themida_unmutate/miasm_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from dataclasses import dataclass
from typing import Self
from typing import Optional, Self

import miasm.expression.expression as m2_expr

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core.asmblock import disasmEngine
from miasm.core import parse_asm
from miasm.core.asmblock import AsmCFG, disasmEngine, asm_resolve_final
from miasm.core.interval import interval
from miasm.core.locationdb import LocationDB
from miasm.ir.ir import Lifter
Expand Down Expand Up @@ -51,3 +51,30 @@ def expr_int_to_int(expr: m2_expr.ExprInt) -> int:
result = expr.arg

return int(result)


def generate_code_redirect_patch(miasm_ctx: MiasmContext, src_addr: int, dst_addr: int) -> tuple[int, bytes]:
"""
Generate a single-block AsmCFG with a JMP from `src_addr` to `dst_addr` and return the result patch.
"""
X86_JMP_INSTRUCTION = "JMP"

# Generate a single-block AsmCFG with a JMP to the simplified version
original_loc_str = f"loc_{src_addr:x}"
jmp_unmut_instr_str = f"{original_loc_str}:\n{X86_JMP_INSTRUCTION} 0x{dst_addr:x}"
jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
miasm_ctx.mdis.loc_db)

# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(src_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the newly created block and generate machine code
original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
miasm_ctx.loc_db.set_location_offset(original_loc, src_addr)
jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)
jmp_patch: Optional[tuple[int, bytes]] = next(iter(jmp_patches.items()), None)
assert jmp_patch is not None

return jmp_patch
5 changes: 2 additions & 3 deletions themida_unmutate/unwrapping.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import miasm.expression.expression as m2_expr
from miasm.ir.ir import IRCFG, Lifter
from miasm.ir.symbexec import SymbolicExecutionEngine
from themida_unmutate.logging import LOGGER

from themida_unmutate.logging import LOGGER
from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int


def unwrap_function(miasm_ctx: MiasmContext, target_addr: int) -> int:
def resolve_mutated_code_address(miasm_ctx: MiasmContext, target_addr: int) -> int:
# Save `follow_call` value and set it to `True`
saved_follow_call = miasm_ctx.mdis.follow_call
miasm_ctx.mdis.follow_call = True
# Disassemble trampoline
asmcfg = miasm_ctx.mdis.dis_multiblock(target_addr)
# Restore `follow_call` value
miasm_ctx.mdis.follow_call = saved_follow_call

# Lift ASM to IR
ircfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asmcfg)

Expand Down

0 comments on commit e1e2c6d

Please sign in to comment.