1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
2# SPDX-FileContributor: u039b <git@0x39b.fr>
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5
6import logging
7import time
8from functools import cached_property
9from importlib import resources
10from pathlib import Path
11from tempfile import NamedTemporaryFile
12from typing import Dict, Any, Optional
13
14import frida
15from frida.core import Device
16from ppadb.client import Client as AdbClient
17from ppadb.device import Device as PPDevice
18
19from octopus.frida.server import FridaServer
20
21logger = logging.getLogger(__name__)
22
23
[docs]
24class AndroidDevice:
25 """
26 Represents an Android device and provides methods for device management.
27
28 This class encapsulates operations such as rooting, property retrieval,
29 and Frida server management for an Android device.
30
31 Attributes:
32 adb: Instance of :class:`~octopus.android.adb.ADB` used for device communication.
33 is_root: Indicates if the device is running as root.
34 requires_su: Indicates if :textmono:`su` access is required for root operations.
35 rooted: Indicates if the device is rooted or has `su` access.
36 adb_device: The connected ADB device instance.
37 """
38
39 # device_tmp_dir = Path("/sdcard/")
40 device_tmp_dir = Path("/data/local/tmp/")
41
[docs]
42 def __init__(self, adb_device: PPDevice):
43 """
44 Initializes the AndroidDevice instance.
45
46 Connects to the device using the provided ADB instance, attempts to
47 root the device, checks root status, and verifies Frida server installation.
48
49 Args:
50 adb: An instance of :class:`~octopus.android.adb.ADB` for device communication.
51 """
52 self.adb_device = adb_device
53 self.is_root = False
54 self.requires_su = False
55 self.rooted = False
56 self.tcpdump_binaries_dir = resources.files("octopus") / "assets" / "tcpdump_binaries"
57 self.tcpdump_path = self.device_tmp_dir / "tcpdump"
58 self.root()
59 self.is_rooted()
60 self.check_frida_server_installed()
61
[docs]
62 def root(self):
63 """
64 Attempts to root the Android device using ADB.
65
66 Raises:
67 RuntimeError: If root access is disabled on the device.
68
69 Uses the internal ADB service to request root access. If the device is not already running as root,
70 attempts to reconnect.
71 """
72 logger.info("Rooting the device")
73 try:
74 self.adb_device.root()
75 except RuntimeError as e:
76 if "adbd is already running as root" in str(e):
77 pass
78 else:
79 raise e
80
[docs]
81 def get_device_properties(self) -> Dict[str, str]:
82 """
83 Retrieves key properties and identifiers from the Android device.
84
85 Returns:
86 A dictionary containing device properties such as
87 fingerprint, brand, device, manufacturer, model, name, serial number,
88 Android version, API level, and IMEI.
89
90 Notes:
91 Uses the :meth:`get_property` method to fetch system properties.
92 IMEI is retrieved using a shell command that parses the output of
93 :textmono:`service call iphonesubinfo`.
94 Handles missing or empty property values gracefully.
95 """
96 props = [
97 ("fingerprint", "ro.vendor.build.fingerprint"),
98 ("brand", "ro.product.brand"),
99 ("device", "ro.product.device"),
100 ("manufacturer", "ro.product.manufacturer"),
101 ("model", "ro.product.model"),
102 ("name", "ro.product.name"),
103 ("serialno", "ro.serialno"),
104 ("android_version", "ro.build.version.release"),
105 ("api_level", "ro.build.version.sdk"),
106 ]
107 device_properties = {}
108 for name, key in props:
109 device_properties[name] = self.get_property(key).strip()
110 # Get IMEI
111 imei = self.adb_shell(
112 """service call iphonesubinfo 1|awk -F "'" '{print $2}'|sed '1 d'|tr -d '.'|awk '{print}' ORS="""
113 )
114 device_properties["imei"] = imei.strip()
115 return device_properties
116
117 def _get_system_properties(self) -> Dict[str, Any]:
118 return self.get_frida_device().query_system_parameters()
119
[docs]
120 @cached_property
121 def system_properties(self) -> Dict[str, Any]:
122 return self._get_system_properties()
123
[docs]
124 @cached_property
125 def architecture(self):
126 return self._get_architecture()
127
128 def _get_architecture(self) -> str:
129 """
130 Returns the device CPU architecture.
131
132 Returns:
133 The CPU architecture string, such as 'arm64', 'x86_64', 'arm', or 'x86'.
134 Raises a RuntimeError if the architecture cannot be determined.
135 """
136 arch = self.system_properties["arch"]
137 if arch == "arm64":
138 return "arm64"
139 elif arch == "arm":
140 return "arm32"
141 elif arch == "ia32":
142 return "x86"
143 elif arch == "x64":
144 return "x86_64"
145 elif "x86" in arch:
146 return "x86"
147 else:
148 raise RuntimeError(f"Unknown architecture: {arch}")
149
[docs]
150 def get_tcpdump_version(self):
151 return f"tcpdump_{self.architecture}_android"
152
[docs]
153 def is_rooted(self) -> bool:
154 """
155 Checks if the device is rooted.
156
157 Returns:
158 True if the device is rooted or has :textmono:`su` access, False otherwise.
159 """
160 # Check user is root
161 self.is_root = "root" in self.adb_device.shell("whoami")
162 # Check su
163 if not self.is_root:
164 self.requires_su = "inaccessible or not found" not in self.adb_device.shell('su -c "echo 1"')
165 self.rooted = self.is_root or self.requires_su
166 return self.rooted
167
[docs]
168 def adb_shell(self, command) -> str:
169 """
170 Executes a shell command on the device via ADB.
171
172 Args:
173 command: The shell command to execute.
174
175 Returns:
176 The output of the shell command as a string.
177
178 Raises:
179 Exception: If the command execution fails.
180
181 Uses :textmono:`su` if root access is required and sets :textmono:`timeout=30`.
182 """
183 if self.requires_su:
184 command = f'su -c "{command}"'
185 return self.adb_device.shell(command, timeout=30)
186
[docs]
187 def adb_shell_nohup(self, command):
188 """
189 Executes a shell command on the device without waiting for output.
190
191 Args:
192 command: The shell command to execute.
193
194 Uses :textmono:`su` if root access is required. Opens the shell command with short
195 timeouts for non-blocking execution.
196 """
197
198 def dummy_handler(_):
199 pass
200
201 # The 'nohup' command ignores the hangup signal.
202 # Output is redirected to /dev/null to avoid leaving nohup.out files.
203 if self.requires_su:
204 shell_command = f'su -c "nohup {command} > /dev/null 2>&1"'
205 else:
206 shell_command = f"nohup {command} > /dev/null 2>&1"
207
208 try:
209 self.adb_device.shell(shell_command, handler=dummy_handler)
210 logger.info("Process started successfully")
211 except Exception as e:
212 logger.error(f"An error occurred while starting the process: {e}")
213
[docs]
214 def adb_push(self, local_path, device_path):
215 """
216 Pushes a file from the local system to the device.
217
218 Args:
219 local_path: Path to the local file.
220 device_path: Destination path on the device.
221
222 Raises:
223 Exception: If the push operation fails.
224
225 Uses the ADB push method to transfer files.
226 """
227 try:
228 self.adb_device.push(local_path, device_path)
229 except (Exception,) as e:
230 raise Exception(f"Failed to push {local_path} to {device_path}") from e
231
[docs]
232 def adb_pull(self, device_path, local_path):
233 """
234 Pulls a file from the device to the local system.
235
236 Args:
237 device_path: Path to the file on the device.
238 local_path: Destination path on the local system.
239
240 Raises:
241 Exception: If the pull operation fails.
242
243 Uses the ADB pull method to transfer files.
244 """
245 try:
246 self.adb_device.pull(str(device_path), str(local_path))
247 except (Exception,) as e:
248 raise Exception(f"Failed to pull {device_path} to {local_path}") from e
249
[docs]
250 def get_property(self, key: str) -> str:
251 """
252 Retrieves a system property from the device.
253
254 Args:
255 key: The property key to retrieve.
256
257 Returns:
258 The value of the system property as a string.
259
260 Uses the :textmono:`getprop` shell command.
261 """
262 value = self.adb_shell(f"getprop {key}") or ""
263 return value
264
[docs]
265 def check_frida_server_running(self) -> bool:
266 """
267 Checks if the Frida server process is running on the device.
268
269 Returns:
270 True if the Frida server is running, False otherwise.
271
272 This method uses the :textmono:`ps` command to search for the Frida server process.
273 """
274 value = self.adb_shell(f"ps -A | grep {FridaServer.executable}")
275 value = value.strip()
276 return bool(value)
277
[docs]
278 def check_frida_server_installed(self) -> bool:
279 """
280 Checks if the Frida server binary is installed on the device.
281
282 Returns:
283 True if the Frida server binary exists, False otherwise.
284
285 Uses the :textmono:`ls` command to verify the presence of the Frida server binary.
286 """
287 status = self.adb_shell(f"ls {FridaServer.executable_path}")
288 return "No such file or directory" not in status
289
[docs]
290 def get_frida_server_version(self) -> str:
291 """
292 Retrieves the version of the installed Frida server.
293
294 Returns:
295 The version string of the Frida server, or '0.0.0' if not found.
296
297 Executes the Frida server binary with the :textmono:`--version` flag.
298 """
299 output = self.adb_shell(f"{FridaServer.executable_path} --version").strip()
300 if not output or "inaccessible or not found" in output:
301 version = "unknown"
302 else:
303 version = output
304 return version
305
[docs]
306 def get_frida_device(self) -> Device:
307 raise NotImplementedError()
308
[docs]
309 def start_frida_server(self, force_stop: bool = True):
310 """
311 Starts the Frida server on the device if it is not already running.
312
313 Args:
314 force_stop: Forces stopping the Frida server before starting it.
315
316 If the server is already running, just logs an informational message.
317 Otherwise, starts the server in daemon mode.
318 """
319 if not self.check_frida_server_installed():
320 self.install_frida_server()
321 if self.get_frida_server_version() != frida.__version__:
322 self.install_frida_server()
323 if force_stop:
324 self.stop_frida_server()
325 if self.check_frida_server_running():
326 logger.info("Frida server is already running...")
327 else:
328 logger.info("Starting Frida server...")
329 self.adb_shell(f"{FridaServer.executable_path} -l 0.0.0.0 --daemonize")
330
[docs]
331 def stop_frida_server(self):
332 """
333 Stops the Frida server process on the device.
334
335 Uses the :textmono:`pkill` command to terminate the Frida server process.
336 """
337 logger.info("Stopping Frida server...")
338 self.adb_shell(f"pkill -f -l 9 {FridaServer.executable}")
339
[docs]
340 def install_frida_server(self, version: Optional[str] = None):
341 """Installs the Frida server binary on the device.
342
343 Downloads the specified version of the Frida server binary,
344 pushes it to the device, and sets the executable permission.
345 Uses a temporary file for the download.
346
347 Args:
348 version: The version of the Frida server to install.
349 Defaults to the currently installed ``frida`` Python
350 package version.
351 """
352 target_version = version or frida.__version__
353 logger.info(f"Installing frida-server {target_version} on device ({FridaServer.executable})...")
354
355 # Stop any running Frida server instance before replacing the binary
356 self.stop_frida_server()
357
358 with NamedTemporaryFile(mode="wb") as frida_server:
359 # Download the appropriate binary for this device's architecture
360 FridaServer.download_frida_server(
361 self.architecture,
362 frida_server.name,
363 "android",
364 target_version,
365 )
366 frida_server.seek(0)
367
368 # Push the binary to the device and make it executable
369 self.adb_push(frida_server.name, FridaServer.executable_path)
370 self.adb_shell(f"chmod +x {FridaServer.executable_path}")
371
372 logger.info(f"frida-server version {target_version} successfully installed.")
373
[docs]
374 def install_tcpdump(self):
375 logger.info(f"Installing tcpdump on device {self.tcpdump_path}...")
376 tcpdump_version = self.get_tcpdump_version()
377 tcpdump_binary = self.tcpdump_binaries_dir / tcpdump_version
378 self.adb_device.push(tcpdump_binary, str(self.tcpdump_path))
379 self.adb_shell(f"chmod +x {self.tcpdump_path}")
380
381
[docs]
382class AndroidDeviceUsb(AndroidDevice):
383 """
384 Android device connected via USB.
385
386 Inherits from :class:`~octopus.android.device.AndroidDevice` and
387 initializes the device using a default :class:`~octopus.android.adb.ADB`
388 instance for USB communication.
389 """
390
[docs]
391 def __init__(self, device_id: Optional[str] = None, adb_host="127.0.0.1", adb_port=5037):
392 """Instantiate an AndroidDeviceUsb instance.
393
394 Connects to an ADB client and selects the appropriate device.
395 If a ``device_id`` is provided, connects to that specific device.
396 If only one device is connected, selects it automatically.
397
398 Args:
399 device_id: Optional ADB device serial number. If omitted and
400 exactly one device is connected, it is selected
401 automatically. Raises :exc:`RuntimeError` if no device
402 can be determined.
403 adb_host: Optional ADB host address, defaults to 127.0.0.1.
404 adb_port: Optional ADB port number, defaults to 5037.
405
406 Raises:
407 RuntimeError: If no device is found and ``device_id`` is not
408 provided, or if more than one device is connected without
409 specifying a ``device_id``.
410 """
411 client = AdbClient(adb_host, adb_port)
412 client.devices()
413 if device_id:
414 device = client.device(device_id)
415 elif len(client.devices()) == 1:
416 device = client.devices()[0]
417 else:
418 raise RuntimeError("No device found.")
419 super().__init__(device)
420
[docs]
421 def get_frida_device(self) -> Device:
422 return frida.get_usb_device()
423
424
[docs]
425class AndroidDeviceTcp(AndroidDevice):
426 """
427 Android device connected via TCP/IP.
428
429 Inherits from :class:`~octopus.android.device.AndroidDevice` and
430 initializes the device using a :class:`~octopus.android.adb.ADB` instance
431 configured for TCP/IP communication.
432 """
433
[docs]
434 def __init__(self, host: str, port: int = 5555, adb_host="127.0.0.1", adb_port=5037):
435 """
436 Initializes an AndroidDeviceTcp instance.
437
438 Args:
439 host: The IP address or hostname of the device.
440 port: The TCP port for ADB connection, defaults to 5555.
441 adb_host: Optional ADB host address, defaults to 127.0.0.1.
442 adb_port: Optional ADB port number, defaults to 5037.
443 """
444 client = AdbClient(adb_host, adb_port)
445 client.remote_connect(host, port)
446 device = client.device(f"{host}:{port}")
447 super().__init__(device)
448 self.host = host
449 self.port = port
450
[docs]
451 def get_frida_device(self) -> Device:
452 return frida.get_device_manager().add_remote_device(self.host)