Adding analysis facility for Android NDK #556

Merged 1 commit on Nov 22, 2024
112 changes: 112 additions & 0 deletions trueseeing/core/android/analysis/nat.py
@@ -0,0 +1,112 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from functools import cache
import os
import re

if TYPE_CHECKING:
from typing import Iterable, Dict, Any, Iterator, Mapping, Tuple

def _analyzed(x: str, tlds: re.Pattern[str]) -> Iterable[Dict[str, Any]]:
if '://' in x:
yield dict(type_='URL', value=re.findall(r'\S+://\S+', x))
elif re.search(r'^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+', x):
yield dict(type_='path component', value=re.findall(r'^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+', x))
elif re.search(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?$', x):
m = re.search(r'^([^:/]+)', x)
if m:
hostlike = m.group(1)
components = hostlike.split('.')
if len(components) == 4 and all(re.match(r'^\d+$', c) for c in components) and all(int(c) < 256 for c in components):
yield dict(type_='possible IPv4 address', value=[hostlike])
elif tlds.search(components[-1]):
if not re.search(r'^android\.(intent|media)\.|^os\.name$|^java\.vm\.name|^[A-Z]+.*\.(java|EC|name|secure)$', hostlike):
yield dict(type_='possible FQDN', value=[hostlike])

@cache
def _pat(c: str) -> re.Pattern[str]:
from io import StringIO
f = StringIO(c)
return re.compile('^(?:{})$'.format('|'.join(re.escape(l.strip()) for l in f if l and not l.startswith('#'))), flags=re.IGNORECASE)

@cache
def _tlds() -> str:
from importlib.resources import files
with (files('trueseeing')/'libs'/'public_suffix_list.dat').open('r', encoding='utf-8') as f:
return f.read()

def analyze_url(path: str) -> Iterator[Mapping[str, Any]]:
def walker(path: str) -> Iterator[Tuple[str, bytes]]:
for dirpath, _, filenames in os.walk(path):
for fn in filenames:
target = os.path.join(dirpath, fn)
with open(target, 'rb') as f:
yield target, f.read()

return analyze_url_in(walker(path))

def analyze_url_in(gen: Iterable[Tuple[str, bytes]]) -> Iterator[Mapping[str, Any]]:
tlds = _pat(_tlds())

pats = rb'|'.join([
b"([a-z0-9A-Z]+://[^\\\"<>()' \\t\\n\\v\\r\\x00-\\x1f\\x80-\\xff]+)",
rb'"/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+',
rb'"[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?"',
])

seen = set()
for n, s in gen:
for m in re.finditer(pats, s):
urllike = m.group(0).decode('latin1').strip('"')
if urllike not in seen:
seen.add(urllike)
for d in _analyzed(urllike, tlds):
for v in d['value']:
yield dict(fn=n, typ=d['type_'], v=v)

def analyze_api(path: str) -> Iterator[Mapping[str, Any]]:
def walker(path: str) -> Iterator[Tuple[str, bytes]]:
for dirpath, _, filenames in os.walk(path):
for fn in filenames:
target = os.path.join(dirpath, fn)
with open(target, 'rb') as f:
yield target, f.read()

return analyze_api_in(walker(path))

def analyze_api_in(gen: Iterable[Tuple[str, bytes]]) -> Iterator[Mapping[str, Any]]:
# XXX: focusing on oneline
pats = rb'^([_.].*?:[0-9a-f]{8}) +?[0-9a-f]+ +?b[a-z]* +(.*?);([A-Za-z]+ .*)$'
blacklist = '|'.join([
r' (thunk_)?FUN_[0-9a-f]+',
r' __cxa_',
r' __stack_chk_fail',
r' operator\.',
r' ~',
])

for fn, s in gen:
for m in re.finditer(pats, s, flags=re.MULTILINE):
origin = m.group(1).decode('latin1').strip('" ').replace(':','+')
target = m.group(2).decode('latin1').strip('" ')
call = m.group(3).decode('latin1').strip('" ')
if not re.search(blacklist, call):
if re.search(r'operator|::.*?::', call):
lang = 'cpp'
else:
lang = 'c'

if 'EXTERNAL' in target:
yield dict(fn=fn, origin=origin, typ='API', lang=lang, call=call)
else:
yield dict(fn=fn, origin=origin, typ='private', lang=lang, call=call)

def get_origin(n: str, l: bytes) -> Mapping[str, Any]:
pat = rb'(_.*?:[0-9a-f]{8}) +?[0-9a-f]+? +[a-z]+ '
m = re.match(pat, l)
if m:
origin = m.group(1).decode('latin1').strip('"')
sect, offs = origin.split(':')
return dict(fn=n, sect=sect, offs=int(offs, 16))
else:
raise ValueError()
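
For reference, a minimal usage sketch of the new module on an unpacked/disassembled tree; the directory paths are hypothetical.

from trueseeing.core.android.analysis.nat import analyze_url, analyze_api

# URL / path-component / FQDN / IPv4 candidates scraped from raw file contents.
for hit in analyze_url('/tmp/target-unpacked'):   # hypothetical path
  print(hit['fn'], hit['typ'], hit['v'])

# Branch targets recovered from disassembler listings, one record per call site.
for call in analyze_api('/tmp/target-disasm'):    # hypothetical path
  print(call['origin'], call['typ'], call['lang'], call['call'])
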
53 changes: 51 additions & 2 deletions trueseeing/core/android/context.py
@@ -13,11 +13,11 @@
from trueseeing.core.context import Context

if TYPE_CHECKING:
from typing import List, Any, Iterable, Tuple, Optional, ClassVar, Set, AsyncIterator
from typing import List, Any, Iterable, Tuple, Optional, ClassVar, Set, AsyncIterator, Iterator, Mapping
from trueseeing.core.context import ContextType, ContextInfo
from trueseeing.core.android.asm import APKDisassembler
from trueseeing.core.android.store import APKStore
from trueseeing.core.android.model import XAPKManifest
from trueseeing.core.android.model import XAPKManifest, Call

class PackageNameReader:
@cache
@@ -117,6 +117,16 @@ async def _get_disassembler(self) -> APKDisassembler:
return APKDisassembler(self)

async def _analyze(self, level: int) -> None:
from time import time

at = time()

await self._analyze_dalvik(level)
await self._analyze_native(level)

pub.sendMessage('progress.core.analysis.done', t=time()-at)

async def _analyze_dalvik(self, level: int) -> None:
pub.sendMessage('progress.core.context.disasm.begin')
disasm = await self._get_disassembler()
await disasm.disassemble(level)
@@ -228,6 +238,45 @@ def _addr_ceil(nr: int) -> int:
c.execute('analyze')
pub.sendMessage('progress.core.analysis.smali.done', t=time.time() - started)

async def _analyze_native(self, level: int) -> None:
if level > 2:
tarpath = os.path.join(os.path.dirname(self._path), 'disasm.tar.gz')
if not os.path.exists(tarpath):
ui.warn(f'skipping native code analysis; prepare {tarpath}')
return

from time import time
at = time()

with self.store().query().scoped() as q:
pub.sendMessage('progress.core.analysis.nat.begin')
import tarfile
with tarfile.open(tarpath) as tf:
q.file_put_batch(dict(path=i.name, blob=tf.extractfile(i).read(), z=True) for i in tf.getmembers() if (i.isreg() or i.islnk())) # type:ignore[union-attr]

if level > 3:
pub.sendMessage('progress.core.analysis.nat.analyzing')
from trueseeing.core.android.analysis.nat import analyze_api_in

def _as_call(g: Iterator[Mapping[str, Any]]) -> Iterator[Call]:
for e in g:
typ = e['typ']
lang = e['lang']
sect, offs = e['origin'].split('+')
yield dict(
path=e['fn'],
sect=sect,
offs=int(offs.strip(), 16),
priv=(typ == 'private'),
cpp=(lang == 'cpp'),
target=e['call']
)

q.call_add_batch(_as_call(analyze_api_in(q.file_enum('lib/%'))))
pub.sendMessage('progress.core.analysis.nat.summary', calls=q.call_count())

pub.sendMessage('progress.core.analysis.nat.done', t=time() - at)

def get_package_name(self) -> str:
return self._package_reader.read(self.target)

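Native analysis only runs at level > 2 and expects a prebuilt disasm.tar.gz next to the target; its members are stored under their archive paths and the lib/% entries are then scanned. A hedged sketch of what _as_call does to a single analyzer record follows; the concrete field values are illustrative, not taken from a real binary.

# One record as produced by analyze_api_in(), and the Call row derived from it.
rec = dict(fn='lib/arm64-v8a/libfoo.so', origin='.text+0001a2f0',
           typ='API', lang='c', call='int open(char const *, int)')

sect, offs = rec['origin'].split('+')
row = dict(
  path=rec['fn'],
  sect=sect,                       # '.text'
  offs=int(offs.strip(), 16),      # 0x1a2f0 == 107248
  priv=(rec['typ'] == 'private'),  # False: an external (API) call site
  cpp=(rec['lang'] == 'cpp'),      # False: the call looks like plain C
  target=rec['call'],
)
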
25 changes: 24 additions & 1 deletion trueseeing/core/android/db.py
@@ -7,7 +7,7 @@
if TYPE_CHECKING:
from typing import Optional, Iterator
from trueseeing.core.store import Store
from trueseeing.core.android.model import InvocationPattern
from trueseeing.core.android.model import InvocationPattern, Call

class APKStorePrep(StorePrep):
def stage1(self) -> None:
@@ -134,3 +134,26 @@ def body(self, class_name: str, method_name: Optional[str]) -> Iterator[Op]:
stmt1 = 'select addr, l from ops join map on (addr between low and high) where method=:method_name and class=:class_name'
for addr, l in self.db.execute(stmt1 if method_name else stmt0, dict(class_name=class_name, method_name=method_name)):
yield Op(addr, l)

def call_add_batch(self, gen: Iterator[Call]) -> None:
stmt0 = 'insert into ncalls (priv, cpp, target, path, sect, offs) values (:priv, :cpp, :target, :path, :sect, :offs)'
self.db.executemany(stmt0, gen)

def call_count(self) -> int:
stmt0 = 'select count(1) from ncalls'
for n, in self.db.execute(stmt0):
return n # type:ignore[no-any-return]
return 0

def calls(self, priv: bool = False, api: bool = False) -> Iterator[Call]:
stmt0 = 'select priv, cpp, target, path, sect, offs from ncalls'
stmt1 = 'select priv, cpp, target, path, sect, offs from ncalls where priv=:is_priv'
for priv, cpp, target, path, sect, offs in self.db.execute(stmt1 if (priv or api) else stmt0, dict(is_priv=priv)):
yield dict(
path=path,
sect=sect,
offs=offs,
priv=priv,
cpp=cpp,
target=target,
)
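
A minimal sketch of reading the stored native call sites back through the new query methods, assuming an already-analyzed context set up as in context.py; the context variable name is illustrative.

with context.store().query().scoped() as q:  # context: the analyzed Android context
  print(q.call_count())                      # total rows in ncalls
  for c in q.calls(api=True):                # external (non-private) call sites only
    print(c['path'], c['sect'], hex(c['offs']), c['target'])
  for c in q.calls(priv=True):               # intra-binary (private) call sites only
    print(c['target'])
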
8 changes: 8 additions & 0 deletions trueseeing/core/android/model.py
@@ -27,6 +27,14 @@ class XAPKManifest(TypedDict, total=False):
target_sdk_version: str
permissions: List[str]

class Call(TypedDict):
path: str
sect: str
offs: int
priv: bool
cpp: bool
target: str

class Op(NamedTuple):
addr: int
l: str
12 changes: 6 additions & 6 deletions trueseeing/core/db.py
@@ -79,10 +79,10 @@ def file_get_xml(self, path: str, default: Any = None, patched: bool = False) ->
else:
return default

def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = False) -> Iterable[Tuple[str, bytes]]:
def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = False, neg: bool = False) -> Iterable[Tuple[str, bytes]]:
if pat is not None:
stmt0 = 'select path, z, blob from files where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
stmt1 = 'select path, coalesce(B.z, A.z) as z, coalesce(B.blob, A.blob) as blob from files as A full outer join patches as B using (path) where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
stmt0 = 'select path, z, blob from files where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
stmt1 = 'select path, coalesce(B.z, A.z) as z, coalesce(B.blob, A.blob) as blob from files as A full outer join patches as B using (path) where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
for n, z, o in self.db.execute(stmt1 if patched else stmt0, dict(pat=pat)):
yield n, zd(o) if z else o
else:
@@ -91,10 +91,10 @@ def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = Fal
for n, z, o in self.db.execute(stmt3 if patched else stmt2):
yield n, zd(o) if z else o

def file_count(self, pat: Optional[str], patched: bool = False, regex: bool = False) -> int:
def file_count(self, pat: Optional[str], patched: bool = False, regex: bool = False, neg: bool = False) -> int:
if pat is not None:
stmt0 = 'select count(1) from files where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
stmt1 = 'select conut(1) from files as A full outer join patches as B using (path) where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
stmt0 = 'select count(1) from files where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
stmt1 = 'select conut(1) from files as A full outer join patches as B using (path) where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
for nr, in self.db.execute(stmt1 if patched else stmt0, dict(pat=pat)):
return nr # type:ignore[no-any-return]
else:
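The new neg flag inverts the path match in both helpers. A hedged usage sketch, where q is a Query instance as used above and the patterns are illustrative:

# Enumerate everything except resources under res/ (neg defaults to False).
for path, blob in q.file_enum('res/%', neg=True):
  print(path, len(blob))

# Count native libraries as before; behaviour is unchanged when neg is not given.
nr = q.file_count('lib/%')
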
2 changes: 1 addition & 1 deletion trueseeing/core/env.py
@@ -61,4 +61,4 @@ def get_device_frida_dir(package_name: str) -> str:

@cache
def get_cache_schema_id() -> int:
return 0x190b4df6 # FIXME: remember to randomize this whenever incompatible changes occur on cache file structure, or DB schema
return 0x54f6d672 # FIXME: remember to randomize this whenever incompatible changes occur on cache file structure, or DB schema
20 changes: 20 additions & 0 deletions trueseeing/core/ui.py
@@ -194,6 +194,11 @@ def scoped(self) -> Iterator[None]:
'progress.core.analysis.smali.summary':self._core_analysis_smali_summary,
'progress.core.analysis.smali.finalizing':self._core_analysis_smali_finalizing,
'progress.core.analysis.smali.done':self._core_analysis_smali_done,
'progress.core.analysis.nat.begin':self._core_analysis_nat_begin,
'progress.core.analysis.nat.analyzing':self._core_analysis_nat_analyzing,
'progress.core.analysis.nat.summary':self._core_analysis_nat_summary,
'progress.core.analysis.nat.done':self._core_analysis_nat_done,
'progress.core.analysis.done':self._core_analysis_done,
}
try:
for k, v in submap.items():
@@ -332,6 +337,21 @@ def _core_analysis_smali_finalizing(self) -> None:
def _core_analysis_smali_done(self, t: float) -> None:
ui.info(f"analyze: done ({t:.02f} sec)")

def _core_analysis_nat_begin(self) -> None:
ui.info('analyze_nat: analyzing...', nl=False)

def _core_analysis_nat_analyzing(self) -> None:
ui.info('analyze_nat: calls ...{tail}'.format(tail=' '*20), ow=True, nl=False)

def _core_analysis_nat_summary(self, calls: Optional[int] = None) -> None:
ui.info(f'analyze_nat: got {calls} calls', ow=True)

def _core_analysis_nat_done(self, t: float) -> None:
ui.info(f"analyze_nat: done ({t:.02f} sec)")

def _core_analysis_done(self, t: float) -> None:
ui.info(f"analyze: done ({t:.02f} sec)")

class FileTransferProgressReporter:
_bar: Optional[ProgressBar]
def __init__(self, desc: str) -> None:
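The new handlers subscribe to the nat.* progress topics published from _analyze_native. A sketch of the corresponding message flow, assuming pypubsub's pub object as the existing progress topics use; the numbers are placeholders.

from pubsub import pub  # assumption: pypubsub, matching the pub.sendMessage calls above

pub.sendMessage('progress.core.analysis.nat.begin')
pub.sendMessage('progress.core.analysis.nat.analyzing')
pub.sendMessage('progress.core.analysis.nat.summary', calls=123)  # placeholder count
pub.sendMessage('progress.core.analysis.nat.done', t=1.23)        # placeholder timing
pub.sendMessage('progress.core.analysis.done', t=4.56)
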
1 change: 1 addition & 0 deletions trueseeing/libs/android/store.0.sql
@@ -6,5 +6,6 @@ create table xref_const (addr integer primary key, insn varchar not null, sym va
create table xref_invoke (addr integer primary key, target integer, insn varchar not null, sym varchar not null);
create table xref_sput (addr integer primary key, insn varchar not null, sym varchar not null);
create table xref_iput (addr integer primary key, insn varchar not null, sym varchar not null);
create table ncalls (nr integer primary key, priv boolean not null, cpp boolean not null, target varchar not null, path varchar not null, sect varchar not null, offs integer not null);
create index idx_map_class on map (class);
create index idx_xref_invoke_target on xref_invoke (target);