Skip to content

Commit

Permalink
Implement in-place binary rewriting
Browse files (browse the repository at this point in the history)
Add a CLI option to enable in-place rewriting
  • Loading branch information
ergrelet committed Apr 6, 2024
1 parent 5970e69 commit 51ccb37
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 29 deletions.
183 changes: 167 additions & 16 deletions themida_unmutate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

import lief
from miasm.core import parse_asm
from miasm.core.asmblock import AsmCFG, asm_resolve_final
from miasm.core.asmblock import AsmCFG, asm_resolve_final, bbl_simplifier
from miasm.core.interval import interval

from themida_unmutate.logging import setup_logger, LOGGER
from themida_unmutate.miasm_utils import MiasmContext
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval
from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions
from themida_unmutate.unwrapping import unwrap_function

Expand Down Expand Up @@ -49,7 +49,8 @@ def entry_point() -> None:
# Rewrite the protected binary with simplified functions
LOGGER.info("Rebuilding binary file...")
rebuild_simplified_binary(miasm_ctx, func_addr_to_simplified_cfg,
args.protected_binary, args.output)
args.protected_binary, args.output,
args.reassemble_in_place)

LOGGER.info("Done! You can find your deobfuscated binary at '%s'" %
args.output)
Expand All @@ -75,6 +76,10 @@ def parse_arguments() -> Namespace:
parser.add_argument("--no-trampoline",
action='store_true',
help="Disable function unwrapping")
parser.add_argument("--reassemble-in-place",
action='store_true',
help="Rewrite simplified code over the mutated code"
"rather than in a new code section")
parser.add_argument("-v",
"--verbose",
action='store_true',
Expand Down Expand Up @@ -105,9 +110,11 @@ def unwrap_functions(target_binary_path: str,

def rebuild_simplified_binary(
miasm_ctx: MiasmContext,
func_addr_to_simplified_cfg: dict[int, AsmCFG],
func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG,
MiasmFunctionInterval]],
input_binary_path: str,
output_binary_path: str,
reassemble_in_place: bool,
) -> None:
"""
Reassemble functions' `AsmCFG` and rewrite the input binary with simplified
Expand All @@ -116,6 +123,28 @@ def rebuild_simplified_binary(
if len(func_addr_to_simplified_cfg) == 0:
raise ValueError("`protected_function_addrs` cannot be empty")

if reassemble_in_place:
__rebuild_simplified_binary_in_place(miasm_ctx,
func_addr_to_simplified_cfg,
input_binary_path,
output_binary_path)
else:
__rebuild_simplified_binary_in_new_section(
miasm_ctx, func_addr_to_simplified_cfg, input_binary_path,
output_binary_path)


def __rebuild_simplified_binary_in_new_section(
miasm_ctx: MiasmContext,
func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG,
MiasmFunctionInterval]],
input_binary_path: str,
output_binary_path: str,
) -> None:
"""
Reassemble functions' `AsmCFG` and rewrite the input binary with simplified
machine code in a new code section.
"""
# Open the target binary with LIEF
pe_obj = lief.PE.parse(input_binary_path)
if pe_obj is None:
Expand All @@ -129,16 +158,18 @@ def rebuild_simplified_binary(
| lief.PE.SECTION_CHARACTERISTICS.MEM_EXECUTE.value)
pe_obj.add_section(unmut_section)
unmut_section = pe_obj.get_section(NEW_SECTION_NAME)

image_base = pe_obj.imagebase
unmut_section_base = image_base + unmut_section.virtual_address
unmut_section_base = pe_obj.imagebase + unmut_section.virtual_address

# Reassemble simplified AsmCFGs
original_to_simplified: dict[int, int] = {}
next_min_offset_for_asm = 0
unmut_section_patches: list[tuple[int, bytes]] = []
for protected_func_addr, simplified_asmcfg in \
for protected_func_addr, val in \
func_addr_to_simplified_cfg.items():
simplified_asmcfg, _ = val
# Simplify CFG further (by merging basic blocks when possible)
simplified_asmcfg = bbl_simplifier(simplified_asmcfg)

# Unpin blocks to be able to relocate the whole CFG
head = simplified_asmcfg.heads()[0]
for ir_block in simplified_asmcfg.blocks:
Expand Down Expand Up @@ -168,18 +199,132 @@ def rebuild_simplified_binary(
next_min_offset_for_asm = max(
new_section_patches.keys()) - unmut_section_base + 15

# Overwrite the section's content
# Overwrite the new section's content
new_section_size = next_min_offset_for_asm
new_content = bytearray([0] * new_section_size)
for addr, data in unmut_section_patches:
offset = addr - unmut_section_base
new_content[offset:offset + len(data)] = data
unmut_section.content = memoryview(new_content)

# Find the section containing the virtual addresses we want to modify
# Find the section containing the original function
protected_function_addrs = func_addr_to_simplified_cfg.keys()
text_section = __section_from_virtual_address(
pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Redirect functions to their simplified versions
unmut_jmp_patches: list[tuple[int, bytes]] = []
for target_addr in protected_function_addrs:
# Generate a single-block AsmCFG with a JMP to the simplified version
simplified_func_addr = original_to_simplified[target_addr]
original_loc_str = f"loc_{target_addr:x}"
jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch,
miasm_ctx.mdis.attrib,
jmp_unmut_instr_str,
miasm_ctx.mdis.loc_db)

# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the newly created block and generate machine code
original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch,
jmp_unmut_asmcfg)

# Merge patches into the patch list
for patch in new_jmp_patches.items():
unmut_jmp_patches.append(patch)

# Apply patches
text_section_base = pe_obj.imagebase + text_section.virtual_address
text_section_bytes = bytearray(text_section.content)
for addr, data in unmut_jmp_patches:
offset = addr - text_section_base
text_section_bytes[offset:offset + len(data)] = data
text_section.content = memoryview(text_section_bytes)

# Invoke the builder
builder = lief.PE.Builder(pe_obj)
builder.build()

# Save the result
builder.write(output_binary_path)


def __rebuild_simplified_binary_in_place(
miasm_ctx: MiasmContext,
func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG,
MiasmFunctionInterval]],
input_binary_path: str,
output_binary_path: str,
) -> None:
"""
Reassemble functions' `AsmCFG` and rewrite the input binary with simplified
machine code by overwriting the mutated code.
"""
# Open the target binary with LIEF
pe_obj = lief.PE.parse(input_binary_path)
if pe_obj is None:
raise Exception(f"Failed to parse PE '{input_binary_path}'")

# Reassemble simplified AsmCFGs
original_to_simplified: dict[int, int] = {}
unmut_patches: list[tuple[int, bytes]] = []
for protected_func_addr, val in \
func_addr_to_simplified_cfg.items():
simplified_asmcfg, orignal_asmcfg_interval = val

# Unpin blocks to be able to relocate the CFG
head = simplified_asmcfg.heads()[0]
for asm_block in simplified_asmcfg.blocks:
miasm_ctx.loc_db.unset_location_offset(asm_block.loc_key)

# Start rewriting at the first part of the interval (i.e., at the start
# of the mutated code)
target_addr: int = orignal_asmcfg_interval.intervals[0][0]
# Unpin loc_key if it's pinned
original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
if original_loc is not None:
miasm_ctx.loc_db.unset_location_offset(original_loc)

# Relocate the function's entry block
miasm_ctx.loc_db.set_location_offset(head, target_addr)

# Generate the simplified machine code
new_section_patches = asm_resolve_final(
miasm_ctx.mdis.arch,
simplified_asmcfg,
dst_interval=orignal_asmcfg_interval)

# Merge patches into the patch list
for patch in new_section_patches.items():
unmut_patches.append(patch)

# Associate original addr to simplified addr
original_to_simplified[protected_func_addr] = min(
new_section_patches.keys())

# Find Themida's section
themida_section = __section_from_virtual_address(pe_obj, target_addr)
assert themida_section is not None

# Overwrite Themida's section content
themida_section_base = pe_obj.imagebase + themida_section.virtual_address
new_content = bytearray(themida_section.content)
for addr, data in unmut_patches:
offset = addr - themida_section_base
new_content[offset:offset + len(data)] = data
themida_section.content = memoryview(new_content)

# Find the section containing the original function
protected_function_addrs = func_addr_to_simplified_cfg.keys()
target_rva = next(iter(protected_function_addrs)) - image_base
text_section = section_from_virtual_address(pe_obj, target_rva)
text_section = __section_from_virtual_address(
pe_obj, next(iter(protected_function_addrs)))
assert text_section is not None

# Redirect functions to their simplified versions
Expand Down Expand Up @@ -210,7 +355,7 @@ def rebuild_simplified_binary(
unmut_jmp_patches.append(patch)

# Apply patches
text_section_base = image_base + text_section.virtual_address
text_section_base = pe_obj.imagebase + text_section.virtual_address
text_section_bytes = bytearray(text_section.content)
for addr, data in unmut_jmp_patches:
offset = addr - text_section_base
Expand All @@ -225,10 +370,16 @@ def rebuild_simplified_binary(
builder.write(output_binary_path)


def section_from_virtual_address(lief_bin: lief.Binary,
virtual_addr: int) -> Optional[lief.Section]:
def __section_from_virtual_address(
        lief_bin: lief.Binary, virtual_addr: int) -> Optional[lief.Section]:
    """
    Look up the section of `lief_bin` matching the absolute virtual address
    `virtual_addr`.

    The address is converted to an RVA by subtracting the binary's image base,
    and the lookup itself is delegated to `__section_from_rva`, whose result
    is returned unchanged.
    """
    # The RVA-based lookup expects an address relative to the image base
    image_base = lief_bin.imagebase
    return __section_from_rva(lief_bin, virtual_addr - image_base)


def __section_from_rva(lief_bin: lief.Binary,
rva: int) -> Optional[lief.Section]:
for s in lief_bin.sections:
if s.virtual_address <= virtual_addr < s.virtual_address + s.size:
if s.virtual_address <= rva < s.virtual_address + s.size:
assert isinstance(s, lief.Section)
return s

Expand Down
3 changes: 3 additions & 0 deletions themida_unmutate/miasm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core.asmblock import disasmEngine
from miasm.core.interval import interval
from miasm.core.locationdb import LocationDB
from miasm.ir.ir import Lifter

# Alias of miasm's `interval` type, used in annotations for the set of address
# ranges covered by a function's basic blocks.
MiasmFunctionInterval = interval


@dataclass
class MiasmContext:
Expand Down
6 changes: 3 additions & 3 deletions themida_unmutate/symbolic_execution/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from miasm.core.asmblock import AsmCFG

import themida_unmutate.symbolic_execution.x86 as symex_x86
from themida_unmutate.miasm_utils import MiasmContext
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval


def disassemble_and_simplify_functions(
miasm_ctx: MiasmContext,
mutated_func_addrs: list[int]) -> list[AsmCFG]:
miasm_ctx: MiasmContext, mutated_func_addrs: list[int]
) -> list[tuple[AsmCFG, MiasmFunctionInterval]]:
"""
Disassemble mutated functions, simplify their `AsmCFG` and return them.
"""
Expand Down
25 changes: 15 additions & 10 deletions themida_unmutate/symbolic_execution/x86/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

import miasm.arch.x86.arch as x86_arch
import miasm.expression.expression as m2_expr
from miasm.core.asmblock import AsmCFG, disasmEngine, bbl_simplifier
from miasm.core.asmblock import AsmCFG, disasmEngine
from miasm.core.cpu import instruction
from miasm.core.interval import interval
from miasm.ir.symbexec import SymbolicExecutionEngine
from themida_unmutate.logging import LOGGER

from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int
from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval, expr_int_to_int

AMD64_PTR_SIZE = 64
X86_BINARY_OPS_MAPPING = {
Expand All @@ -30,18 +31,23 @@


def disassemble_and_simplify_functions(
miasm_ctx: MiasmContext,
mutated_func_addrs: list[int]) -> list[AsmCFG]:
miasm_ctx: MiasmContext, mutated_func_addrs: list[int]
) -> list[tuple[AsmCFG, MiasmFunctionInterval]]:
"""
Disassemble mutated functions, simplify their `AsmCFG` and return them.
"""
# Iterate through functions, disassemble and simplify them
simplified_func_asmcfgs: list[AsmCFG] = []
simplified_func_asmcfgs: list[tuple[AsmCFG, MiasmFunctionInterval]] = []
for mutated_code_addr in mutated_func_addrs:
LOGGER.info("Simplifying function at 0x%x..." % mutated_code_addr)

# Disassemble function
asm_cfg = miasm_ctx.mdis.dis_multiblock(mutated_code_addr)
# Compute function's interval (this is needed when rewriting the binary
# in-place)
original_func_interval: MiasmFunctionInterval = interval(
blk.get_range() for blk in asm_cfg.blocks)

# Lift assembly to IR
ir_cfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asm_cfg)

Expand Down Expand Up @@ -70,8 +76,10 @@ def disassemble_and_simplify_functions(
asm_cfg, ir_block.assignblks[0].instr)
# Note(ergrelet): reset the instruction's additional info to avoid
# certain assembling issues where instruction prefixes are mixed
# in a illegal way.
# in an illegal way.
relocatable_instr.additional_info = x86_arch.additional_info()
relocatable_instr.additional_info.g1.value = 0 # type: ignore
relocatable_instr.additional_info.g2.value = 0 # type: ignore

asm_block.lines[0] = relocatable_instr
continue
Expand Down Expand Up @@ -242,10 +250,7 @@ def disassemble_and_simplify_functions(
LOGGER.warning("Unsupported instruction or unmutated block found. "
"Block will be kept as is.")

# Simplify CFG (by merging basic blocks when possible)
asm_cfg = bbl_simplifier(asm_cfg)

simplified_func_asmcfgs.append(asm_cfg)
simplified_func_asmcfgs.append((asm_cfg, original_func_interval))

return simplified_func_asmcfgs

Expand Down

0 comments on commit 51ccb37

Please sign in to comment.