From f9d5d41eff3873d6e418e5fb710c5cc6d28f1ed5 Mon Sep 17 00:00:00 2001
From: Takahiro Yoshimura
Date: Fri, 22 Nov 2024 21:06:18 +0900
Subject: [PATCH] Adding analysis facility for Android NDK

---
Reviewer notes (ignored by git-am, not part of the commit message):
 * Line breaks and indentation were restored from a whitespace-collapsed
   copy of this patch (2-space indentation assumed); if context fails to
   match, apply with `git apply --ignore-whitespace`.
 * core/android/analysis/nat.py: path components are now reported whole;
   the capturing group made re.findall() return only the last segment.
 * core/android/analysis/nat.py: blank lines and `//` comment lines of the
   public suffix list are no longer compiled into the TLD pattern.
 * core/db.py: fixed `conut(1)` in the patched-files query of file_count().
 * sig/android/nat.py: _assumed_randomness_of() no longer raises
   ZeroDivisionError on one-character strings.
 * One-line docstrings were added in the two new files; hunk counts and the
   diffstat were updated accordingly, blob ids on `index` lines were not.

 trueseeing/core/android/analysis/nat.py | 120 ++++++++++++++++++++++++
 trueseeing/core/android/context.py      |  53 ++++++++++-
 trueseeing/core/android/db.py           |  25 +++++-
 trueseeing/core/android/model.py        |   8 ++
 trueseeing/core/db.py                   |  12 +--
 trueseeing/core/env.py                  |   2 +-
 trueseeing/core/ui.py                   |  20 +++++
 trueseeing/libs/android/store.0.sql     |   1 +
 trueseeing/sig/android/nat.py           |  98 ++++++++++++++++++++
 9 files changed, 329 insertions(+), 10 deletions(-)
 create mode 100644 trueseeing/core/android/analysis/nat.py
 create mode 100644 trueseeing/sig/android/nat.py

diff --git a/trueseeing/core/android/analysis/nat.py b/trueseeing/core/android/analysis/nat.py
new file mode 100644
index 00000000..a5e2132c
--- /dev/null
+++ b/trueseeing/core/android/analysis/nat.py
@@ -0,0 +1,120 @@
+from __future__ import annotations
+from typing import TYPE_CHECKING
+from functools import cache
+import os
+import re
+
+if TYPE_CHECKING:
+  from typing import Iterable, Dict, Any, Iterator, Mapping, Tuple
+
+def _analyzed(x: str, tlds: re.Pattern[str]) -> Iterable[Dict[str, Any]]:
+  """Yield dicts (type_, value) classifying x as URL, path component, IPv4 address or FQDN."""
+  if '://' in x:
+    yield dict(type_='URL', value=re.findall(r'\S+://\S+', x))
+  elif re.search(r'^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+', x):
+    yield dict(type_='path component', value=re.findall(r'^/[{}$%a-zA-Z0-9_-]+(?:/[{}$%a-zA-Z0-9_-]+)+', x))
+  elif re.search(r'^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?$', x):
+    m = re.search(r'^([^:/]+)', x)
+    if m:
+      hostlike = m.group(1)
+      components = hostlike.split('.')
+      if len(components) == 4 and all(re.match(r'^\d+$', c) for c in components) and all(int(c) < 256 for c in components):
+        yield dict(type_='possible IPv4 address', value=[hostlike])
+      elif tlds.search(components[-1]):
+        if not re.search(r'^android\.(intent|media)\.|^os\.name$|^java\.vm\.name|^[A-Z]+.*\.(java|EC|name|secure)$', hostlike):
+          yield dict(type_='possible FQDN', value=[hostlike])
+
+@cache
+def _pat(c: str) -> re.Pattern[str]:
+  """Compile a case-insensitive pattern matching any entry line of the list text c."""
+  from io import StringIO
+  f = StringIO(c)
+  return re.compile('^(?:{})$'.format('|'.join(re.escape(l.strip()) for l in f if l.strip() and not l.startswith(('#', '//')))), flags=re.IGNORECASE)
+
+@cache
+def _tlds() -> str:
+  """Return the text of the bundled libs/public_suffix_list.dat."""
+  from importlib.resources import files
+  with (files('trueseeing')/'libs'/'public_suffix_list.dat').open('r', encoding='utf-8') as f:
+    return f.read()
+
+def analyze_url(path: str) -> Iterator[Mapping[str, Any]]:
+  """Run analyze_url_in() over every file found under path."""
+  def walker(path: str) -> Iterator[Tuple[str, bytes]]:
+    for dirpath, _, filenames in os.walk(path):
+      for fn in filenames:
+        target = os.path.join(dirpath, fn)
+        with open(target, 'rb') as f:
+          yield target, f.read()
+
+  return analyze_url_in(walker(path))
+
+def analyze_url_in(gen: Iterable[Tuple[str, bytes]]) -> Iterator[Mapping[str, Any]]:
+  """Yield dicts (fn, typ, v) for each distinct URL-like string in (name, content) pairs."""
+  tlds = _pat(_tlds())
+
+  pats = rb'|'.join([
+    b"([a-z0-9A-Z]+://[^\\\"<>()' \\t\\n\\v\\r\\x00-\\x1f\\x80-\\xff]+)",
+    rb'"/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+',
+    rb'"[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?"',
+  ])
+
+  seen = set()
+  for n, s in gen:
+    for m in re.finditer(pats, s):
+      urllike = m.group(0).decode('latin1').strip('"')
+      if urllike not in seen:
+        seen.add(urllike)
+        for d in _analyzed(urllike, tlds):
+          for v in d['value']:
+            yield dict(fn=n, typ=d['type_'], v=v)
+
+def analyze_api(path: str) -> Iterator[Mapping[str, Any]]:
+  """Run analyze_api_in() over every file found under path."""
+  def walker(path: str) -> Iterator[Tuple[str, bytes]]:
+    for dirpath, _, filenames in os.walk(path):
+      for fn in filenames:
+        target = os.path.join(dirpath, fn)
+        with open(target, 'rb') as f:
+          yield target, f.read()
+
+  return analyze_api_in(walker(path))
+
+def analyze_api_in(gen: Iterable[Tuple[str, bytes]]) -> Iterator[Mapping[str, Any]]:
+  """Yield dicts (fn, origin, typ, lang, call) for each non-blacklisted call line in (name, content) pairs."""
+  # XXX: focusing on oneline
+  pats = rb'^([_.].*?:[0-9a-f]{8}) +?[0-9a-f]+ +?b[a-z]* +(.*?);([A-Za-z]+ .*)$'
+  blacklist = '|'.join([
+    r' (thunk_)?FUN_[0-9a-f]+',
+    r' __cxa_',
+    r' __stack_chk_fail',
+    r' operator\.',
+    r' ~',
+  ])
+
+  for fn, s in gen:
+    for m in re.finditer(pats, s, flags=re.MULTILINE):
+      origin = m.group(1).decode('latin1').strip('" ').replace(':','+')
+      target = m.group(2).decode('latin1').strip('" ')
+      call = m.group(3).decode('latin1').strip('" ')
+      if not re.search(blacklist, call):
+        if re.search(r'operator|::.*?::', call):
+          lang = 'cpp'
+        else:
+          lang = 'c'
+
+        if 'EXTERNAL' in target:
+          yield dict(fn=fn, origin=origin, typ='API', lang=lang, call=call)
+        else:
+          yield dict(fn=fn, origin=origin, typ='private', lang=lang, call=call)
+
+def get_origin(n: str, l: bytes) -> Mapping[str, Any]:
+  """Parse the leading section:offset of line l into dict(fn, sect, offs); raise ValueError if it does not match."""
+  pat = rb'(_.*?:[0-9a-f]{8}) +?[0-9a-f]+? +[a-z]+ '
+  m = re.match(pat, l)
+  if m:
+    origin = m.group(1).decode('latin1').strip('"')
+    sect, offs = origin.split(':')
+    return dict(fn=n, sect=sect, offs=int(offs, 16))
+  else:
+    raise ValueError()
diff --git a/trueseeing/core/android/context.py b/trueseeing/core/android/context.py
index 1c9020f1..1baad601 100644
--- a/trueseeing/core/android/context.py
+++ b/trueseeing/core/android/context.py
@@ -13,11 +13,11 @@
 from trueseeing.core.context import Context
 
 if TYPE_CHECKING:
-  from typing import List, Any, Iterable, Tuple, Optional, ClassVar, Set, AsyncIterator
+  from typing import List, Any, Iterable, Tuple, Optional, ClassVar, Set, AsyncIterator, Iterator, Mapping
   from trueseeing.core.context import ContextType, ContextInfo
   from trueseeing.core.android.asm import APKDisassembler
   from trueseeing.core.android.store import APKStore
-  from trueseeing.core.android.model import XAPKManifest
+  from trueseeing.core.android.model import XAPKManifest, Call
 
 class PackageNameReader:
   @cache
@@ -117,6 +117,16 @@ async def _get_disassembler(self) -> APKDisassembler:
     return APKDisassembler(self)
 
   async def _analyze(self, level: int) -> None:
+    from time import time
+
+    at = time()
+
+    await self._analyze_dalvik(level)
+    await self._analyze_native(level)
+
+    pub.sendMessage('progress.core.analysis.done', t=time()-at)
+
+  async def _analyze_dalvik(self, level: int) -> None:
     pub.sendMessage('progress.core.context.disasm.begin')
     disasm = await self._get_disassembler()
     await disasm.disassemble(level)
@@ -228,6 +238,45 @@ def _addr_ceil(nr: int) -> int:
         c.execute('analyze')
       pub.sendMessage('progress.core.analysis.smali.done', t=time.time() - started)
 
+  async def _analyze_native(self, level: int) -> None:
+    if level > 2:
+      tarpath = os.path.join(os.path.dirname(self._path), 'disasm.tar.gz')
+      if not os.path.exists(tarpath):
+        ui.warn(f'skipping native code analysis; prepare {tarpath}')
+        return
+
+      from time import time
+      at = time()
+
+      with self.store().query().scoped() as q:
+        pub.sendMessage('progress.core.analysis.nat.begin')
+        import tarfile
+        with tarfile.open(tarpath) as tf:
+          q.file_put_batch(dict(path=i.name, blob=tf.extractfile(i).read(), z=True) for i in tf.getmembers() if (i.isreg() or i.islnk())) # type:ignore[union-attr]
+
+        if level > 3:
+          pub.sendMessage('progress.core.analysis.nat.analyzing')
+          from trueseeing.core.android.analysis.nat import analyze_api_in
+
+          def _as_call(g: Iterator[Mapping[str, Any]]) -> Iterator[Call]:
+            for e in g:
+              typ = e['typ']
+              lang = e['lang']
+              sect, offs = e['origin'].split('+')
+              yield dict(
+                path=e['fn'],
+                sect=sect,
+                offs=int(offs.strip(), 16),
+                priv=(typ == 'private'),
+                cpp=(lang == 'cpp'),
+                target=e['call']
+              )
+
+          q.call_add_batch(_as_call(analyze_api_in(q.file_enum('lib/%'))))
+          pub.sendMessage('progress.core.analysis.nat.summary', calls=q.call_count())
+
+      pub.sendMessage('progress.core.analysis.nat.done', t=time() - at)
+
   def get_package_name(self) -> str:
     return self._package_reader.read(self.target)
 
diff --git a/trueseeing/core/android/db.py b/trueseeing/core/android/db.py
index 994b51c3..77326a2a 100644
--- a/trueseeing/core/android/db.py
+++ b/trueseeing/core/android/db.py
@@ -7,7 +7,7 @@
 if TYPE_CHECKING:
   from typing import Optional, Iterator
   from trueseeing.core.store import Store
-  from trueseeing.core.android.model import InvocationPattern
+  from trueseeing.core.android.model import InvocationPattern, Call
 
 class APKStorePrep(StorePrep):
   def stage1(self) -> None:
@@ -134,3 +134,26 @@ def body(self, class_name: str, method_name: Optional[str]) -> Iterator[Op]:
     stmt1 = 'select addr, l from ops join map on (addr between low and high) where method=:method_name and class=:class_name'
     for addr, l in self.db.execute(stmt1 if method_name else stmt0, dict(class_name=class_name, method_name=method_name)):
       yield Op(addr, l)
+
+  def call_add_batch(self, gen: Iterator[Call]) -> None:
+    stmt0 = 'insert into ncalls (priv, cpp, target, path, sect, offs) values (:priv, :cpp, :target, :path, :sect, :offs)'
+    self.db.executemany(stmt0, gen)
+
+  def call_count(self) -> int:
+    stmt0 = 'select count(1) from ncalls'
+    for n, in self.db.execute(stmt0):
+      return n # type:ignore[no-any-return]
+    return 0
+
+  def calls(self, priv: bool = False, api: bool = False) -> Iterator[Call]:
+    stmt0 = 'select priv, cpp, target, path, sect, offs from ncalls'
+    stmt1 = 'select priv, cpp, target, path, sect, offs from ncalls where priv=:is_priv'
+    for priv, cpp, target, path, sect, offs in self.db.execute(stmt1 if (priv or api) else stmt0, dict(is_priv=priv)):
+      yield dict(
+        path=path,
+        sect=sect,
+        offs=offs,
+        priv=priv,
+        cpp=cpp,
+        target=target,
+      )
diff --git a/trueseeing/core/android/model.py b/trueseeing/core/android/model.py
index 7579c908..8b78d9c4 100644
--- a/trueseeing/core/android/model.py
+++ b/trueseeing/core/android/model.py
@@ -27,6 +27,14 @@ class XAPKManifest(TypedDict, total=False):
     target_sdk_version: str
     permissions: List[str]
 
+  class Call(TypedDict):
+    path: str
+    sect: str
+    offs: int
+    priv: bool
+    cpp: bool
+    target: str
+
 class Op(NamedTuple):
   addr: int
   l: str
diff --git a/trueseeing/core/db.py b/trueseeing/core/db.py
index fa4fb88e..4f75118c 100644
--- a/trueseeing/core/db.py
+++ b/trueseeing/core/db.py
@@ -79,10 +79,10 @@ def file_get_xml(self, path: str, default: Any = None, patched: bool = False) ->
     else:
       return default
 
-  def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = False) -> Iterable[Tuple[str, bytes]]:
+  def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = False, neg: bool = False) -> Iterable[Tuple[str, bytes]]:
     if pat is not None:
-      stmt0 = 'select path, z, blob from files where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
-      stmt1 = 'select path, coalesce(B.z, A.z) as z, coalesce(B.blob, A.blob) as blob from files as A full outer join patches as B using (path) where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
+      stmt0 = 'select path, z, blob from files where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
+      stmt1 = 'select path, coalesce(B.z, A.z) as z, coalesce(B.blob, A.blob) as blob from files as A full outer join patches as B using (path) where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
       for n, z, o in self.db.execute(stmt1 if patched else stmt0, dict(pat=pat)):
         yield n, zd(o) if z else o
     else:
@@ -91,10 +91,10 @@ def file_enum(self, pat: Optional[str], patched: bool = False, regex: bool = Fal
       for n, z, o in self.db.execute(stmt3 if patched else stmt2):
         yield n, zd(o) if z else o
 
-  def file_count(self, pat: Optional[str], patched: bool = False, regex: bool = False) -> int:
+  def file_count(self, pat: Optional[str], patched: bool = False, regex: bool = False, neg: bool = False) -> int:
     if pat is not None:
-      stmt0 = 'select count(1) from files where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
-      stmt1 = 'select conut(1) from files as A full outer join patches as B using (path) where path {op} :pat'.format(op=('like' if not regex else 'regexp'))
+      stmt0 = 'select count(1) from files where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
+      stmt1 = 'select count(1) from files as A full outer join patches as B using (path) where {neg} path {op} :pat'.format(neg='not' if neg else '', op=('like' if not regex else 'regexp'))
       for nr, in self.db.execute(stmt1 if patched else stmt0, dict(pat=pat)):
         return nr # type:ignore[no-any-return]
     else:
diff --git a/trueseeing/core/env.py b/trueseeing/core/env.py
index 92da62a5..3f158b17 100644
--- a/trueseeing/core/env.py
+++ b/trueseeing/core/env.py
@@ -61,4 +61,4 @@ def get_device_frida_dir(package_name: str) -> str:
 
 @cache
 def get_cache_schema_id() -> int:
-  return 0x190b4df6 # FIXME: remember to randomize this whenever incompatible changes occur on cache file structure, or DB schema
+  return 0x54f6d672 # FIXME: remember to randomize this whenever incompatible changes occur on cache file structure, or DB schema
diff --git a/trueseeing/core/ui.py b/trueseeing/core/ui.py
index a785b8af..b55ae0de 100644
--- a/trueseeing/core/ui.py
+++ b/trueseeing/core/ui.py
@@ -194,6 +194,11 @@ def scoped(self) -> Iterator[None]:
       'progress.core.analysis.smali.summary':self._core_analysis_smali_summary,
       'progress.core.analysis.smali.finalizing':self._core_analysis_smali_finalizing,
       'progress.core.analysis.smali.done':self._core_analysis_smali_done,
+      'progress.core.analysis.nat.begin':self._core_analysis_nat_begin,
+      'progress.core.analysis.nat.analyzing':self._core_analysis_nat_analyzing,
+      'progress.core.analysis.nat.summary':self._core_analysis_nat_summary,
+      'progress.core.analysis.nat.done':self._core_analysis_nat_done,
+      'progress.core.analysis.done':self._core_analysis_done,
     }
     try:
       for k, v in submap.items():
@@ -332,6 +337,21 @@ def _core_analysis_smali_finalizing(self) -> None:
   def _core_analysis_smali_done(self, t: float) -> None:
     ui.info(f"analyze: done ({t:.02f} sec)")
 
+  def _core_analysis_nat_begin(self) -> None:
+    ui.info('analyze_nat: analyzing...', nl=False)
+
+  def _core_analysis_nat_analyzing(self) -> None:
+    ui.info('analyze_nat: calls ...{tail}'.format(tail=' '*20), ow=True, nl=False)
+
+  def _core_analysis_nat_summary(self, calls: Optional[int] = None) -> None:
+    ui.info(f'analyze_nat: got {calls} calls', ow=True)
+
+  def _core_analysis_nat_done(self, t: float) -> None:
+    ui.info(f"analyze_nat: done ({t:.02f} sec)")
+
+  def _core_analysis_done(self, t: float) -> None:
+    ui.info(f"analyze: done ({t:.02f} sec)")
+
 class FileTransferProgressReporter:
   _bar: Optional[ProgressBar]
   def __init__(self, desc: str) -> None:
diff --git a/trueseeing/libs/android/store.0.sql b/trueseeing/libs/android/store.0.sql
index f0f5e914..1368b4a9 100644
--- a/trueseeing/libs/android/store.0.sql
+++ b/trueseeing/libs/android/store.0.sql
@@ -6,5 +6,6 @@ create table xref_const (addr integer primary key, insn varchar not null, sym va
 create table xref_invoke (addr integer primary key, target integer, insn varchar not null, sym varchar not null);
 create table xref_sput (addr integer primary key, insn varchar not null, sym varchar not null);
 create table xref_iput (addr integer primary key, insn varchar not null, sym varchar not null);
+create table ncalls (nr integer primary key, priv boolean not null, cpp boolean not null, target varchar not null, path varchar not null, sect varchar not null, offs integer not null);
 create index idx_map_class on map (class);
 create index idx_xref_invoke_target on xref_invoke (target);
diff --git a/trueseeing/sig/android/nat.py b/trueseeing/sig/android/nat.py
new file mode 100644
index 00000000..a96bbb64
--- /dev/null
+++ b/trueseeing/sig/android/nat.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+from typing import TYPE_CHECKING
+
+import math
+import re
+from trueseeing.api import Signature
+from trueseeing.core.ui import ui
+
+if TYPE_CHECKING:
+  from typing import Dict, AnyStr
+  from trueseeing.api import SignatureMap, SignatureHelper, ConfigMap
+  from trueseeing.core.android.context import APKContext
+  from trueseeing.core.android.model import Call
+
+class NativeCodeDetector(Signature):
+  """Signatures 'nat-detect-api' and 'nat-detect-urls'."""
+  _cvss_info = 'CVSS:3.0/AV:P/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:N/'
+
+  def __init__(self, helper: SignatureHelper) -> None:
+    self._helper = helper
+
+  @staticmethod
+  def create(helper: SignatureHelper) -> Signature:
+    return NativeCodeDetector(helper)
+
+  def get_sigs(self) -> SignatureMap:
+    return {
+      'nat-detect-api':dict(e=self._detect_api, d='Detects API call in native code'),
+      'nat-detect-urls':dict(e=self._detect_url, d='Detects URL etc. in native code'),
+    }
+
+  def get_configs(self) -> ConfigMap:
+    return dict()
+
+  def _get_context(self) -> APKContext:
+    return self._helper.get_context().require_type('apk')
+
+  def _format_aff0(self, c: Call) -> str:
+    return self._format_aff0_manual(c['path'], c['sect'], c['offs'])
+
+  def _format_aff0_match(self, n: str, m: re.Match[AnyStr]) -> str:
+    return self._format_aff0_manual(n, '', m.start())
+
+  def _format_aff0_manual(self, n: str, s: str, o: int) -> str:
+    return '{} ({}+{:08x})'.format(n, s, o)
+
+  async def _detect_api(self) -> None:
+    """Raise one issue per call returned by the store's calls() query."""
+    context = self._get_context()
+    with context.store().query().scoped() as q:
+      for c in q.calls():
+        priv, target = c['priv'], c['target']
+        self._helper.raise_issue(self._helper.build_issue(
+          sigid='nat-detect-api',
+          cvss=self._cvss_info,
+          title='detected {} call'.format('private' if priv else 'API'),
+          info0=target,
+          aff0=self._format_aff0(c),
+        ))
+
+  async def _detect_url(self) -> None:
+    """Raise one issue per URL-like string found in stored files whose path is not under lib/."""
+    from trueseeing.core.android.analysis.nat import analyze_url_in
+    context = self._get_context()
+    with context.store().query().scoped() as q:
+      for d in analyze_url_in(q.file_enum('lib/%', neg=True)):
+        tentative = False
+        if '...' in d['v']:
+          ui.warn('truncated value found; disassemble again with wider fields', onetime=True)
+          tentative = True
+        self._helper.raise_issue(self._helper.build_issue(
+          sigid='nat-detect-urls',
+          cvss=self._cvss_info,
+          title='detected {}'.format(d['typ']),
+          cfd='tentative' if tentative else 'firm',
+          info0=d['v'],
+          aff0=d['fn'],
+        ))
+
+  @classmethod
+  def _entropy_of(cls, string: str) -> float:
+    """Return the Shannon entropy of string in bits per character (0.0 for an empty string)."""
+    o = 0.0
+    m: Dict[str, int] = dict()
+    for c in string:
+      m[c] = m.get(c, 0) + 1
+    for cnt in m.values():
+      freq = float(cnt) / len(string)
+      o -= freq * (math.log(freq) / math.log(2))
+    return o
+
+  @classmethod
+  def _assumed_randomness_of(cls, string: str) -> float:
+    """Return the entropy of string divided by log2 of its length; 0 for strings shorter than two characters."""
+    try:
+      return cls._entropy_of(string) / float(math.log(len(string)) / math.log(2))
+    except (ValueError, ZeroDivisionError):
+      return 0