1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
2# SPDX-License-Identifier: MIT
3
4import logging
5from pathlib import Path
6from typing import ClassVar, Any
7from base64 import b64decode, b64encode
8
9from pcapng_utils.payload import Payload
10from .base import HarEnrichment
11from .types import FlowDirection
12from .utils import robust_b64decode
13
14logger = logging.getLogger("enrichment")
15
16
class ContentDecryption(HarEnrichment):
    """Attach decrypted payloads to HAR entries using recorded crypto operations.

    For each HAR request/response body, the recorded cryptographic operations
    are searched for one whose encrypted buffer contains (or is contained in)
    the body. When a sufficiently close match is found, the decrypted buffer
    is handed to `Payload` to update the HAR entry, and the match metadata is
    stored under the `_decryption` key of the request/response.

    Each operation is read as a dict with top-level `pid`, `process` and
    `timestamp` keys and a `data` mapping holding the hex-encoded `in` / `out`
    buffers plus `alg`, `key` and `iv` (schema inferred from the keys read by
    this class -- confirm against the producer of the input file).
    """

    ID: ClassVar = "decryption"

    # Shorter encrypted buffers are skipped to avoid false positives and collisions
    # (AES ciphertext, for instance, is at least 16 bytes long)
    MIN_LEN_ENCRYPTED_BYTES: int = 8
    MIN_LEN_DECRYPTED_BYTES: int = 2  # e.g. '[]' or '{}'

    # do NOT keep best match if abs. size difference is > 50% of original size
    MAX_SIZE_DIFF_FRACTION: float = 0.5

    def __init__(self, har_data: dict, input_data_file: Path) -> None:
        """Initialise the enrichment and expose the input data as crypto operations."""
        super().__init__(har_data, input_data_file)
        # `input_data` is set up by the base class (presumably loaded from
        # `input_data_file` -- confirm in HarEnrichment)
        self.cryptography_operations: list[dict] = self.input_data  # type: ignore

    @staticmethod
    def _is_ignored_algorithm(algo: str) -> bool:
        """Tell whether `algo` is a digest, MAC or signature (case-insensitive prefix test)."""
        # Message digests, MAC and signatures (could lead to false positives)
        # cf. https://developer.android.com/reference/java/security/MessageDigest
        # cf. https://developer.android.com/reference/java/security/Signature
        # cf. https://developer.android.com/reference/kotlin/javax/crypto/Mac
        ignored_prefixes = (
            "SHA",
            "MD5",
            "DSA",
            "ECDSA",
            "ED25519",
            "EDDSA",
            "NONEWITH",
            "HMAC",
            "PBEWITHHMAC",
            "AESCMAC",
        )
        return algo.upper().startswith(ignored_prefixes)

    @staticmethod
    def _is_asymmetrical_encryption(algo: str) -> bool:
        """Tell whether `algo` names an asymmetric cipher (currently: RSA*)."""
        # cf. https://developer.android.com/reference/javax/crypto/Cipher
        return algo.upper().startswith("RSA")

    @staticmethod
    def _hex_contains_bytes(haystack_hex: str, needle_hex: str) -> bool:
        """Tell whether `needle_hex` occurs in `haystack_hex` on a byte boundary.

        Both arguments are hexadecimal strings (2 characters per byte). A plain
        substring test would also accept occurrences starting at an odd offset,
        i.e. in the middle of a byte, which do not correspond to any real
        sub-sequence of bytes.
        """
        offset = haystack_hex.find(needle_hex)
        while offset != -1:
            if offset % 2 == 0:
                return True
            offset = haystack_hex.find(needle_hex, offset + 1)
        return False

    def _find_decrypted_data(
        self, encrypted_payload: bytes, encrypted_data_parameter_name: FlowDirection
    ) -> dict:
        """Find the decrypted data matching the given encrypted payload (raw bytes).

        Args:
            encrypted_payload: Raw bytes of the HAR body.
            encrypted_data_parameter_name: Which side of the cryptographic
                primitive holds the encrypted data: "out" for a request,
                "in" for a response.

        Returns:
            An empty dict when nothing acceptable matched, otherwise the
            enrichment data, including the temporary `decryptedBytes` key.

        Raises:
            ValueError: If `encrypted_data_parameter_name` is neither "in" nor
                "out", or if the matched decrypted data is not valid hexadecimal.
        """
        # Fail fast
        if encrypted_data_parameter_name not in {"in", "out"}:
            raise ValueError(f"Invalid {encrypted_data_parameter_name=}")

        # Ignore too small payloads to avoid false positives and collisions
        len_encrypted_payload = len(encrypted_payload)
        if len_encrypted_payload < self.MIN_LEN_ENCRYPTED_BYTES:
            return {}

        hex_encrypted_payload = encrypted_payload.hex()

        # If it's a request, the encrypted data is the output ('out') of the cryptographic primitive,
        # but in input ('in') for a response
        decrypted_data_parameter_name = (
            "out" if encrypted_data_parameter_name == "in" else "in"
        )

        best_match: dict = {}
        best_abs_size_diff: float | None = None  # in number of bytes (absolute)

        for operation in self.cryptography_operations:
            op_data = operation.get("data", {})

            # Skip signatures, digests and MACs. The algorithm is looked up at the
            # top level first, then in `data` (where key/iv/in/out live), so that
            # the filter still works when only the latter is populated.
            op_algo = operation.get("alg") or op_data.get("alg") or ""
            if self._is_ignored_algorithm(op_algo):
                continue

            # <!> both encrypted and decrypted data are encoded in hexadecimal
            op_hex_encrypted_data = op_data.get(encrypted_data_parameter_name) or ""
            op_hex_decrypted_data = op_data.get(decrypted_data_parameter_name) or ""

            # Skip operations whose data is missing/empty/too tiny
            len_op_encrypted_data = len(op_hex_encrypted_data) // 2
            len_op_decrypted_data = len(op_hex_decrypted_data) // 2
            if (
                len_op_encrypted_data < self.MIN_LEN_ENCRYPTED_BYTES
                or len_op_decrypted_data < self.MIN_LEN_DECRYPTED_BYTES
            ):
                continue

            # The encrypted data must be in the payload or vice versa (byte-aligned)
            if not (
                self._hex_contains_bytes(hex_encrypted_payload, op_hex_encrypted_data)
                or self._hex_contains_bytes(op_hex_encrypted_data, hex_encrypted_payload)
            ):
                continue

            # Size difference between the operation data and the actual payload:
            # operation size is the mean of encrypted and decrypted sizes by default,
            # only the encrypted size for an asymmetric cipher.
            # The minimum size difference is the best match (first one wins on ties).
            if self._is_asymmetrical_encryption(op_algo):
                len_op: float = len_op_encrypted_data
            else:
                len_op = (len_op_encrypted_data + len_op_decrypted_data) / 2
            abs_diff = abs(len_encrypted_payload - len_op)

            if best_abs_size_diff is None or abs_diff < best_abs_size_diff:
                best_abs_size_diff = abs_diff
                best_match = {
                    "operation": operation,
                    # Reported algorithm: `data` first, then the one used for filtering
                    "algorithm": op_data.get("alg") or op_algo,
                    "hex_decrypted_data": op_hex_decrypted_data,
                    "size_diff_encrypted": len_op_encrypted_data
                    - len_encrypted_payload,
                    "size_diff_decrypted": len_op_decrypted_data
                    - len_encrypted_payload,
                }

        if not best_match or best_abs_size_diff is None:
            return {}

        # len_encrypted_payload >= MIN_LEN_ENCRYPTED_BYTES here, hence non-zero
        best_abs_size_diff_frac = best_abs_size_diff / len_encrypted_payload
        logger.debug(
            f"Decrypted content found: abs. size difference = {best_abs_size_diff_frac:.1%} of encrypted size"
        )

        if best_abs_size_diff_frac > self.MAX_SIZE_DIFF_FRACTION:
            logger.debug("Ignoring decrypted content since abs. size diff is too big")
            return {}

        # The best match was found, prepare the enrichment data
        operation = best_match["operation"]
        op_data = operation["data"]
        decrypted_bytes = bytes.fromhex(best_match["hex_decrypted_data"])

        return {
            "pid": operation.get("pid", ""),
            "process": operation.get("process", ""),
            "timestamp": operation.get("timestamp", 0.0) / 1000.0,  # Convert to seconds
            "primitiveParameters": {
                "algorithm": best_match["algorithm"],
                "key": op_data.get("key", ""),
                "iv": op_data.get("iv", ""),
            },
            "originalBase64Content": b64encode(encrypted_payload).decode("ascii"),
            "sizeDiffEncrypted": int(best_match["size_diff_encrypted"]),
            "sizeDiffDecrypted": int(best_match["size_diff_decrypted"]),
            # temp key for data exchange, not stored in HAR
            "decryptedBytes": decrypted_bytes,
        }

    @staticmethod
    def _get_bytes_possibly_from_base64(content: dict[str, str]) -> bytes | None:
        """Return the body bytes of a HAR content-like dict, or None if unavailable.

        None is returned when there is no `text` at all, or when the text is
        not flagged as base64 and cannot be decoded by `robust_b64decode`.
        """
        text = content.get("text")
        if text is None:
            # No body text (e.g. body-less response): nothing to decode
            return None
        if content.get("encoding") == "base64":
            # always valid standard base64
            return b64decode(text, validate=True)
        try:
            # possibly in base64 (various forms...)
            return robust_b64decode(text)
        except (ValueError, UnicodeError):
            return None

    @classmethod
    def _get_request_bytes_and_mime(cls, request: dict) -> tuple[bytes | None, str]:
        """Return (body bytes or None, MIME type or '') for a HAR request.

        `postData` takes precedence over `_content` when both are present.
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        for key in ("postData", "_content"):
            if key in request:
                content = request[key]
                return (
                    cls._get_bytes_possibly_from_base64(content),
                    content.get("mimeType", ""),
                )
        return None, ""

    @classmethod
    def _get_response_bytes_and_mime(cls, response: dict) -> tuple[bytes | None, str]:
        """Return (body bytes or None, MIME type or '') for a HAR response."""
        # <!> the sender may base64-encode the bytes himself beforehand
        if "content" in response:
            content = response["content"]
            return (
                cls._get_bytes_possibly_from_base64(content),
                content.get("mimeType", ""),
            )
        return None, ""

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Attach decryption data to the request and response of `har_entry`, in place."""
        # Process the request data and attach the decryption data if found
        # (for a request, the encrypted data is the output of the primitive)
        request = har_entry["request"]
        req_bytes, req_mimetype = self._get_request_bytes_and_mime(request)
        if req_bytes:
            enrichment_data = self._find_decrypted_data(req_bytes, "out")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_request(
                    request, req_mimetype
                )
                request["_decryption"] = enrichment_data

        # Process the response data and attach the decryption data if found
        # (for a response, the encrypted data is the input of the primitive)
        response = har_entry["response"]
        resp_bytes, resp_mimetype = self._get_response_bytes_and_mime(response)
        if resp_bytes:
            enrichment_data = self._find_decrypted_data(resp_bytes, "in")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_response(
                    response, resp_mimetype
                )
                response["_decryption"] = enrichment_data