diff --git a/config.py b/config.py index c9f9e56..4ff522b 100644 --- a/config.py +++ b/config.py @@ -5,43 +5,44 @@ from pathlib import Path from sys import platform -from runcmd import catch_err, run_command - -DEV_SUPPRTED = ['android', 'ios'] # 'windows', 'mobileos', later +DEV_SUPPRTED = ["android", "ios"] # 'windows', 'mobileos', later THIS_DIR = Path(__file__).absolute().parent # Used by data_process only. source_files = { - 'playstore': 'static_data/android_apps_crawl.csv.gz', - 'appstore': 'static_data/ios_apps_crawl.csv.gz', - 'offstore': 'static_data/offstore_apks.csv', + "playstore": "static_data/android_apps_crawl.csv.gz", + "appstore": "static_data/ios_apps_crawl.csv.gz", + "offstore": "static_data/offstore_apks.csv", } -spyware_list_file = 'static_data/spyware.csv' # hand picked +spyware_list_file = "static_data/spyware.csv" # hand picked # --------------------------------------------------------- DEBUG = bool(int(os.getenv("DEBUG", "0"))) TEST = bool(int(os.getenv("TEST", "0"))) DEVICE_PRIMARY_USER = { - 'me': 'Me', - 'child': 'A child of mine', - 'partner': 'My current partner/spouse', - 'family_other': 'Another family member', - 'other': 'Someone else' + "me": "Me", + "child": "A child of mine", + "partner": "My current partner/spouse", + "family_other": "Another family member", + "other": "Someone else", } -ANDROID_PERMISSIONS_CSV = 'static_data/android_permissions.csv' -IOS_DUMPFILES = {'Jailbroken-FS': 'ios_jailbroken.log', - 'Jailbroken-SSH': 'ios_jailbreak_ssh.retcode', - 'Apps': 'ios_apps.plist', 'Info': 'ios_info.xml'} +ANDROID_PERMISSIONS_CSV = "static_data/android_permissions.csv" +IOS_DUMPFILES = { + "Jailbroken-FS": "ios_jailbroken.log", + "Jailbroken-SSH": "ios_jailbreak_ssh.retcode", + "Apps": "ios_apps.plist", + "Info": "ios_info.xml", +} -TEST_APP_LIST = 'static_data/android.test.apps_list' -#TITLE = "Anti-IPS: Stop Intimate Partner Surveillance" +TEST_APP_LIST = "static_data/android.test.apps_list" +# TITLE = "Anti-IPS: Stop Intimate Partner Surveillance" -TITLE = {'title': "IPV Spyware Discovery (ISDi){}".format(" (test)" if TEST else '')} +TITLE = {"title": "IPV Spyware Discovery (ISDi){}".format(" (test)" if TEST else "")} -APP_FLAGS_FILE = 'static_data/app-flags.csv' -APP_INFO_SQLITE_FILE = 'sqlite:///static_data/app-info.db' +APP_FLAGS_FILE = "static_data/app-flags.csv" +APP_INFO_SQLITE_FILE = "sqlite:///static_data/app-info.db" # IOC stalkware indicators IOC_PATH = "data/stalkerware-indicators/" @@ -55,67 +56,80 @@ # there are a couple of sources of truth that may disagree with their "path # relavitity". Needless to say, FIXME SQL_DB_PATH = "sqlite:///{}".format(str(THIS_DIR / "data/fieldstudy.db")) -#SQL_DB_CONSULT_PATH = 'sqlite:///data/consultnotes.db' + ("~test" if TEST else "") +# SQL_DB_CONSULT_PATH = 'sqlite:///data/consultnotes.db' + ("~test" if TEST else "") def set_test_mode(test): global TEST, APP_FLAGS_FILE, SQL_DB_PATH TEST = test if TEST: - if not APP_FLAGS_FILE.endswith('~test'): + if not APP_FLAGS_FILE.endswith("~test"): APP_FLAGS_FILE = APP_FLAGS_FILE + "~test" - if not SQL_DB_PATH.endswith('~test'): + if not SQL_DB_PATH.endswith("~test"): SQL_DB_PATH = SQL_DB_PATH + "~test" else: - if APP_FLAGS_FILE.endswith('~test'): - APP_FLAGS_FILE = APP_FLAGS_FILE.replace("~test", '') - if SQL_DB_PATH.endswith('~test'): - SQL_DB_PATH = SQL_DB_PATH.replace("~test", '') + if APP_FLAGS_FILE.endswith("~test"): + APP_FLAGS_FILE = APP_FLAGS_FILE.replace("~test", "") + if SQL_DB_PATH.endswith("~test"): + SQL_DB_PATH = SQL_DB_PATH.replace("~test", "") set_test_mode(TEST) -STATIC_DATA = THIS_DIR / 'static_data' +STATIC_DATA = THIS_DIR / "static_data" # TODO: We should get rid of this, ADB_PATH is very confusing -ANDROID_HOME = os.getenv('ANDROID_HOME', '') -PLATFORM = ('darwin' if platform == 'darwin' - else 'linux' if platform.startswith('linux') - else 'win32' if platform == 'win32' else None) - -ADB_PATH = shlex.quote(os.path.join(ANDROID_HOME, 'adb')) - -#LIBIMOBILEDEVICE_PATH = shlex.quote(str(STATIC_DATA / ("libimobiledevice-" + PLATFORM))) -LIBIMOBILEDEVICE_PATH = '' +ANDROID_HOME = os.getenv("ANDROID_HOME", "") +PLATFORM = ( + "darwin" + if platform == "darwin" + else ( + "linux" + if platform.startswith("linux") + else "win32" + if platform == "win32" + else None + ) +) + +ADB_PATH = shlex.quote(os.path.join(ANDROID_HOME, "adb")) + +# LIBIMOBILEDEVICE_PATH = shlex.quote(str(STATIC_DATA / ("libimobiledevice-" + PLATFORM))) +LIBIMOBILEDEVICE_PATH = "" # MOBILEDEVICE_PATH = 'mobiledevice' # MOBILEDEVICE_PATH = os.path.join(THISDIR, "mdf") #'python2 -m MobileDevice' -MOBILEDEVICE_PATH = shlex.quote(str(STATIC_DATA / ("ios-deploy-" + PLATFORM))) +if PLATFORM: + MOBILEDEVICE_PATH = shlex.quote(str(STATIC_DATA / ("ios-deploy-" + PLATFORM))) +else: + MOBILEDEVICE_PATH = shlex.quote(str(STATIC_DATA / ("ios-deploy-none"))) -DUMP_DIR = THIS_DIR / 'phone_dumps' -SCRIPT_DIR = THIS_DIR / 'scripts' +DUMP_DIR = THIS_DIR / "phone_dumps" +SCRIPT_DIR = THIS_DIR / "scripts" -DATE_STR = '%Y-%m-%d %I:%M %p' +DATE_STR = "%Y-%m-%d %I:%M %p" ERROR_LOG = [] -APPROVED_INSTALLERS = { - 'com.android.vending', - 'com.sec.android.preloadinstaller'} +APPROVED_INSTALLERS = {"com.android.vending", "com.sec.android.preloadinstaller"} -REPORT_PATH = THIS_DIR / 'reports' +REPORT_PATH = THIS_DIR / "reports" PII_KEY_PATH = STATIC_DATA / "pii.key" + + def open_or_create_random_key(fpath, keylen=32): def create(): import secrets - with fpath.open('wb') as f: + + with fpath.open("wb") as f: f.write(secrets.token_bytes(keylen)) if not fpath.exists(): create() - k = fpath.open('rb').read(keylen) + k = fpath.open("rb").read(keylen) if len(k) != keylen: - creatte() - return fpath.open('rb').read() + create() + return fpath.open("rb").read() + PII_KEY = open_or_create_random_key(PII_KEY_PATH, keylen=32) @@ -127,24 +141,24 @@ def create(): def hmac_serial(ser: str) -> str: - """Returns a string starting with HSN_. If ser already have 'HSN_', + """Returns a string starting with HSN_. If ser already have 'HSN_', it returns the same value.""" - if ser.startswith('HSN_'): + if ser.startswith("HSN_"): return ser - hser = hmac.new(PII_KEY, ser.encode('utf8'), - digestmod=hashlib.sha256).hexdigest() - return f'HSN_{hser}' + hser = hmac.new(PII_KEY, ser.encode("utf8"), digestmod=hashlib.sha256).hexdigest() + return f"HSN_{hser}" + def add_to_error(*args): global ERROR_LOG - m = '\n'.join(str(e) for e in args) + m = "\n".join(str(e) for e in args) print(m) ERROR_LOG.append(m) def error(): global ERROR_LOG - e = '' + e = "" if len(ERROR_LOG) > 0: e, ERROR_LOG = ERROR_LOG[0], ERROR_LOG[1:] diff --git a/parse_dump.py b/parse_dump.py index 3ff0544..792f5ad 100644 --- a/parse_dump.py +++ b/parse_dump.py @@ -5,25 +5,27 @@ import os import re import sys +import config from collections import OrderedDict from functools import reduce from pathlib import Path from plistlib import load - +from typing import List, Dict import pandas as pd from rsonlite import simpleparse -import config -def count_lspaces(l): + + +def count_lspaces(lspaces): # print(">>", repr(l)) - return re.search(r'\S', l).start() + return re.search(r"\S", lspaces).start() def get_d_at_level(d, lvl): - for l in lvl: - if l not in d: - d[l] = {} - d = d[l] + for level in lvl: + if level not in d: + d[level] = {} + d = d[level] return d @@ -38,7 +40,8 @@ def clean_json(d): def _match_keys_w_one(d, key, only_last=False): """Returns a list of keys that matches @key""" sk = re.compile(key) - if not d: return [] + if not d: + return [] if isinstance(d, list): d = d[0] ret = [k for k in d if sk.match(k) is not None] @@ -53,7 +56,7 @@ def match_keys(d, keys): Returns a list of lists """ if isinstance(keys, str): - keys = keys.split('//') + keys = keys.split("//") ret = _match_keys_w_one(d, keys[0]) if len(keys) == 1: return ret @@ -97,33 +100,35 @@ def _extract_one(d, lkeys): def split_equalto_delim(k): - return k.split('=', 1) + return k.split("=", 1) + def prune_empty_keys(d): - """d is an multi-layer dictionary. The function + """d is an multi-layer dictionary. The function converts a sequence of keys into array if all have empty values""" if not isinstance(d, dict): return d if not any(d.values()): return list(d.keys()) - for k,v in d.items(): + for k, v in d.items(): d[k] = prune_empty_keys(v) return d def retrieve(dict_, nest): - ''' + """ Navigates dictionaries like dict_[nest0][nest1][nest2]... gracefully. - ''' - dict_ = dict_.to_dict() # for pandas + """ + dict_ = dict_.to_dict() # for pandas try: return reduce(operator.getitem, nest, dict_) except KeyError as e: - return "" + return f"{e}" except TypeError as e: - return "" + return f"{e}" + class PhoneDump(object): def __init__(self, dev_type, fname): @@ -142,7 +147,7 @@ def info(self, appid): class AndroidDump(PhoneDump): def __init__(self, fname): self.dumpf = fname - super(AndroidDump, self).__init__('android', fname) + super(AndroidDump, self).__init__("android", fname) self.df = self.load_file() # def _extract_lines(self, service): @@ -163,36 +168,41 @@ def __init__(self, fname): @staticmethod def custom_parse(service, lines): - if service == 'appops': + if service == "appops": return lines @staticmethod def new_parse_dump_file(self, fname): - """Not used working using simple parse to parse the files. """ + """Not used working using simple parse to parse the files.""" if not Path(fname).exists(): print("File: {!r} does not exists".format(fname)) data = open(fname) d = {} - service = '' + service = "" join_lines = [] custom_parse_services = {"appops"} + def _parse(lines): try: if service in custom_parse_services: return AndroidDump.custom_parse(service, lines) else: - return simpleparse('\n'.join(join_lines)) + return simpleparse("\n".join(join_lines)) except Exception as ex: - print("Could not parse for {} service={}. Exception={}"\ - .format(fname, service, ex)) + print( + "Could not parse for {} service={}. Exception={}".format( + fname, service, ex + ) + ) return lines for i, l in enumerate(data): - if l.startswith('----'): continue - if l.startswith('DUMP OF SERVICE'): + if l.startswith("----"): + continue + if l.startswith("DUMP OF SERVICE"): if service: d[service] = _parse(join_lines) - service = l.strip().rsplit(' ', 1)[1] + service = l.strip().rsplit(" ", 1)[1] join_lines = [] else: join_lines.append(l) @@ -205,29 +215,30 @@ def _extract_info_lines(self, fp) -> list: content: List[str] = [] a = True while a: - l = fp.readline() - if not l: + line = fp.readline() + if not line: a = False break - if l.startswith('DUMP OF'): + if line.startswith("DUMP OF"): fp.seek(lastpos) return content lastpos = fp.tell() - content.append(l.rstrip()) + content.append(line.rstrip()) return content def _parse_dump_service_info_lines(self, lines) -> dict: res: Dict[str, dict] = {} curr_spcnt = [0] curr_lvl = 0 - lvls = ['' for _ in range(20)] # Max 20 levels allowed + lvls = ["" for _ in range(20)] # Max 20 levels allowed i = 0 while i < len(lines): - l = lines[i]; i += 1 - if not l.strip(): # subsection ends + line = lines[i] + i += 1 + if not line.strip(): # subsection ends continue - l = l.replace('\t', ' '*5) - t_spcnt = count_lspaces(l) + line = line.replace("\t", " " * 5) + t_spcnt = count_lspaces(line) # print(t_spcnt, curr_spcnt, curr_lvl) # if t_spcnt == 1: # print(repr(l)) @@ -237,14 +248,14 @@ def _parse_dump_service_info_lines(self, lines) -> dict: while curr_spcnt and curr_spcnt[-1] > 0 and t_spcnt <= curr_spcnt[-1] - 2: curr_lvl -= 1 curr_spcnt.pop() - if curr_spcnt[-1]>0: + if curr_spcnt[-1] > 0: curr_spcnt[-1] = t_spcnt # assert (t_spcnt != 0) or (curr_lvl == 0), \ # "t_spc: {} <--> curr_lvl: {}\n{}".format(t_spcnt, curr_lvl, l) # print(lvls[:curr_lvl], curr_lvl, curr_spcnt) curr = get_d_at_level(res, lvls[:curr_lvl]) - k = l.strip().rstrip(':') - lvls[curr_lvl] = k # '{} --> {}'.format(curr_lvl, k) + k = line.strip().rstrip(":") + lvls[curr_lvl] = k # '{} --> {}'.format(curr_lvl, k) curr[lvls[curr_lvl]] = {} return prune_empty_keys(res) @@ -254,38 +265,36 @@ def parse_dump_file(self, fname) -> dict: print("File: {!r} does not exists".format(fname)) fp = open(fname) d = {} - service = '' - curr_spcnt, curr_lvl = 0, 0 + service = "" + # curr_spcnt, curr_lvl = 0, 0 while True: - l = fp.readline().rstrip() - if l.startswith('----'): + line = fp.readline().rstrip() + if line.startswith("----"): continue - if l.startswith('DUMP OF SERVICE'): # Service - service = l.strip().rsplit(' ', 1)[1] + if line.startswith("DUMP OF SERVICE"): # Service + service = line.strip().rsplit(" ", 1)[1] content = self._extract_info_lines(fp) print(f"Content: {service!r}", content[:10]) d[service] = self._parse_dump_service_info_lines(content) - elif l.startswith('DUMP OF SETTINGS'): # Setting - setting = 'settings_' + l.strip().rsplit(' ', 1)[1] + elif line.startswith("DUMP OF SETTINGS"): # Setting + setting = "settings_" + line.strip().rsplit(" ", 1)[1] content = self._extract_info_lines(fp) - settings_d = dict( - l.split('=', 1) for l in content if '=' in l - ) + settings_d = dict(line.split("=", 1) for line in content if "=" in line) d[setting] = settings_d else: - if not l: + if not line: break - print(f"Something wrong! --> {l!r}") + print(f"Something wrong! --> {line!r}") return d def load_file(self, failed_before=False): - fname = self.fname.rsplit('.', 1)[0] + '.txt' - json_fname = fname.rsplit('.', 1)[0] + '.json' + fname = self.fname.rsplit(".", 1)[0] + ".txt" + json_fname = fname.rsplit(".", 1)[0] + ".json" d = {} if os.path.exists(json_fname): - with open(json_fname, 'r') as f: + with open(json_fname, "r") as f: try: d = json.load(f) except Exception as ex: @@ -294,51 +303,54 @@ def load_file(self, failed_before=False): os.unlink(json_fname) return self.load_file(failed_before=True) else: - with open(json_fname, 'w') as f: + with open(json_fname, "w") as f: try: d = self.parse_dump_file(fname) json.dump(d, f, indent=2) except Exception as ex: print("File ({!r}) could not be opened or parsed.".format(fname)) print("Exception: {}".format(ex)) - raise(ex) + raise (ex) return {} return d @staticmethod def get_data_usage(d, process_uid): - if 'net_stats' not in d: - return { - "foreground": "unknown", - "background": "unknown" - } + if "net_stats" not in d: + return {"foreground": "unknown", "background": "unknown"} # FIXME: pandas.errors.ParserError: Error tokenizing data. C error: Expected 21 fields in line 556, saw 22 # parser error (tested on SM-G965U,Samsung,8.0.0) - - net_stats = pd.read_csv(io.StringIO( - '\n'.join(d['net_stats']) - ), on_bad_lines='warn') + + net_stats = pd.read_csv( + io.StringIO("\n".join(d["net_stats"])), on_bad_lines="warn" + ) d = net_stats.query('uid_tag_int == "{}"'.format(process_uid))[ - ['uid_tag_int', 'cnt_set', 'rx_bytes', 'tx_bytes']].astype(int) + ["uid_tag_int", "cnt_set", "rx_bytes", "tx_bytes"] + ].astype(int) def s(c): - return (d[d['cnt_set'] == c].eval('rx_bytes+tx_bytes').sum() - / (1024*1024)) + return d[d["cnt_set"] == c].eval("rx_bytes+tx_bytes").sum() / (1024 * 1024) + return { "foreground": "{:.2f} MB".format(s(1)), - "background": "{:.2f} MB".format(s(0)) + "background": "{:.2f} MB".format(s(0)), } @staticmethod def get_battery_stat(d, uidu): - b = list(get_all_leaves(match_keys( - d, "batterystats//Statistics since last charge//Estimated power use .*" - "//^Uid {}:.*".format(uidu)) - )) + b = list( + get_all_leaves( + match_keys( + d, + "batterystats//Statistics since last charge//Estimated power use .*" + "//^Uid {}:.*".format(uidu), + ) + ) + ) if not b: return "0 (mAh)" else: - t = b[0].split(':') + t = b[0].split(":") return t[1] return b @@ -348,12 +360,13 @@ def apps(self): return {} def get_appid_h(txt): - m = re.match(r'Package \[(?P.*)\] \((?P.*)\)', txt) + m = re.match(r"Package \[(?P.*)\] \((?P.*)\)", txt) if m: return m.groups() + packages = map( get_appid_h, - get_all_leaves(match_keys(d, '^package$//^Packages//^Package .*')) + get_all_leaves(match_keys(d, "^package$//^Packages//^Package .*")), ) return [c for c in packages if c] @@ -362,36 +375,37 @@ def info(self, appid): if not d: return {} package = extract( - d, - match_keys(d, '^package$//^Packages//^Package \[{}\].*'.format(appid)) + d, match_keys(d, "^package$//^Packages//^Package [{}].*".format(appid)) ) - other_info = [get_all_leaves(match_keys(package, v)) - for v in ['userId', 'firstInstallTime', 'lastUpdateTime']] - res = dict(map( - split_equalto_delim, [x[0] for x in other_info if x] - )) + other_info = [ + get_all_leaves(match_keys(package, v)) + for v in ["userId", "firstInstallTime", "lastUpdateTime"] + ] + res = dict(map(split_equalto_delim, [x[0] for x in other_info if x])) - if 'userId' not in res: + if "userId" not in res: print("UserID not found in res={}".format(res)) return {} - process_uid = res['userId'] - del res['userId'] - memory = match_keys(d, 'meminfo//Total PSS by process//.*: {}.*'.format(appid)) - uidu_match = list(get_all_leaves( - match_keys(d, 'procstats//CURRENT STATS//\* {} / .*'.format(appid)) - )) + process_uid = res["userId"] + del res["userId"] + # memory = match_keys(d, "meminfo//Total PSS by process//.*: {}.*".format(appid)) + uidu_match = list( + get_all_leaves( + match_keys(d, "procstats//CURRENT STATS//* {} / .*".format(appid)) + ) + ) print(uidu_match) if uidu_match: - uidu = uidu_match[-1].split(' / ') + uidu = uidu_match[-1].split(" / ") else: uidu = "Not Found" if len(uidu) > 1: uidu = uidu[1] else: uidu = uidu[0] - res['data_usage'] = self.get_data_usage(d, process_uid) - res['battery_usage'] = self.get_battery_stat(d, uidu) # (mAh) + res["data_usage"] = self.get_data_usage(d, process_uid) + res["battery_usage"] = self.get_battery_stat(d, uidu) # (mAh) # print('RESULTS') # print(res) # print('END RESULTS') @@ -412,23 +426,25 @@ class IosDump(PhoneDump): # 'UISupportedInterfaceOrientations'] # INDEX = 'CFBundleIdentifier' def __init__(self, fplist, finfo=None): - self.device_type = 'ios' + self.device_type = "ios" self.fname = fplist if finfo: self.finfo = finfo self.deviceinfo = self.load_device_info() - self.device_class = self.deviceinfo.get('DeviceClass', "") + self.device_class = self.deviceinfo.get("DeviceClass", "") else: - self.device_class = 'iPhone/iPad' + self.device_class = "iPhone/iPad" self.df = self.load_file() # FIXME: not efficient to load here everytime? # load permissions mappings and apps plist self.permissions_map = {} self.model_make_map = {} - with open(os.path.join(config.STATIC_DATA, 'ios_permissions.json'), 'r') as fh: + with open(os.path.join(config.STATIC_DATA, "ios_permissions.json"), "r") as fh: self.permissions_map = json.load(fh) - with open(os.path.join(config.STATIC_DATA, 'ios_device_identifiers.json'), 'r') as fh: + with open( + os.path.join(config.STATIC_DATA, "ios_device_identifiers.json"), "r" + ) as fh: self.model_make_map = json.load(fh) def __nonzero__(self): @@ -439,7 +455,7 @@ def __len__(self): def load_device_info(self): try: - with open(self.finfo, 'rb') as data: + with open(self.finfo, "rb") as data: device_info = load(data) return device_info @@ -450,7 +466,7 @@ def load_device_info(self): "ProductType": "", "ModelNumber": "", "RegionInfo": "", - "ProductVersion": "" + "ProductVersion": "", } def load_file(self): @@ -458,7 +474,7 @@ def load_file(self): try: # FIXME: somehow, get the ios_apps.plist into a dataframe. print("fname is: {}".format(self.fname)) - with open(self.fname, 'rb') as app_data: + with open(self.fname, "rb") as app_data: apps_plist = load(app_data) d = pd.DataFrame(apps_plist) d['appId'] = d['CFBundleIdentifier'] @@ -472,33 +488,58 @@ def load_file(self): def check_unseen_permissions(self, permissions): for permission in permissions: if permission not in self.permissions_map: - print('Have not seen '+str(permission)+' before. Making note of this...') - permission_human_readable = permission.replace('kTCCService','') - with open(os.path.join(config.THIS_DIR,'ios_permissions.json'), 'w') as fh: + print( + "Have not seen " + + str(permission) + + " before. Making note of this..." + ) + permission_human_readable = permission.replace("kTCCService", "") + with open( + os.path.join(config.THIS_DIR, "ios_permissions.json"), "w" + ) as fh: self.permissions_map[permission] = permission_human_readable fh.write(json.dumps(self.permissions_map)) - print('Noted.') - #print('\t'+msg+": "+str(PERMISSIONS_MAP[permission])+"\tReason: "+app.get(permission,'system app')) + print("Noted.") + # print('\t'+msg+": "+str(PERMISSIONS_MAP[permission])+"\tReason: "+app.get(permission,'system app')) def get_permissions(self, app: str) -> list: - ''' - Returns a list of tuples (permission, developer-provided reason for permission). - Could modify this function to include whether or not the permission can be adjusted - in Settings. - ''' - system_permissions = retrieve(app, ['Entitlements', 'com.apple.private.tcc.allow']) - adjustable_system_permissions = retrieve(app, ['Entitlements','com.apple.private.tcc.allow.overridable']) + """ + Returns a list of tuples (permission, developer-provided reason for permission). + Could modify this function to include whether or not the permission can be adjusted + in Settings. + """ + system_permissions = retrieve( + app, ["Entitlements", "com.apple.private.tcc.allow"] + ) + adjustable_system_permissions = retrieve( + app, ["Entitlements", "com.apple.private.tcc.allow.overridable"] + ) third_party_permissions = list(set(app.keys()) & set(self.permissions_map)) - self.check_unseen_permissions(list(system_permissions)+list(adjustable_system_permissions)) + self.check_unseen_permissions( + list(system_permissions) + list(adjustable_system_permissions) + ) # (permission used, developer reason for requesting the permission) - all_permissions = list(set(map(lambda x: \ - (self.permissions_map[x], app.get(x, default="permission granted by system")),\ - list(set(system_permissions) | \ - set(adjustable_system_permissions) | set(third_party_permissions))))) - pii = retrieve(app, ['Entitlements', - 'com.apple.private.MobileGestalt.AllowedProtectedKeys']) - #print("\tPII: "+str(pii)) + all_permissions = list( + set( + map( + lambda x: ( + self.permissions_map[x], + app.get(x, default="permission granted by system"), + ), + list( + set(system_permissions) + | set(adjustable_system_permissions) + | set(third_party_permissions) + ), + ) + ) + ) + # pii = retrieve( + # app, + # ["Entitlements", "com.apple.private.MobileGestalt.AllowedProtectedKeys"], + # ) + # print("\tPII: "+str(pii)) return all_permissions def device_info(self): @@ -508,53 +549,60 @@ def device_info(self): # https://gist.github.com/shu223/c108bd47b4c9271e55b5 m = {} try: - m['model'] = self.model_make_map[self.deviceinfo['ProductType']] - except KeyError as e: - m['model'] = "{DeviceClass} (Model {ModelNumber} {RegionInfo})"\ - .format(**self.deviceinfo) - m['version'] = self.deviceinfo['ProductVersion'] + m["model"] = self.model_make_map[self.deviceinfo["ProductType"]] + except KeyError: + m["model"] = "{DeviceClass} (Model {ModelNumber} {RegionInfo})".format( + **self.deviceinfo + ) + m["version"] = self.deviceinfo["ProductVersion"] return "{model} (running iOS {version})".format(**m), m def info(self, appid): """ - Returns dict containing the following: - 'permission': tuple (all permissions of appid, developer - reasons for requesting the permissions) - 'title': the human-friendly name of the app. - 'jailbroken': tuple (whether or not phone is suspected to be jailbroken, rationale) - 'phone_kind': tuple (make, OS version) + Returns dict containing the following: + 'permission': tuple (all permissions of appid, developer + reasons for requesting the permissions) + 'title': the human-friendly name of the app. + 'jailbroken': tuple (whether or not phone is suspected to be jailbroken, rationale) + 'phone_kind': tuple (make, OS version) """ - d = self.df + # d = self.df res = { - 'title':'', - 'jailbroken':'', #TODO: These are never set: phone_kind and jailbroken - 'phone_kind':'' + "title": "", + "jailbroken": "", # TODO: These are never set: phone_kind and jailbroken + "phone_kind": "", } - #app = self.df.iloc[appidx,:].dropna() - app = self.df[self.df['CFBundleIdentifier']==appid].squeeze().dropna() + # app = self.df.iloc[appidx,:].dropna() + app = self.df[self.df["CFBundleIdentifier"] == appid].squeeze().dropna() party = app.ApplicationType.lower() - if party in ['system', 'user']: - print(app['CFBundleName'],"("+app['CFBundleIdentifier']+") is a {} app and has permissions:"\ - .format(party)) - # permissions are an array that returns the permission id and an explanation. - permissions = self.get_permissions(app) - res['permissions'] = [(p.capitalize(), r) for p, r in permissions] - res['title'] = app['CFBundleExecutable'] - res['App Version'] = app['CFBundleVersion'] - res['Install Date'] = ''' + if party in ["system", "user"]: + print(f"{app["CFBundleName"]} ({app["CFBundleIdentifier"]}) is a {party} app and has permissions:") + # permissions are an array that returns the permission id and an explanation. + permissions = self.get_permissions(app) + res["permissions"] = [(p.capitalize(), r) for p, r in permissions] + res["title"] = app["CFBundleExecutable"] + res["App Version"] = app["CFBundleVersion"] + res["Install Date"] = """ Apple does not officially record iOS app installation dates. To view when '{}' was *last used*: [Settings -> General -> {} Storage]. To view the *purchase date* of '{}', follow these instructions: https://www.ipvtechresearch.org/post/guides/apple/. These are the closest possible approximations to installation date available to - end-users. '''.format(res['title'], self.device_class, res['title']) - res['Battery Usage'] = "To see recent battery usage of '{title}': "\ - "[Settings -> Battery -> Battery Usage].".format(**res) - res['Data Usage'] = "To see recent data usage (not including Wifi) of '{}': [Settings -> Cellular -> Cellular Data].".format(res['title']) + end-users. """.format(res["title"], self.device_class, res["title"]) + + res["Battery Usage"] = ( + "To see recent battery usage of '{title}': " + "[Settings -> Battery -> Battery Usage].".format(**res) + ) + res["Data Usage"] = ( + "To see recent data usage (not including Wifi) of '{}': [Settings -> Cellular -> Cellular Data].".format( + res["title"] + ) + ) return res - # TODO: The following function is incorrect or incomplete. Commenting out for now. + # TODO: The following function is incorrect or incomplete. Commenting out for now. # def all(self): # for appidx in range(self.df.shape[0]): # app = self.df.iloc[appidx,:].dropna() @@ -569,31 +617,34 @@ def info(self, appid): # print("") def system_apps(self): - #return self.df.query('ApplicationType=="System"')['CFBundleIdentifier'].tolist() - return self.df.query('ApplicationType=="System"')['CFBundleIdentifier'] + # return self.df.query('ApplicationType=="System"')['CFBundleIdentifier'].tolist() + return self.df.query('ApplicationType=="System"')["CFBundleIdentifier"] def installed_apps_titles(self) -> pd.DataFrame: if self: - return self.df.rename(index=str, - columns={'CFBundleExecutable': 'title'}) + return self.df.rename(index=str, columns={"CFBundleExecutable": "title"}) def installed_apps(self): - #return self.df.index + # return self.df.index if self.df is None: return [] print("parse_dump (installed_apps): >>", self.df.columns, len(self.df)) - return self.df['appId'].to_list() + return self.df["appId"].to_list() if __name__ == "__main__": fname = sys.argv[1] # data = [l.strip() for l in open(fname)] ddump: PhoneDump - if sys.argv[2] == 'android': + if sys.argv[2] == "android": ddump = AndroidDump(fname) - json.dump(ddump.parse_dump_file(fname), open(fname.rsplit('.', 1)[0] + '.json', 'w'), indent=2) - print(json.dumps(ddump.info('ru.kidcontrol.gpstracker'), indent=2)) - elif sys.argv[2] == 'ios': + json.dump( + ddump.parse_dump_file(fname), + open(fname.rsplit(".", 1)[0] + ".json", "w"), + indent=2, + ) + print(json.dumps(ddump.info("ru.kidcontrol.gpstracker"), indent=2)) + elif sys.argv[2] == "ios": ddump = IosDump(fname) # print(ddump.installed_apps()) print(ddump.installed_apps_titles().to_csv()) diff --git a/phone_scanner.py b/phone_scanner.py index 4b8873c..06e0338 100644 --- a/phone_scanner.py +++ b/phone_scanner.py @@ -1,39 +1,37 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import hashlib -import hmac import os import re import shlex import sqlite3 import sys -from collections import defaultdict -from datetime import datetime - -import pandas as pd - import blocklist import config import parse_dump +import pandas as pd +from collections import defaultdict +from datetime import datetime from android_permissions import all_permissions from runcmd import catch_err, run_command class AppScan(object): - device_type = '' + device_type = "" # app_info_conn = dataset.connect(config.APP_INFO_SQLITE_FILE) app_info_conn = sqlite3.connect( - config.APP_INFO_SQLITE_FILE.replace('sqlite:///', ''), - check_same_thread=False + config.APP_INFO_SQLITE_FILE.replace("sqlite:///", ""), check_same_thread=False ) def __init__(self, dev_type, cli): - assert dev_type in config.DEV_SUPPRTED, \ - "dev={!r} is not supported yet. Allowed={}"\ - .format(dev_type, config.DEV_SUPPRTED) + assert ( + dev_type in config.DEV_SUPPRTED + ), "dev={!r} is not supported yet. Allowed={}".format( + dev_type, config.DEV_SUPPRTED + ) self.device_type = dev_type - self.cli = cli # The cli of the device, e.g., adb or mobiledevice + self.cli = cli # The cli of the device, e.g., adb or mobiledevice + self.parse_dump = None # Only here to please the linter def setup(self): """If the device needs some setup to work.""" @@ -42,47 +40,65 @@ def setup(self): def devices(self): raise Exception("Not implemented") + def get_system_apps(self, serialno, from_device: bool) -> list: + pass + def get_apps(self, serialno: str, from_dump: bool) -> list: pass def get_offstore_apps(self, serialno:str, from_dump: bool) -> list: return [] - def dump_path(self, serial, fkind='json'): + def get_app_titles(self, serialno): + return [] + + def dump_path(self, serial, fkind="json"): hmac_serial = config.hmac_serial(serial) - if self.device_type == 'ios': - devicedumpsdir = os.path.join(config.DUMP_DIR, \ - '{}_{}'.format(hmac_serial, 'ios')) - if fkind == 'Jailbroken-FS': - return os.path.join(devicedumpsdir, config.IOS_DUMPFILES.get('Jailbroken-FS','')) - elif fkind == 'Jailbroken-SSH': - return os.path.join(devicedumpsdir, config.IOS_DUMPFILES.get('Jailbroken-SSH','')) - elif fkind == 'Device_Info': - return os.path.join(devicedumpsdir, config.IOS_DUMPFILES.get('Info','')) - elif fkind == 'Apps': - return os.path.join(devicedumpsdir, config.IOS_DUMPFILES.get('Apps','')) - elif fkind == 'Dir': + if self.device_type == "ios": + devicedumpsdir = os.path.join( + config.DUMP_DIR, "{}_{}".format(hmac_serial, "ios") + ) + if fkind == "Jailbroken-FS": + return os.path.join( + devicedumpsdir, config.IOS_DUMPFILES.get("Jailbroken-FS", "") + ) + elif fkind == "Jailbroken-SSH": + return os.path.join( + devicedumpsdir, config.IOS_DUMPFILES.get("Jailbroken-SSH", "") + ) + elif fkind == "Device_Info": + return os.path.join( + devicedumpsdir, config.IOS_DUMPFILES.get("Info", "") + ) + elif fkind == "Apps": + return os.path.join( + devicedumpsdir, config.IOS_DUMPFILES.get("Apps", "") + ) + elif fkind == "Dir": return devicedumpsdir else: # returns apps dumpfile if fkind isn't explicitly specified. - return os.path.join(devicedumpsdir, config.IOS_DUMPFILES.get('Apps','')) + return os.path.join( + devicedumpsdir, config.IOS_DUMPFILES.get("Apps", "") + ) - return os.path.join(config.DUMP_DIR, '{}_{}.{}'.format( - hmac_serial, self.device_type, fkind)) + return os.path.join( + config.DUMP_DIR, "{}_{}.{}".format(hmac_serial, self.device_type, fkind) + ) def app_details(self, serialno, appid): try: - d = pd.read_sql('select * from apps where appid=?', - self.app_info_conn, - params=(appid,)) - if not isinstance(d.get('permissions', ''), list): - d['permissions'] = d.get('permissions', pd.Series([])) - d['permissions'] = d['permissions'].fillna('').str.split(', ') - if 'descriptionHTML' not in d: - d['descriptionHTML'] = d['description'] + d = pd.read_sql( + "select * from apps where appid=?", self.app_info_conn, params=(appid,) + ) + if not isinstance(d.get("permissions", ""), list): + d["permissions"] = d.get("permissions", pd.Series([])) + d["permissions"] = d["permissions"].fillna("").str.split(", ") + if "descriptionHTML" not in d: + d["descriptionHTML"] = d["description"] dfname = self.dump_path(serialno) - if self.device_type == 'ios': + if self.device_type == "ios": ddump = self.parse_dump if not ddump: ddump = parse_dump.IosDump(dfname) @@ -91,21 +107,21 @@ def app_details(self, serialno, appid): info = ddump.info(appid) - print('BEGIN APP INFO') + print("BEGIN APP INFO") print("info={}".format(info)) - print('END APP INFO') + print("END APP INFO") # FIXME: sloppy iOS hack but should fix later, just add these to DF # directly. - if self.device_type == 'ios': + if self.device_type == "ios": # TODO: add extra info about iOS? Like idevicediagnostics # ioregentry AppleARMPMUCharger or IOPMPowerSource or # AppleSmartBattery. - d['permissions'] = pd.Series(info.get('permissions','')) - #d['permissions'] = [info.get('permissions','')] - d['title'] = pd.Series(info.get('title','')) - #del info['permissions'] + d["permissions"] = pd.Series(info.get("permissions", "")) + # d['permissions'] = [info.get('permissions','')] + d["title"] = pd.Series(info.get("title", "")) + # del info['permissions'] print("AppInfo: ", info, appid, dfname, ddump) - return d.fillna(''), info + return d.fillna(""), info except KeyError as ex: print(">>> Exception:::", ex, file=sys.stderr) return pd.DataFrame([]), dict() @@ -119,16 +135,15 @@ def find_spyapps(self, serialno, from_dump=False): if len(installed_apps) <= 0: return pd.DataFrame( - [], - columns=['title', 'flags', 'score', 'class_', 'html_flags'] + [], columns=["title", "flags", "score", "class_", "html_flags"] ) r = blocklist.app_title_and_flag( pd.DataFrame({'appId': installed_apps}), offstore_apps=self.get_offstore_apps(serialno, from_dump=from_dump), system_apps=self.get_system_apps(serialno, from_dump=from_dump) ) - r['title'] = r.title.fillna('') - if self.device_type == 'android': + r["title"] = r.title.fillna("") + if self.device_type == "android": td = pd.read_sql( 'select appid as appId, title from apps where appid in (?{})'.format( ', ?'*(len(installed_apps)-1) @@ -137,21 +152,24 @@ def find_spyapps(self, serialno, from_dump=False): elif self.device_type == 'ios': td = self.get_app_titles(serialno).set_index('appId') - r.set_index('appId', inplace=True) - r.loc[td.index, 'title'] = td.get('title','') + r.set_index("appId", inplace=True) + r.loc[td.index, "title"] = td.get("title", "") r.reset_index(inplace=True) - r['class_'] = r['flags'].apply(blocklist.assign_class) - r['score'] = r['flags'].apply(blocklist.score) - r['title'] = r.title.str.encode('ascii', errors='ignore')\ - .str.decode('ascii') - r['title'] = r.title.fillna('') - r['html_flags'] = r['flags'].apply(blocklist.flag_str) - r.sort_values(by=['score', 'appId'], ascending=[False, True], - inplace=True, na_position='last') - r.set_index('appId', inplace=True) + r["class_"] = r["flags"].apply(blocklist.assign_class) + r["score"] = r["flags"].apply(blocklist.score) + r["title"] = r.title.str.encode("ascii", errors="ignore").str.decode("ascii") + r["title"] = r.title.fillna("") + r["html_flags"] = r["flags"].apply(blocklist.flag_str) + r.sort_values( + by=["score", "appId"], + ascending=[False, True], + inplace=True, + na_position="last", + ) + r.set_index("appId", inplace=True) - return r[['title', 'flags', 'score', 'class_', 'html_flags']] + return r[["title", "flags", "score", "class_", "html_flags"]] def flag_apps(self, serialno): installed_apps = self.get_apps(serialno, from_dump=False) @@ -162,20 +180,20 @@ def uninstall(self, serial, appid): pass def save(self, table, **kwargs): - try: - tab = db.get_table(table) - kwargs['device'] = kwargs.get('device', self.device_type) - tab.insert(kwargs) - db.commit() - return True - except Exception as ex: - print(">> Exception:", ex, file=sys.stderr) - return False + # try: + # tab = db.get_table(table) + # kwargs['device'] = kwargs.get('device', self.device_type) + # tab.insert(kwargs) + # db.commit() + # return True + # except Exception as ex: + # print(">> Exception:", ex, file=sys.stderr) + # return False + return False def device_info(self, serial): return "Test Phone", {} - def isrooted(self, serial): return (False, []) @@ -186,25 +204,32 @@ class AndroidScan(AppScan): devices` showing the device as connected before running this scan function. """ + def __init__(self): - super(AndroidScan, self).__init__('android', config.ADB_PATH) + super(AndroidScan, self).__init__("android", config.ADB_PATH) self.serialno = None self.installed_apps = None self.dump_d = None # self.setup() def setup(self): - p = run_command( - '{cli} kill-server; {cli} start-server' - ) + p = run_command("{cli} kill-server; {cli} start-server") if p != 0: - print(">> Setup failed with returncode={}. ~~ ex={!r}" - .format(p.returncode, p.stderr.read() + p.stdout.read()), file=sys.stderr) + print( + ">> Setup failed with returncode={}. ~~ ex={!r}".format( + p.returncode, p.stderr.read() + p.stdout.read() + ), + file=sys.stderr, + ) + def _get_apps_from_device(self, serialno, flag) -> list: """get apps from the device""" cmd = "{cli} -s {serial} shell pm list packages {flag} | sed 's/^package://g' | sort" - s = catch_err(run_command(cmd, serial=serialno, flag=flag), - msg="App search failed", cmd=cmd) + s = catch_err( + run_command(cmd, serial=serialno, flag=flag), + msg="App search failed", + cmd=cmd, + ) if not s: self.setup() return [] @@ -213,12 +238,12 @@ def _get_apps_from_device(self, serialno, flag) -> list: return installed_apps def _get_apps_from_dump(self, serialno): - hmac_serial = config.hmac_serial(serialno) + # hmac_serial = config.hmac_serial(serialno) # Try to read from the dump dump_file = self.dump_path(serialno) self.dump_d = parse_dump.AndroidDump(dump_file) app_and_codes = self.dump_d.apps() - return [a for a,c in app_and_codes] + return [a for a, c in app_and_codes] def get_apps(self, serialno: str, from_dump: bool=True) -> list: print(f"Getting Android apps: {serialno} from_dump={from_dump}") @@ -226,9 +251,13 @@ def get_apps(self, serialno: str, from_dump: bool=True) -> list: if (not from_dump): installed_apps = self._get_apps_from_device(serialno, '-u') if installed_apps: - q = run_command( - 'bash scripts/android_scan.sh scan {ser} {hmac_serial}', - ser=serialno, hmac_serial=hmac_serial, nowait=True) + # q = run_command( + # "bash scripts/android_scan.sh scan {ser} {hmac_serial}", + # ser=serialno, + # hmac_serial=hmac_serial, + # nowait=True, + # ) + pass else: # Try loading from the dump installed_apps = self._get_apps_from_dump(hmac_serial) @@ -239,7 +268,7 @@ def get_system_apps(self, serialno, from_dump=False): if (not from_dump): apps = self._get_apps_from_device(serialno, '-s') else: - apps = [] ## TODO: fix this later, not sure how to get from dump + apps = [] # TODO: fix this later, not sure how to get from dump return apps def get_offstore_apps(self, serialno, from_dump=False): @@ -249,42 +278,43 @@ def get_offstore_apps(self, serialno, from_dump=False): rooted, reason = self.isrooted(serialno) approved = config.APPROVED_INSTALLERS if not rooted: - for l in self._get_apps_from_device(serialno, '-i -u -s'): - l = l.split() - if len(l) == 2: - apps, t = l - installer = t.replace('installer=', '') - if installer not in approved and installer != 'null': + for line in self._get_apps_from_device(serialno, "-i -u -s"): + line = line.split() + if len(line) == 2: + apps, t = line + installer = t.replace("installer=", "") + if installer not in approved and installer != "null": # if system is rooted, won't make any difference spoofing wise approved.add(installer) print(f"Approved Installers:{approved}") - for l in self._get_apps_from_device(serialno, '-i -u -3'): - l = l.split() - if len(l) == 2: - apps, t = l - installer = t.replace('installer=', '') + for line in self._get_apps_from_device(serialno, "-i -u -3"): + line = line.split() + if len(line) == 2: + apps, t = line + installer = t.replace("installer=", "") if installer not in approved: offstore.append(apps) else: - print(">>>>>> ERROR: {}".format(l), file=sys.stderr) + print(">>>>>> ERROR: {}".format(line), file=sys.stderr) return offstore def devices(self): # FIXME: check for errors related to err in runcmd.py. - #cmd = '{cli} devices | tail -n +2 | cut -f2' - #runcmd = catch_err(run_command(cmd), cmd=cmd).strip() - #cmd = '{cli} kill-server; {cli} start-server' - #s = catch_err(run_command(cmd), time=30, msg="ADB connection failed", cmd=cmd) - cmd = '{cli} devices | tail -n +2' - runcmd = catch_err(run_command(cmd), cmd=cmd).strip().split('\n') + # cmd = '{cli} devices | tail -n +2 | cut -f2' + # runcmd = catch_err(run_command(cmd), cmd=cmd).strip() + # cmd = '{cli} kill-server; {cli} start-server' + # s = catch_err(run_command(cmd), time=30, msg="ADB connection failed", cmd=cmd) + cmd = "{cli} devices | tail -n +2" + runcmd = catch_err(run_command(cmd), cmd=cmd).strip().split("\n") conn_devices = [] for rc in runcmd: d = rc.split() - if len(d) != 2: continue + if len(d) != 2: + continue device, state = rc.split() device = device.strip() - if state.strip() == 'device': - conn_devices.append(device) + if state.strip() == "device": + conn_devices.append(device) return conn_devices # def devices_info(self): @@ -293,19 +323,23 @@ def devices(self): def device_info(self, serial): m = {} - cmd = '{cli} -s {serial} shell getprop ro.product.brand' - m['brand'] = run_command(cmd, serial=serial).stdout.read().decode('utf-8').title() + cmd = "{cli} -s {serial} shell getprop ro.product.brand" + m["brand"] = ( + run_command(cmd, serial=serial).stdout.read().decode("utf-8").title() + ) - cmd = '{cli} -s {serial} shell getprop ro.product.model' - m['model'] = run_command(cmd, serial=serial).stdout.read().decode('utf-8') + cmd = "{cli} -s {serial} shell getprop ro.product.model" + m["model"] = run_command(cmd, serial=serial).stdout.read().decode("utf-8") - cmd = '{cli} -s {serial} shell getprop ro.build.version.release' - m['version'] = run_command(cmd, serial=serial).stdout.read().decode('utf-8').strip() + cmd = "{cli} -s {serial} shell getprop ro.build.version.release" + m["version"] = ( + run_command(cmd, serial=serial).stdout.read().decode("utf-8").strip() + ) cmd = '{cli} -s {serial} shell dumpsys batterystats | grep -i "Start clock time:" | head -n1' - runcmd = catch_err(run_command(cmd, serial=serial), cmd=cmd) - #m['last_full_charge'] = datetime.strptime(runcmd.split(':')[1].strip(), '%Y-%m-%d-%H-%M-%S') - m['last_full_charge'] = datetime.now() + # runcmd = catch_err(run_command(cmd, serial=serial), cmd=cmd) + # m['last_full_charge'] = datetime.strptime(runcmd.split(':')[1].strip(), '%Y-%m-%d-%H-%M-%S') + m["last_full_charge"] = datetime.now() return "{brand} {model} (running Android {version})".format(**m), m # def dump_phone(self, serialno=None): @@ -322,10 +356,12 @@ def device_info(self, serial): # print("Dump success! Written to={}".format(outfname)) def uninstall(self, serial, appid): - cmd = '{cli} uninstall {appid!r}' - s = catch_err(run_command(cmd, - appid=shlex.quote(appid)), - cmd=cmd, msg="Could not uninstall") + cmd = "{cli} uninstall {appid!r}" + s = catch_err( + run_command(cmd, appid=shlex.quote(appid)), + cmd=cmd, + msg="Could not uninstall", + ) return s != -1 def app_details(self, serialno, appid): @@ -342,107 +378,181 @@ def app_details(self, serialno, appid): # FIXME: some appopps in non_hf_recent are not included in the # output. maybe concat hf_recent with them? - info['Date of Scan'] = datetime.now().strftime(config.DATE_STR) - info['Installation Date'] = stats.get('firstInstallTime', '') - info['Last Updated'] = stats.get('lastUpdateTime', '') + info["Date of Scan"] = datetime.now().strftime(config.DATE_STR) + info["Installation Date"] = stats.get("firstInstallTime", "") + info["Last Updated"] = stats.get("lastUpdateTime", "") # info['Last Used'] = stats['used'] # TODO: what is the difference between usedScr and used? Does a # background process count as used? Probably not since appOps # permissions have been more recent than 'used' on some scans. # info['Last Used Screen'] = stats['usedScr'] - info['App Version'] = stats.get('versionName', '') + info["App Version"] = stats.get("versionName", "") # info['App Version Code'] = stats['versionCode'] # FIXME: if Unknown, use 'permission_abbrv' instead. - hf_recent.loc[hf_recent['label']=='unknown', 'label'] = hf_recent.get('permission_abbrv','') + hf_recent.loc[hf_recent["label"] == "unknown", "label"] = hf_recent.get( + "permission_abbrv", "" + ) # hf_recent['label'] = hf_recent[['label', # 'timestamp']].apply(lambda x: ''.join(str(x), axis=1)) - if len(hf_recent.get('label','')) > 0: - hf_recent['label'] = hf_recent.apply( + if len(hf_recent.get("label", "")) > 0: + hf_recent["label"] = hf_recent.apply( lambda x: "{} (last used: {})".format( - x['label'], 'never' if 'unknown' in x['timestamp'].lower() else x['timestamp']), - axis=1 + x["label"], + "never" if "unknown" in x["timestamp"].lower() else x["timestamp"], + ), + axis=1, ) # print("hf_recent['label']=", hf_recent['label'].tolist()) - #print(~hf_recent['timestamp'].str.contains('unknown')) - d.at[0, 'permissions'] = hf_recent['label'].tolist() - non_hf_recent.drop('appId', axis=1, inplace=True) - d.at[0, 'non_hf_permissions_html'] = non_hf_recent.to_html() + # print(~hf_recent['timestamp'].str.contains('unknown')) + d.at[0, "permissions"] = hf_recent["label"].tolist() + non_hf_recent.drop("appId", axis=1, inplace=True) + d.at[0, "non_hf_permissions_html"] = non_hf_recent.to_html() print("App info dict:", d) - #hf_recent['label'] = hf_recent['label'].map(str) + " (last used by app: "+\ + # hf_recent['label'] = hf_recent['label'].map(str) + " (last used by app: "+\ # (hf_recent['timestamp'].map(str) if isinstance(hf_recent['timestamp'], datetime) else 'nooo') +")" - #d['recent_permissions'] = hf_recent['timestamp'] - #print(d['recent_permissions']) + # d['recent_permissions'] = hf_recent['timestamp'] + # print(d['recent_permissions']) return d, info def isrooted(self, serial): - ''' - Doesn't return all reasons by default. First match will return. - TODO: make consistent with iOS isrooted, which returns all reasons discovered. - ''' + """ + Doesn't return all reasons by default. First match will return. + TODO: make consistent with iOS isrooted, which returns all reasons discovered. + """ + root_checks = [] + + # SU binary check cmd = "{cli} -s {serial} shell 'command -v su'" s = catch_err(run_command(cmd, serial=shlex.quote(serial))) - if not s or s == -1 or 'not found' in s or len(s) == 0 or (s == "[android]: Error running ''. Error (1):"): + if ( + not s + or s == -1 + or "not found" in s + or len(s) == 0 + or (s == "[android]: Error running ''. Error (1):") + ): print(config.error()) reason = "couldn't find 'su' tool on the phone." - return (False, reason) + c1 = (False, reason) + root_checks.append(c1) + else: + reason = "found '{}' tool on the phone. Verify whether this is a su binary.".format( + s.strip() + ) + c1 = (True, reason) + root_checks.append(c1) + + # OEM Unlock check + cmd2 = "{cli} -s {serial} shell getprop ro.boot.flash.locked" + s2 = catch_err(run_command(cmd2, serial=shlex.quote(serial))) + if s2.strip() == "0": + reason = "Bootloader unlocked." + c2 = (True, reason) + root_checks.append(c2) else: - reason = "found '{}' tool on the phone. Verify whether this is a su binary.".format(s.strip()) - return (True, reason) - + reason = "Bootloader is locked" + c2 = (False, reason) + root_checks.append(c2) + + # Frida check + cmd3 = "{cli} -s {serial} shell ps -A" + s3 = catch_err(run_command(cmd3, serial=shlex.quote(serial)), large_output=True) + found = False + for line in s3.split("\n"): + if "frida" in line: + reason = "Frida server found." + c3 = (True, reason) + root_checks.append(c3) + found = True + if not found: + reason = "Frida server NOT found." + c3 = (False, reason) + root_checks.append(c3) + installed_apps = self.installed_apps if not installed_apps: installed_apps = self.get_apps(serial) - + # FIXME: load these from a private database instead. from OWASP, # https://sushi2k.gitbooks.io/the-owasp-mobile-security-testing-guide/content/0x05j-Testing-Resiliency-Against-Reverse-Engineering.html - root_pkgs = ['com.noshufou.android.su','com.thirdparty.superuser',\ - 'eu.chainfire.supersu', 'com.koushikdutta.superuser',\ - 'com.zachspong.temprootremovejb' ,'com.ramdroid.appquarantine'] + root_pkgs = [ + "com.noshufou.android.su", + "com.thirdparty.superuser", + "eu.chainfire.supersu", + "com.koushikdutta.superuser", + "com.zachspong.temprootremovejb", + "com.ramdroid.appquarantine", + ] root_pkgs_check = list(set(root_pkgs) & set(installed_apps)) if root_pkgs_check: - reason = "found the following app(s) on the phone: '{}'."\ - .format(str(root_pkgs_check)) - return (True, reason) - + reason = "found the following app(s) on the phone: '{}'.".format( + str(root_pkgs_check) + ) + root_checks.append((True, reason)) + + tmp_boolean = False + tmp_reason = "" + for b, reason in root_checks: + if b: + print(reason) + tmp_boolean = True + tmp_reason += f"{reason}\n" + + if tmp_boolean: + return (True, tmp_reason) + else: + return (False, "Checks pass") + class IosScan(AppScan): """ Run `bash scripts/setup.sh to get libimobiledevice dependencies` """ + def __init__(self): - super(IosScan, self).__init__('ios', cli=config.LIBIMOBILEDEVICE_PATH) + super(IosScan, self).__init__("ios", cli=config.LIBIMOBILEDEVICE_PATH) self.installed_apps = None self.serialno = None self.parse_dump = None def setup(self, attempt_remount=False): - ''' FIXME: iOS setup. ''' - if config.PLATFORM == 'linux' and attempt_remount: + """FIXME: iOS setup.""" + if config.PLATFORM == "linux" and attempt_remount: # should show GUI prompt for password. sudo apt install policykit-1 if not there. - cmd = "pkexec '"+config.SCRIPT_DIR + "/ios_mount_linux.sh' mount" - #mountmsg = run_command(cmd).stderr.read().decode('utf-8') + cmd = "pkexec '" + config.SCRIPT_DIR + "/ios_mount_linux.sh' mount" + # mountmsg = run_command(cmd).stderr.read().decode('utf-8') if catch_err(run_command(cmd)) == -1: - return (False, "Couldn't detect device. See {}/ios_mount_linux.sh."\ - .format(config.SCRIPT_DIR)) - cmd = '{}idevicepair pair'.format(self.cli) - pairmsg = run_command(cmd).stdout.read().decode('utf-8') + return ( + False, + "Couldn't detect device. See {}/ios_mount_linux.sh.".format( + config.SCRIPT_DIR + ), + ) + cmd = "{}idevicepair pair".format(self.cli) + pairmsg = run_command(cmd).stdout.read().decode("utf-8") if "No device found, is it plugged in?" in pairmsg: return (False, pairmsg) elif "Please enter the passcode on the device and retry." in pairmsg: - return (False, "Please unlock your device and follow the trust dialog"\ - " (you will need to enter your passcode). Then try to scan again.") + return ( + False, + "Please unlock your device and follow the trust dialog" + " (you will need to enter your passcode). Then try to scan again.", + ) elif "SUCCESS: Paired with device" in pairmsg: return (True, "Device successfully paired. Setup complete.") elif "said that the user denied the trust dialog." in pairmsg: - return (False, "The trust dialog was denied. Please unplug the device"\ - ", reconnect it, and scan again -- accept the trust dialog to proceed.") + return ( + False, + "The trust dialog was denied. Please unplug the device" + ", reconnect it, and scan again -- accept the trust dialog to proceed.", + ) return (True, "Follow trust dialog on iOS device to continue.") # TODO: This might send titles out of order. Fix this to send both appid and @@ -461,7 +571,7 @@ def get_apps(self, serialno: str, from_dump: bool) -> list: return [] self._load_dump(serialno) self.installed_apps = self.parse_dump.installed_apps() - print('iOS INFO DUMPED.') + print("iOS INFO DUMPED.") return self.installed_apps def get_system_apps(self, serialno:str, from_dump: bool) -> list: @@ -473,14 +583,17 @@ def get_system_apps(self, serialno:str, from_dump: bool) -> list: def devices(self): def _is_device(x): """Is it looks like a serial number""" - return re.match(r'[a-f0-9]+', x) is not None + return re.match(r"[a-f0-9]+", x) is not None - #cmd = '{cli} --detect -t1 | tail -n 1' - cmd = '{}idevice_id -l | tail -n 1'.format(self.cli) + # cmd = '{cli} --detect -t1 | tail -n 1' + cmd = "{}idevice_id -l | tail -n 1".format(self.cli) self.serialno = None s = catch_err(run_command(cmd), cmd=cmd, msg="") - d = [l.strip() for l in s.split('\n') - if l.strip() and _is_device(l.strip())] + d = [ + line.strip() + for line in s.split("\n") + if line.strip() and _is_device(line.strip()) + ] print("Devices found:", d) return d @@ -494,120 +607,171 @@ def device_info(self, serial): return ("", {}) def _load_dump(self, serial) -> parse_dump.IosDump: - hmac_serial = config.hmac_serial(serial) - path = self.dump_path(serial, fkind='Dir') + # hmac_serial = config.hmac_serial(serial) + path = self.dump_path(serial, fkind="Dir") # dumped = catch_err(run_command(cmd)).strip() - dumpf = os.path.join(path, config.IOS_DUMPFILES['Apps']) - dumpfinfo = os.path.join(path, config.IOS_DUMPFILES['Info']) + dumpf = os.path.join(path, config.IOS_DUMPFILES["Apps"]) + dumpfinfo = os.path.join(path, config.IOS_DUMPFILES["Info"]) self.parse_dump = parse_dump.IosDump(dumpf, finfo=dumpfinfo) return self.parse_dump def _dump_phone(self, serial: str) -> bool: - print('DUMPING iOS INFO...') + print("DUMPING iOS INFO...") connected, connected_reason = self.setup() if not connected: print("Couldn't connect to the device. Trying to reconnect. Over here.") print(connected_reason) return False hmac_serial = config.hmac_serial(serial) - cmd = "'{}/ios_dump.sh' {} {Apps} {Info} {Jailbroken-FS} {Jailbroken-SSH}"\ - .format(config.SCRIPT_DIR, hmac_serial, **config.IOS_DUMPFILES) + cmd = ( + "'{}/ios_dump.sh' {} {Apps} {Info} {Jailbroken-FS} {Jailbroken-SSH}".format( + config.SCRIPT_DIR, hmac_serial, **config.IOS_DUMPFILES + ) + ) print(cmd) dumped = catch_err(run_command(cmd), cmd).strip() if dumped: - print('iOS DUMP RESULTS for {}:'.format(hmac_serial)) + print("iOS DUMP RESULTS for {}:".format(hmac_serial)) print(dumped) return True else: - print(">> The iOS dumping failed for some reason. Check above for more information") + print( + ">> The iOS dumping failed for some reason. Check above for more information" + ) return False def uninstall(self, serial, appid): - #cmd = '{cli} -i {serial} --uninstall_only --bundle_id {appid!r}' - #cmd = 'ideviceinstaller --udid {} --uninstall {appid!r}'.format(serial, appid) - cmd = f'{self.cli}ideviceinstaller --uninstall {appid!r}' - s = catch_err(run_command(cmd, appid=appid), - cmd=cmd, msg="Could not uninstall") + # cmd = '{cli} -i {serial} --uninstall_only --bundle_id {appid!r}' + # cmd = 'ideviceinstaller --udid {} --uninstall {appid!r}'.format(serial, appid) + cmd = f"{self.cli}ideviceinstaller --uninstall {appid!r}" + s = catch_err(run_command(cmd, appid=appid), cmd=cmd, msg="Could not uninstall") return s != -1 def isrooted(self, serial): # dict with 'True' and 'False' mapping to a list of reasons for root/no root rooted = defaultdict(list) # TODO This should be removed once the check is fixed - rooted['False'].append("Jailbreak and root checks are currently " - "disabled") - return (False, rooted['False']) + rooted["False"].append("Jailbreak and root checks are currently " "disabled") + return (False, rooted["False"]) try: - with open(self.dump_path(serial, 'Jailbroken-FS'),'r') as fh: + with open(self.dump_path(serial, "Jailbroken-FS"), "r") as fh: JAILBROKEN_LOG = fh.readlines() - if "Your device needs to be jailbroken and have the AFC2 service installed.\n" in JAILBROKEN_LOG: - rooted['False'].append("Filesystem is not rooted. *Highly unlikely* to be jailbroken.") - elif 'No such file or directory' in JAILBROKEN_LOG: - rooted['False'].append("Unable to check device.") + if ( + "Your device needs to be jailbroken and have the AFC2 service installed.\n" + in JAILBROKEN_LOG + ): + rooted["False"].append( + "Filesystem is not rooted. *Highly unlikely* to be jailbroken." + ) + elif "No such file or directory" in JAILBROKEN_LOG: + rooted["False"].append("Unable to check device.") else: - rooted['True'].append("Filesystem *might* be rooted. Conduct additional checks.") - except FileNotFoundError as e: + rooted["True"].append( + "Filesystem *might* be rooted. Conduct additional checks." + ) + except FileNotFoundError: print("Couldn't find Jailbroken FS check log.") - # TODO: trigger error message? like - # TODO: show a try again, maybe it's not plugged in properly. still not working? this could be due to many many many reasons. - #return (True, ['FS check failed, jailbreak not necessarily occurring.']) + # TODO: trigger error message? like + # TODO: show a try again, maybe it's not plugged in properly. still not working? + # this could be due to many many many reasons. + # return (True, ['FS check failed, jailbreak not necessarily occurring.']) try: - with open(self.dump_path(serial, 'Jailbroken-SSH'),'r') as fh: + with open(self.dump_path(serial, "Jailbroken-SSH"), "r") as fh: JAILBROKEN_SSH_LOG = fh.readlines() if "0\n" in JAILBROKEN_SSH_LOG: - rooted['True'].append("SSH is enabled.") - except FileNotFoundError as e: - # TODO: trigger error message? like - # TODO: show a try again, maybe it's not plugged in properly. still not working? this could be due to many many many reasons. + rooted["True"].append("SSH is enabled.") + except FileNotFoundError: + # TODO: trigger error message? like + # TODO: show a try again, maybe it's not plugged in properly. still not working? + # this could be due to many many many reasons. print("Couldn't find Jailbroken SSH check log.") # if app["Path"].split("/")[-1] in ["Cydia.app"] - ''' Summary of jailbroken detection: checks for commonly installed jailbreak apps, + """ Summary of jailbroken detection: checks for commonly installed jailbreak apps, tries to mount root filesystem (AFC2, by default on iOS 7 and lower, tries to SSH into the phone (FIXME). iproxy 2222 22 `idevice_id -l` says "waiting for connection" perpertually if not work. says "accepted connection" on next line if it does. https://twitter.com/bellis1000/status/807527492810665984?lang=en # add to jailbroken log # FIXME: load from private data blocklist. More to be added. - ''' - # FIXME: NEED to apply first to df. self.installed_apps not sufficient. dotapps.append(app["Path"].split("/")[-1]) + """ + # FIXME: NEED to apply first to df. self.installed_apps not sufficient. + # dotapps.append(app["Path"].split("/")[-1]) - apps_titles = self.parse_dump.installed_apps_titles()['title'].tolist() + apps_titles = self.parse_dump.installed_apps_titles()["title"].tolist() # TODO: convert to set check - for app in ["Cydia", "blackra1n", "Undecimus", - "FakeCarrier", "Icy", "IntelliScreen", - "MxTube", "RockApp", "SBSettings", - "WinterBoard", "3uTools", "Absinthe", - "backr00m", "blackra1n", "Corona", - "doubleH3lix", "Electra", "EtasonJB", - "evasi0n", "evasi0n7", "G0blin", "Geeksn0w", - "greenpois0n", "h3lix", "Home Depot", "ipwndfu", - "JailbreakMe", "LiberiOS", "LiberTV", "limera1n", - "Meridian", "p0sixspwn", "Pangu", "Pangu8", "Pangu9", - "Phœnix", "PPJailbreak", "purplera1n", "PwnageTool", - "redsn0w", "RockyRacoon","Rocky Racoon", "Saïgon", "Seas0nPass", - "sn0wbreeze", "Spirit", "TaiG", "unthredera1n", "yalu"]: + for app in [ + "Cydia", + "blackra1n", + "Undecimus", + "FakeCarrier", + "Icy", + "IntelliScreen", + "MxTube", + "RockApp", + "SBSettings", + "WinterBoard", + "3uTools", + "Absinthe", + "backr00m", + "blackra1n", + "Corona", + "doubleH3lix", + "Electra", + "EtasonJB", + "evasi0n", + "evasi0n7", + "G0blin", + "Geeksn0w", + "greenpois0n", + "h3lix", + "Home Depot", + "ipwndfu", + "JailbreakMe", + "LiberiOS", + "LiberTV", + "limera1n", + "Meridian", + "p0sixspwn", + "Pangu", + "Pangu8", + "Pangu9", + "Phœnix", + "PPJailbreak", + "purplera1n", + "PwnageTool", + "redsn0w", + "RockyRacoon", + "Rocky Racoon", + "Saïgon", + "Seas0nPass", + "sn0wbreeze", + "Spirit", + "TaiG", + "unthredera1n", + "yalu", + ]: if app in apps_titles: - rooted['True'].append("{} was found on the device.".format(app)) + rooted["True"].append("{} was found on the device.".format(app)) # if apps check passes if not rooted: - rooted['False'].append("Did not find popular jailbreak apps installed.") - ''' check for jailbroken status after attempts logged by ios_dump.sh ''' - if 'True' in rooted: - return (True, rooted['True']) + rooted["False"].append("Did not find popular jailbreak apps installed.") + """ check for jailbroken status after attempts logged by ios_dump.sh """ + if "True" in rooted: + return (True, rooted["True"]) else: - return (False, rooted['False']) + return (False, rooted["False"]) class TestScan(AppScan): def __init__(self): - super(TestScan, self).__init__('android', cli='cli') + super(TestScan, self).__init__("android", cli="cli") def get_apps(self, serialno): # assert serialno == 'testdevice1' - installed_apps = open(config.TEST_APP_LIST, 'r').read().splitlines() + installed_apps = open(config.TEST_APP_LIST, "r").read().splitlines() return installed_apps def devices(self): @@ -621,5 +785,3 @@ def get_offstore_apps(self, serialno, from_dump=False): def uninstall(self, serial, appid): return True - - diff --git a/runcmd.py b/runcmd.py index d8c3b56..03665b1 100644 --- a/runcmd.py +++ b/runcmd.py @@ -1,9 +1,10 @@ -#import config +# import config import re + # import shlex import subprocess -''' +""" def add_to_error(*args): global ERROR_LOG m = '\n'.join(str(e) for e in args) @@ -18,38 +19,65 @@ def error(): print("ERROR: {}".format(e)) return e.replace("\n", "
") -''' +""" + # TODO: @sam the catch_err should only catch the os level errors, not # application level errors. They should go to particular application specific # handling. -def catch_err(p, cmd='', msg='', time=10) -> str: +def catch_err( + p: subprocess.Popen[bytes], cmd="", msg="", time=10, large_output=False +) -> str: """TODO: Therer are two different types. homogenize them""" try: + large_output_var = b"" + if large_output: + if p.stdout: + for line in p.stdout: + large_output_var += line + p.wait(time) print("Returncode: ", p.returncode) if p.returncode != 0: - err_msg = p.stderr.read().decode('utf-8') - m = ("[{}]: Error running {!r}. Error ({}): {}\n{}".format( - 'android', cmd, p.returncode, err_msg, msg - )) + + if p.stderr: + err_msg = p.stderr.read().decode("utf-8") + else: + err_msg = ( + "stderr was none. This may indicate large issues with process." + ) + + m = "[{}]: Error running {!r}. Error ({}): {}\n{}".format( + "android", cmd, p.returncode, err_msg, msg + ) print(cmd, p.returncode, err_msg, msg) - if 'insufficient permissions for device: user in plugdev group' in err_msg: + if "insufficient permissions for device: user in plugdev group" in err_msg: e = 'Error: Please set "USB For File Transfers" mode on your Android device.' print(e) return "" # config.add_to_error(m) return m else: - s = p.stdout.read().decode() - if (len(s) <= 100 and re.search('(?i)(fail|error)', s)) or \ - 'insufficient permissions for device: user in plugdev group; are your udev rules wrong?'\ - in s: + if large_output: + s = large_output_var.decode() + else: + if p.stdout: + s = p.stdout.read().decode() + else: + return "" + + if ( + (len(s) <= 100 and re.search("(?i)(fail|error)", s)) + or "insufficient permissions for device: user in plugdev group; are your udev rules wrong?" + in s + ): # config.add_to_error(s) return "" - if 'insufficient permissions for device: user in plugdev group; are your udev rules wrong?'\ - in s: - print('Need USB for Charging.') + if ( + "insufficient permissions for device: user in plugdev group; are your udev rules wrong?" + in s + ): + print("Need USB for Charging.") return "" else: print(s) @@ -61,19 +89,15 @@ def catch_err(p, cmd='', msg='', time=10) -> str: def run_command(cmd, **kwargs): - _cmd = cmd.format( - cli='adb', **kwargs - ) + _cmd = cmd.format(cli="adb", **kwargs) print(_cmd) - if kwargs.get('nowait', False) or kwargs.get('NOWAIT', False): + if kwargs.get("nowait", False) or kwargs.get("NOWAIT", False): pid = subprocess.Popen( - _cmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True + _cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True ).pid return pid else: p = subprocess.Popen( - _cmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True + _cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True ) return p