# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
# SPDX-License-Identifier: MIT
3
4import logging
5from pathlib import Path
6from typing import ClassVar, Any
7from base64 import b64decode, b64encode
8
9from pcapng_utils.payload import Payload
10from .base import HarEnrichment
11from .types import FlowDirection
12from .utils import robust_b64decode
13
14
# Module-level logger used by the decryption enrichment for its debug traces.
logger = logging.getLogger(
    "pcapng_utils.pirogue_enrichment.decryption"
)
16
17
class ContentDecryption(HarEnrichment):
    """Enrich HAR entries with decrypted payloads found in recorded crypto operations.

    The enrichment input (``self.input_data``) is used as a list of
    cryptographic operations. Each operation is read as a dict with optional
    ``pid``, ``process``, ``timestamp`` and ``alg`` keys, plus a ``data`` dict
    holding hex-encoded ``in`` / ``out`` buffers and optional ``alg``, ``key``
    and ``iv`` values.

    For every HAR request / response body, the operation whose encrypted
    buffer contains (or is contained in) the body and whose size is the
    closest is selected, and its decrypted buffer is attached to the entry.
    """

    ID: ClassVar = "pirogue_decryption"

    # Payloads and operation ciphertexts shorter than this are never matched,
    # to avoid false positives and collisions.
    # NOTE(review): the original comment mentioned "at least 16 bytes for AES
    # encrypted data" while the value is 8 -- confirm the intended threshold.
    MIN_LEN_ENCRYPTED_BYTES: int = 8
    MIN_LEN_DECRYPTED_BYTES: int = 2  # e.g. '[]' or '{}'

    # Do NOT keep the best match if the abs. size difference is > 50% of the
    # size of the encrypted payload.
    MAX_SIZE_DIFF_FRACTION: float = 0.5

    # Upper-cased algorithm name prefixes of message digests, MACs and
    # signatures (they could lead to false positives).
    # cf. https://developer.android.com/reference/java/security/MessageDigest
    # cf. https://developer.android.com/reference/java/security/Signature
    # cf. https://developer.android.com/reference/kotlin/javax/crypto/Mac
    _IGNORED_ALGORITHM_PREFIXES: ClassVar[tuple[str, ...]] = (
        "SHA",
        "MD5",
        "DSA",
        "ECDSA",
        "ED25519",
        "EDDSA",
        "NONEWITH",
        "HMAC",
        "PBEWITHHMAC",
        "AESCMAC",
    )

    def __init__(self, har_data: dict, input_data_file: Path) -> None:
        """Initialise the enrichment and expose its input as crypto operations.

        Args:
            har_data: HAR data, forwarded to the parent initialiser.
            input_data_file: Input data file, forwarded to the parent initialiser.
        """
        super().__init__(har_data, input_data_file)
        self.cryptography_operations: list[dict] = self.input_data  # type: ignore

    @staticmethod
    def _is_ignored_algorithm(algo: str) -> bool:
        """Tell whether ``algo`` is a digest, MAC or signature algorithm.

        The comparison is case-insensitive; an empty or missing name is not ignored.
        """
        return (algo or "").upper().startswith(
            ContentDecryption._IGNORED_ALGORITHM_PREFIXES
        )

    @staticmethod
    def _is_asymmetrical_encryption(algo: str) -> bool:
        """Tell whether ``algo`` is an asymmetric cipher (name starting with ``RSA``)."""
        # cf. https://developer.android.com/reference/javax/crypto/Cipher
        return (algo or "").upper().startswith("RSA")

    @staticmethod
    def _get_operation_algorithm(operation: dict) -> str:
        """Return the algorithm name of ``operation`` (``""`` when unknown).

        The name is looked up at the top level of the operation first, then in
        its ``data`` dict, since both locations are read in this class.
        """
        op_data = operation.get("data") or {}
        return str(operation.get("alg") or op_data.get("alg") or "")

    @staticmethod
    def _decode_hex(value: Any) -> bytes | None:
        """Decode a hexadecimal string, returning ``None`` when it is not valid hex."""
        if not isinstance(value, str):
            return None
        try:
            return bytes.fromhex(value)
        except ValueError:
            return None

    def _find_decrypted_data(
        self, encrypted_payload: bytes, encrypted_data_parameter_name: FlowDirection
    ) -> dict:
        """Find the decrypted data matching the given encrypted payload.

        Args:
            encrypted_payload: Raw bytes of the HAR request / response body.
            encrypted_data_parameter_name: Key of the operation data holding
                the encrypted buffer: ``"out"`` for a request, ``"in"`` for a
                response. The other key holds the decrypted buffer.

        Returns:
            An empty dict when no acceptable match exists, otherwise the
            enrichment data, including the temporary ``decryptedBytes`` key.

        Raises:
            ValueError: If ``encrypted_data_parameter_name`` is neither
                ``"in"`` nor ``"out"``.
        """
        # Fail fast
        if encrypted_data_parameter_name not in {"in", "out"}:
            raise ValueError(f"Invalid {encrypted_data_parameter_name=}")

        # Ignore too small payloads to avoid false positives and collisions
        # (an empty payload is always rejected: its size is used as a divisor below)
        len_encrypted_payload = len(encrypted_payload)
        if (
            not len_encrypted_payload
            or len_encrypted_payload < self.MIN_LEN_ENCRYPTED_BYTES
        ):
            return {}

        # If it's a request, the encrypted data is the output ('out') of the
        # cryptographic primitive, but its input ('in') for a response
        decrypted_data_parameter_name = (
            "out" if encrypted_data_parameter_name == "in" else "in"
        )

        best_match: dict = {}
        best_abs_size_diff: float | None = None  # in number of bytes (absolute)

        for operation in self.cryptography_operations:
            # Skip signature, digest and MAC operations
            op_algo = self._get_operation_algorithm(operation)
            if self._is_ignored_algorithm(op_algo):
                continue

            # <!> both buffers are hex-encoded in the operation data; they are
            # decoded so that matching is done on whole bytes (a hex substring
            # test could match across half-bytes and is case-sensitive)
            op_data = operation.get("data") or {}
            op_encrypted = self._decode_hex(op_data.get(encrypted_data_parameter_name))
            if (
                op_encrypted is None
                or len(op_encrypted) < self.MIN_LEN_ENCRYPTED_BYTES
            ):
                continue

            # Check if the encrypted data is in the payload or vice versa
            if (
                op_encrypted not in encrypted_payload
                and encrypted_payload not in op_encrypted
            ):
                continue

            # Decoded only for candidates; missing / invalid / tiny data is skipped
            op_decrypted = self._decode_hex(op_data.get(decrypted_data_parameter_name))
            if (
                op_decrypted is None
                or len(op_decrypted) < self.MIN_LEN_DECRYPTED_BYTES
            ):
                continue

            # Size of the operation: mean of encrypted and decrypted sizes by
            # default, only the encrypted size for an asymmetric cipher.
            # The minimum size difference with the payload is the best match.
            len_op_encrypted = len(op_encrypted)
            len_op_decrypted = len(op_decrypted)
            len_op = (
                len_op_encrypted
                if self._is_asymmetrical_encryption(op_algo)
                else (len_op_encrypted + len_op_decrypted) / 2
            )
            abs_diff = abs(len_encrypted_payload - len_op)
            if best_abs_size_diff is None or abs_diff < best_abs_size_diff:
                best_abs_size_diff = abs_diff
                best_match = {
                    "operation": operation,
                    "decrypted_bytes": op_decrypted,
                    "size_diff_encrypted": len_op_encrypted - len_encrypted_payload,
                    "size_diff_decrypted": len_op_decrypted - len_encrypted_payload,
                }

        if not best_match or best_abs_size_diff is None:
            return {}

        # A best match was found: only keep it if its size is close enough
        best_abs_size_diff_frac = best_abs_size_diff / len_encrypted_payload
        logger.debug(
            f"Decrypted content found: abs. size difference = {best_abs_size_diff_frac:.1%} of encrypted size"
        )

        if best_abs_size_diff_frac > self.MAX_SIZE_DIFF_FRACTION:
            logger.debug("Ignoring decrypted content since abs. size diff is too big")
            return {}

        operation = best_match["operation"]
        op_data = operation.get("data") or {}

        return {
            "pid": operation.get("pid", ""),
            "process": operation.get("process", ""),
            "timestamp": operation.get("timestamp", 0.0) / 1000.0,  # Convert to seconds
            "primitiveParameters": {
                # The operation data is preferred, with a fallback on the
                # operation itself
                "algorithm": op_data.get("alg")
                or self._get_operation_algorithm(operation),
                "key": op_data.get("key", ""),
                "iv": op_data.get("iv", ""),
            },
            "originalBase64Content": b64encode(encrypted_payload).decode("ascii"),
            "sizeDiffEncrypted": int(best_match["size_diff_encrypted"]),
            "sizeDiffDecrypted": int(best_match["size_diff_decrypted"]),
            # temp key for data exchange, not stored in HAR
            "decryptedBytes": best_match["decrypted_bytes"],
        }

    @staticmethod
    def _get_bytes_possibly_from_base64(content: dict[str, str]) -> bytes | None:
        """Return the bytes of a HAR body, or ``None`` when they cannot be obtained.

        A body declared with ``encoding == "base64"`` is strictly decoded (an
        invalid one raises, as a subclass of ``ValueError``). Otherwise the
        text is tentatively decoded with ``robust_b64decode`` and ``None`` is
        returned on failure. A missing or empty ``text`` yields ``None``.
        """
        text = content.get("text")
        if not text:
            return None
        if content.get("encoding") == "base64":
            # expected to always be valid standard base64
            return b64decode(text, validate=True)
        try:
            # possibly in base64 (various forms...)
            return robust_b64decode(text)
        except (ValueError, UnicodeError):
            return None

    @classmethod
    def _get_request_bytes_and_mime(cls, request: dict) -> tuple[bytes | None, str]:
        """Return the body bytes and MIME type of a HAR request.

        The ``postData`` key is used when present, else ``_content``;
        ``(None, "")`` is returned when the request has neither.
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        for body_key in ("postData", "_content"):
            if body_key in request:
                body = request[body_key]
                return (
                    cls._get_bytes_possibly_from_base64(body),
                    body.get("mimeType", ""),
                )
        return None, ""

    @classmethod
    def _get_response_bytes_and_mime(cls, response: dict) -> tuple[bytes | None, str]:
        """Return the body bytes and MIME type of a HAR response.

        ``(None, "")`` is returned when the response has no ``content`` key.
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        if "content" in response:
            body = response["content"]
            return (
                cls._get_bytes_possibly_from_base64(body),
                body.get("mimeType", ""),
            )
        return None, ""

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Attach decryption data to the request and response of ``har_entry``.

        When a match is found, the decrypted bytes are handed to
        ``Payload.update_har_request`` / ``Payload.update_har_response``
        (presumably rewriting the body in place -- see ``Payload``) and the
        remaining enrichment data is stored under the ``_decryption`` key.
        """
        # Process the request data and attach the decryption data if found
        # (for a request, the encrypted data is the output of the primitive)
        request = har_entry["request"]
        req_bytes, req_mimetype = self._get_request_bytes_and_mime(request)
        if req_bytes:
            enrichment_data = self._find_decrypted_data(req_bytes, "out")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_request(
                    request, req_mimetype
                )
                request["_decryption"] = enrichment_data

        # Process the response data and attach the decryption data if found
        # (for a response, the encrypted data is the input of the primitive)
        response = har_entry["response"]
        resp_bytes, resp_mimetype = self._get_response_bytes_and_mime(response)
        if resp_bytes:
            enrichment_data = self._find_decrypted_data(resp_bytes, "in")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_response(
                    response, resp_mimetype
                )
                response["_decryption"] = enrichment_data