Skip to content

Commit

Permalink
Implement a Binary Ninja plugin
Browse files Browse the repository at this point in the history
Make code more reusable in other contexts
  • Loading branch information
ergrelet committed May 13, 2024
1 parent 600b81e commit 989d1dd
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 18 deletions.
15 changes: 15 additions & 0 deletions binja_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from binaryninja import PluginCommand # type:ignore

from . import actions, plugin

# Table of plugin commands to register when this package is imported.
# Each entry is a 4-tuple of:
#   (command name, command description, registration function, action callback)
plugin_commands = [
    (
        # Command name: the plugin's name, a backslash, then the command label
        f"{plugin.NAME}\\Deobfuscate mutated code from this address",
        "Deobfuscate mutated code from this address",
        # Registration function used for this command
        PluginCommand.register_for_address,
        # Callback invoked for the command (defined in `actions`)
        actions.deobfuscate_at_address,
    ),
]

# Register every command of the table by calling its registration function
# with the entry's name, description and action.
for (command_name, command_description, command_registrator, command_action) in plugin_commands:
    command_registrator(name=command_name, description=command_description, action=command_action)
64 changes: 64 additions & 0 deletions binja_plugin/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import Self
from binaryninja import BinaryView # type:ignore
from binaryninja.log import Logger # type:ignore
from binaryninja.plugin import BackgroundTaskThread # type:ignore

from themida_unmutate.main import unwrap_functions
from themida_unmutate.symbolic_execution import disassemble_and_simplify_functions

from . import common, plugin

# Architecture names accepted by `DeobfuscateCodeAtAddressTask.run`; any other
# architecture is rejected with an error log.
SUPPORTED_ARCHS = ["x86_64"]

# Module-wide logger, named after the plugin.
logger = Logger(session_id=0, logger_name=plugin.NAME)


def deobfuscate_at_address(bv: BinaryView, address: int) -> None:
    """
    Plugin command entry point: create a `DeobfuscateCodeAtAddressTask` for
    `address` in `bv` and start it.
    """
    task = DeobfuscateCodeAtAddressTask(bv=bv, address=address)
    task.start()


class DeobfuscateCodeAtAddressTask(BackgroundTaskThread):
    """
    Background task that deobfuscates the mutated code reachable from a single
    address and patches the simplified code into the binary view.
    """

    def __init__(self, bv: BinaryView, address: int):
        """
        Store the task's inputs.

        `bv` is the binary view to read from and patch; `address` is the
        address of the protected function to deobfuscate.
        """
        super().__init__(
            initial_progress_text=f"Deobfuscating code at 0x{address:x}",
            can_cancel=False,
        )
        self.bv = bv
        self.address = address

    def run(self: Self) -> None:
        """
        Deobfuscate the code at `self.address` and rewrite it in `self.bv`.

        Logs an error and returns early when the view's architecture cannot be
        determined or isn't listed in `SUPPORTED_ARCHS`.
        """
        # `platform` is dereferenced right below, so it is checked along with
        # `arch` to avoid an `AttributeError` on views without a platform.
        if self.bv.arch is None or self.bv.platform is None:
            logger.log_error("Could not get architecture of current binary view")
            return

        arch = str(self.bv.platform.arch)
        if arch not in SUPPORTED_ARCHS:
            logger.log_error("Current binary view's architecture isn't supported")
            return
        logger.log_info(f"Deobfuscating code at 0x{self.address:x}")

        protected_func_addrs = [self.address]
        binary_data = common.get_binary_data(self.bv)
        miasm_ctx = common.create_miasm_context(arch, self.bv.original_base, binary_data)

        logger.log_info("Resolving mutated's functions' addresses...")
        mutated_func_addrs = unwrap_functions(miasm_ctx, protected_func_addrs)

        # Disassemble mutated functions and simplify them
        logger.log_info("Deobfuscating mutated functions...")
        simplified_func_asmcfgs = disassemble_and_simplify_functions(miasm_ctx, mutated_func_addrs)

        # Map protected functions' addresses to their corresponding simplified `AsmCFG`
        func_addr_to_simplified_cfg = {
            protected_func_addrs[i]: asm_cfg
            for i, asm_cfg in enumerate(simplified_func_asmcfgs)
        }

        # Rewrite the protected binary with the simplified function
        logger.log_info("Patching binary file...")
        common.rebuild_simplified_binary(miasm_ctx, func_addr_to_simplified_cfg, self.bv)

        # Relaunch analysis to take our changes into account
        self.bv.update_analysis()
        # Must be an f-string, otherwise the placeholder is logged verbatim
        logger.log_info(f"Successfully simplified code at 0x{self.address:x}!")
110 changes: 110 additions & 0 deletions binja_plugin/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from binaryninja import BinaryView, BinaryReader, BinaryWriter # type:ignore

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core import parse_asm
from miasm.core.asmblock import AsmCFG, asm_resolve_final
from miasm.core.locationdb import LocationDB

from themida_unmutate.miasm_utils import MiasmContext, MiasmFunctionInterval


def get_binary_data(bv: BinaryView) -> bytearray:
    """
    Retrieve binary data from `bv` as single `bytearray`.

    The returned buffer starts at `bv.original_base`; sections are emitted in
    ascending start-address order so that a section's bytes land at offset
    `section.start - bv.original_base`.
    Note: spaces between sections are replaced with 0s, and so are the bytes
    of a section that could not be read from the view.
    """
    br = BinaryReader(bv)
    # Address corresponding to the current end of `exe_data`
    next_address = bv.original_base
    exe_data = bytearray()
    for section in sorted(bv.sections.values(), key=lambda s: s.start):
        # Pad with zeroes up to the start of the section
        gap = section.start - next_address
        if gap > 0:
            exe_data += bytes(gap)

        # `read` may return `None` when the read fails; fall back to zeroes
        # (and zero-fill short reads) so that later offsets stay aligned
        # instead of raising a `TypeError` on concatenation.
        section_data = br.read(section.length, section.start) or b""
        section_data = section_data.ljust(section.length, b"\x00")

        if gap < 0:
            # The section overlaps data that was already emitted: only append
            # the part located past the current end of the buffer.
            section_data = section_data[-gap:]

        exe_data += section_data
        next_address = max(next_address, section.start + section.length)

    return exe_data


def create_miasm_context(arch: str, binary_base_address: int, binary_data: bytearray) -> MiasmContext:
    """
    Build a `MiasmContext` for raw bytes loaded at a given base address.

    `arch` selects the Miasm machine, `binary_data` is wrapped in a container
    whose address is `binary_base_address`, and the disassembler and lifter
    share a single location database.
    """
    location_db = LocationDB()

    target_machine = Machine(arch)
    assert target_machine.dis_engine is not None

    data_container = Container.from_string(binary_data, location_db, addr=binary_base_address)
    disassembler = target_machine.dis_engine(data_container.bin_stream, loc_db=location_db)

    return MiasmContext(
        location_db,
        data_container,
        target_machine,
        disassembler,
        target_machine.lifter(location_db),
    )


def rebuild_simplified_binary(
    miasm_ctx: MiasmContext,
    func_addr_to_simplified_cfg: dict[int, tuple[AsmCFG, MiasmFunctionInterval]],
    bv: BinaryView,
) -> None:
    """
    Assemble simplified functions and write them into `bv`.

    `func_addr_to_simplified_cfg` maps each protected function's address to a
    pair made of its simplified `AsmCFG` and the interval of the original
    (mutated) code.

    The function works in two passes:
    1. Each simplified CFG is relocated to the start of its interval,
       assembled with that interval as destination, and the resulting
       patches are written through a `BinaryWriter` on `bv`.
    2. A `JMP` to the simplified code is assembled at each protected
       function's address and written the same way.

    Note: location offsets in `miasm_ctx.loc_db` are modified in the process.
    """
    bw = BinaryWriter(bv)

    # Reassemble simplified AsmCFGs
    original_to_simplified: dict[int, int] = {}
    for protected_func_addr, val in func_addr_to_simplified_cfg.items():
        simplified_asmcfg, orignal_asmcfg_interval = val

        # Unpin blocks to be able to relocate the CFG
        # (the first head is fetched beforehand and used as the entry block)
        head = simplified_asmcfg.heads()[0]
        for asm_block in simplified_asmcfg.blocks:
            miasm_ctx.loc_db.unset_location_offset(asm_block.loc_key)

        # Start rewriting at the first part of the interval (i.e., at the start
        # of the mutated code)
        target_addr: int = orignal_asmcfg_interval.intervals[0][0]
        # Unpin loc_key if it's pinned
        original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
        if original_loc is not None:
            miasm_ctx.loc_db.unset_location_offset(original_loc)

        # Relocate the function's entry block
        miasm_ctx.loc_db.set_location_offset(head, target_addr)

        # Generate the simplified machine code
        # (`new_section_patches` maps addresses to the data to write there)
        new_section_patches = asm_resolve_final(miasm_ctx.mdis.arch,
                                                simplified_asmcfg,
                                                dst_interval=orignal_asmcfg_interval)

        # Apply patches
        for address, data in new_section_patches.items():
            bw.write(bytes(data), address)

        # Associate original addr to simplified addr
        # (the lowest patched address is used as the simplified address)
        original_to_simplified[protected_func_addr] = min(new_section_patches.keys())

    # Redirect functions to their simplified versions
    protected_function_addrs = func_addr_to_simplified_cfg.keys()
    for target_addr in protected_function_addrs:
        # Generate a single-block AsmCFG with a JMP to the simplified version
        simplified_func_addr = original_to_simplified[target_addr]
        original_loc_str = f"loc_{target_addr:x}"
        jmp_unmut_instr_str = f"{original_loc_str}:\nJMP 0x{simplified_func_addr:x}"
        jmp_unmut_asmcfg = parse_asm.parse_txt(miasm_ctx.mdis.arch, miasm_ctx.mdis.attrib, jmp_unmut_instr_str,
                                               miasm_ctx.mdis.loc_db)

        # Unpin loc_key if it's pinned
        original_loc = miasm_ctx.loc_db.get_offset_location(target_addr)
        if original_loc is not None:
            miasm_ctx.loc_db.unset_location_offset(original_loc)

        # Relocate the newly created block and generate machine code
        # (the block is looked up through the label name used in the parsed text)
        original_loc = miasm_ctx.loc_db.get_name_location(original_loc_str)
        miasm_ctx.loc_db.set_location_offset(original_loc, target_addr)
        new_jmp_patches = asm_resolve_final(miasm_ctx.mdis.arch, jmp_unmut_asmcfg)

        # Apply patches
        for address, data in new_jmp_patches.items():
            bw.write(bytes(data), address)
1 change: 1 addition & 0 deletions binja_plugin/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Plugin name, used as the prefix of registered command names and as the logger name.
NAME = "themida-unmutate-bn"
24 changes: 24 additions & 0 deletions plugin.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"pluginmetadataversion": 2,
"name": "themida-unmutate-bn",
"type": ["core"],
"api": ["python3"],
"description": "Static deobfuscator for Themida's mutation-based obfuscation.",
"longdescription": "",
"license": {
"name": "GPL-3.0-or-later",
"text": ""
},
"platforms": ["Darwin", "Linux", "Windows"],
"installinstructions": {
"Darwin": "",
"Linux": "",
"Windows": ""
},
"dependencies": {
"pip": ["miasm", "themida-unmutate"]
},
"version": "0.1.0",
"author": "Erwan Grelet",
"minimumbinaryninjaversion": 3164
}
8 changes: 4 additions & 4 deletions themida_unmutate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ def entry_point() -> None:
setup_logger(args.verbose)

# Setup disassembler and lifter
miasm_ctx = MiasmContext(args.protected_binary)
miasm_ctx = MiasmContext.from_binary_file(args.protected_binary)

# Resolve mutated functions' addresses if needed
protected_func_addrs = list(map(lambda addr: int(addr, 0), args.addresses))
if not args.no_trampoline:
LOGGER.info("Resolving mutated's functions' addresses...")
mutated_func_addrs = unwrap_functions(args.protected_binary, protected_func_addrs)
mutated_func_addrs = unwrap_functions(miasm_ctx, protected_func_addrs)
else:
# No trampolines to take care of, use target addresses directly
mutated_func_addrs = protected_func_addrs
Expand Down Expand Up @@ -70,14 +70,14 @@ def parse_arguments() -> Namespace:
return parser.parse_args()


def unwrap_functions(target_binary_path: str, target_function_addrs: list[int]) -> list[int]:
def unwrap_functions(miasm_ctx: MiasmContext, target_function_addrs: list[int]) -> list[int]:
"""
Resolve mutated function's addresses from original function addresses.
"""
mutated_func_addrs: list[int] = []
for addr in target_function_addrs:
LOGGER.debug("Resolving mutated code portion address for 0x%x..." % addr)
mutated_code_addr = unwrap_function(target_binary_path, addr)
mutated_code_addr = unwrap_function(miasm_ctx, addr)
if mutated_code_addr == addr:
raise Exception("Failure to unwrap function")

Expand Down
22 changes: 13 additions & 9 deletions themida_unmutate/miasm_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from typing import Self

import miasm.expression.expression as m2_expr

Expand All @@ -20,22 +21,25 @@ class MiasmContext:
mdis: disasmEngine
lifter: Lifter

def __init__(self, target_binary_path: str) -> None:
@classmethod
def from_binary_file(cls, target_binary_path: str) -> Self:
"""
Initialize our Miasm context, targeted at x86_64 binaries.
Initialize our Miasm context from a binary file.
"""
self.loc_db = LocationDB()
loc_db = LocationDB()
with open(target_binary_path, 'rb') as target_binary:
self.container = Container.from_stream(target_binary, self.loc_db)
self.machine = Machine(self.container.arch)
assert self.machine.dis_engine is not None
container = Container.from_stream(target_binary, loc_db)
machine = Machine(container.arch)
assert machine.dis_engine is not None

self.mdis = self.machine.dis_engine(self.container.bin_stream, loc_db=self.loc_db)
self.lifter = self.machine.lifter(self.loc_db)
mdis = machine.dis_engine(container.bin_stream, loc_db=loc_db)
lifter = machine.lifter(loc_db)

return cls(loc_db, container, machine, mdis, lifter)

@property
def arch(self) -> str:
return str(self.container.arch)
return str(self.machine.name)


def expr_int_to_int(expr: m2_expr.ExprInt) -> int:
Expand Down
11 changes: 6 additions & 5 deletions themida_unmutate/unwrapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
from themida_unmutate.miasm_utils import MiasmContext, expr_int_to_int


def unwrap_function(target_bin_path: str, target_addr: int) -> int:
# Setup disassembler and lifter
miasm_ctx = MiasmContext(target_bin_path)

# Disassemble trampoline
def unwrap_function(miasm_ctx: MiasmContext, target_addr: int) -> int:
# Save `follow_call` value and set it to `True`
saved_follow_call = miasm_ctx.mdis.follow_call
miasm_ctx.mdis.follow_call = True
# Disassemble trampoline
asmcfg = miasm_ctx.mdis.dis_multiblock(target_addr)
# Restore `follow_call` value
miasm_ctx.mdis.follow_call = saved_follow_call

# Lift ASM to IR
ircfg = miasm_ctx.lifter.new_ircfg_from_asmcfg(asmcfg)
Expand Down

0 comments on commit 989d1dd

Please sign in to comment.