-
Notifications
You must be signed in to change notification settings - Fork 85
/
Copy pathadb.py
287 lines (226 loc) · 10.9 KB
/
adb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import logging
import re
import shutil
import subprocess
from pathlib import Path
import frida
import requests
from frida.core import Device
# Suppress urllib3 warnings
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
def shell(prompt: list) -> subprocess.CompletedProcess:
"""
Executes a shell command and returns the result.
Parameters:
prompt (list): The command to execute as a list of strings.
Returns:
subprocess.CompletedProcess: The result containing return code, stdout, and stderr.
"""
prompt = list(map(str, prompt)) # Ensure all command parts are strings
# logging.getLogger("Shell").debug(" ".join(prompt))
return subprocess.run(prompt, capture_output=True) # Run the command and capture output
class ADB:
"""
Class for managing interactions with the Android device via ADB.
"""
def __init__(self, device: str = None, timeout: int = 5):
"""
Initializes ADB connection to the device.
Parameters:
device (str, optional): Device ID to connect to, defaults to the first USB device.
timeout (int, optional): Timeout for connection in seconds. Defaults to 5.
Raises:
EnvironmentError: If ADB is not found in the system path.
Exception: If connection to the device fails.
"""
self.logger = logging.getLogger(self.__class__.__name__)
# Ensure ADB is available
if not shutil.which("adb"):
raise EnvironmentError(
"ADB is not recognized as an environment variable. "
"Ensure ADB is installed and refer to the documentation: "
"https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge"
)
# Start the ADB server if not already running
sp = shell(['adb', 'start-server'])
if sp.returncode != 0:
self.logger.warning("ADB server startup failed (Error: %s)", sp.stdout.decode("utf-8").strip())
# Connect to device (or default to the first USB device)
try:
self.device: Device = frida.get_device(id=device, timeout=timeout) if device else frida.get_usb_device(timeout=timeout)
self.logger.info("Connected to device: %s (%s)", self.device.name, self.device.id)
except Exception as e:
self.logger.error("Failed to connect to device: %s", e)
raise e
self.prompt = ['adb', '-s', self.device.id, 'shell']
# Retrieve and log device properties
properties = self.device_properties()
if properties:
self.logger.info("SDK API: %s", properties.get("ro.build.version.sdk", "Unknown"))
self.logger.info("ABI CPU: %s", properties.get("ro.product.cpu.abi", "Unknown"))
else:
self.logger.warning("No device properties retrieved")
def device_properties(self) -> dict:
"""
Retrieves system properties from the device.
Returns:
dict: A dictionary mapping property keys to their corresponding values.
"""
# https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
properties = {}
# Execute the shell command to retrieve device properties
sp = shell([*self.prompt, 'getprop'])
if sp.returncode != 0:
self.logger.error("Failed to retrieve device properties (Error: %s)", sp.stdout.decode("utf-8").strip())
return properties
# Parse the output and cast values accordingly
for line in sp.stdout.decode("utf-8").splitlines():
match = re.match(r"\[(.*?)\]: \[(.*?)\]", line)
if match:
key, value = match.groups()
# Cast numeric and boolean values where appropriate
if value.isdigit():
value = int(value)
elif value.lower() in ("true", "false"):
value = value.lower() == "true"
properties[key] = value
return properties
def list_applications(self, user: bool = True, system: bool = False) -> dict:
"""
Lists installed applications on the device, with optional filters for user/system apps.
Parameters:
user (bool, optional): Include user-installed apps. Defaults to True.
system (bool, optional): Include system apps. Defaults to False.
Returns:
dict: A dictionary of application packages and their file paths.
"""
applications = {}
# Validate input; return empty dict if no filter is set
if not user and not system:
return applications
# Set the appropriate shell command based on user/system filters
prompt = [*self.prompt, 'pm', 'list', 'packages', '-f']
if user and not system:
prompt.append("-3")
elif not user and system:
prompt.append("-s")
# Execute the shell command to list applications
sp = shell(prompt)
if sp.returncode != 0:
self.logger.error("Failed to retrieve app list (Error: %s)", sp.stdout.decode("utf-8").strip())
return applications
# Parse and add applications to the dictionary
for line in sp.stdout.decode("utf-8").splitlines():
try:
path, package = line.strip().split(":", 1)[1].rsplit("=", 1)
applications[package] = path
except Exception as e:
pass
return applications
def start_application(self, package: str) -> bool:
"""
Starts an application by its package name.
Parameters:
package (str): The package name of the application.
Returns:
bool: True if the app was started successfully, False otherwise.
"""
# Get package information using dumpsys
sp = shell([*self.prompt, 'dumpsys', 'package', package])
lines = sp.stdout.decode("utf-8").splitlines()
# Remove empty lines to ensure backwards compatibility
lines = [l.strip() for l in lines if l.strip()]
# Look for MAIN activity to identify entry point
for i, line in enumerate(lines):
if "android.intent.action.MAIN" in line:
match = re.search(fr"({package}/[^ ]+)", lines[i + 1])
if match:
# Start the application by its main activity
main_activity = match.group()
sp = shell([*self.prompt, 'am', 'start', '-n', main_activity])
if sp.returncode == 0:
return True
self.logger.error("Failed to start app %s (Error: %s)", package, sp.stdout.decode("utf-8").strip())
break
self.logger.error("Package %s not found or no MAIN intent", package)
return False
def enumerate_processes(self) -> dict:
"""
Lists running processes and maps process names to their PIDs.
Returns:
dict: Dictionary of process names and corresponding PIDs.
"""
# https://github.com/frida/frida/issues/1225#issuecomment-604181822
processes = {}
# Attempt to get the list of processes using the 'ps -A' command
prompt = [*self.prompt, 'ps']
sp = shell([*prompt, '-A'])
lines = sp.stdout.decode("utf-8").splitlines()
# If the output has less than 10 lines, retry with a simpler 'ps' command
if len(lines) < 10:
sp = shell(prompt)
if sp.returncode != 0:
self.logger.error("Failed to execute ps command (Error: %s)", sp.stdout.decode("utf-8").strip())
return processes
lines = sp.stdout.decode("utf-8").splitlines()
# Iterate through lines starting from the second line (skipping header)
for line in lines[1:]:
try:
parts = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
pid = int(parts[1]) # Extract PID
name = " ".join(parts[8:]).strip() # Extract process name
# Handle cases where process name might be in brackets (e.g., kernel threads)
name = name if name.startswith("[") else Path(name).name
processes[name] = pid
except Exception as e:
pass
return processes
def install_application(self, path: Path = None, url: str = None) -> bool:
"""
Installs an application on the device either from a local file or by downloading from a URL.
Parameters:
path (Path, optional): The local file path of the APK to install. Defaults to None.
url (str, optional): The URL to download the APK from. Defaults to None.
Returns:
bool: True if the installation was successful, False otherwise.
"""
# Prepare the shell command for installation
prompt = [*self.prompt[:-1], 'install']
# Install from a local file path if a valid path is provided
if path and path.is_file():
sp = shell([*prompt, path]) # Run the installation command with the local file path
if sp.returncode == 0:
return True
self.logger.error("Installation failed for local path: %s (Error: %s)", path, sp.stdout.decode("utf-8").strip())
# If URL is provided, attempt to download the APK and install it
status = False
if url:
file = Path("tmp.apk") # Temporary file to store the downloaded APK
try:
# Download the APK from the provided URL
r = requests.get(url, headers={"Accept": "*/*", "User-Agent": "KeyDive/ADB"})
r.raise_for_status()
# Save the downloaded APK to a temporary file
file.write_bytes(r.content)
# Attempt installation from the downloaded APK
status = self.install_application(path=file)
except Exception as e:
self.logger.error("Failed to download application from URL: %s (Error: %s)", url, e)
file.unlink(missing_ok=True) # Clean up the temporary file, even if there was an error
return status
def open_url(self, url: str) -> bool:
"""
Opens a specified URL on the device.
Parameters:
url (str): The URL to be opened on the device.
Returns:
bool: True if the URL was successfully opened, False otherwise.
"""
# Execute the shell command to open the URL using the Android 'am' (Activity Manager) command.
sp = shell([*self.prompt, 'am', 'start', '-a', 'android.intent.action.VIEW', '-d', url])
# Check the result of the command execution and log if there is an error
if sp.returncode != 0:
self.logger.error("URL open failed for: %s (Return: %s)", url, sp.stdout.decode("utf-8").strip())
return False
return True
__all__ = ("ADB",)