Source code for pcapng_utils.har.pirogue_enrichment.decryption

  1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
  2# SPDX-License-Identifier: MIT
  3
  4import logging
  5from pathlib import Path
  6from typing import ClassVar, Any
  7from base64 import b64decode, b64encode
  8
  9from pcapng_utils.payload import Payload
 10from .base import HarEnrichment
 11from .types import FlowDirection
 12from .utils import robust_b64decode
 13
 14
 15logger = logging.getLogger("pcapng_utils.pirogue_enrichment.decryption")
 16
 17
class ContentDecryption(HarEnrichment):
    """HAR enrichment attaching decrypted content to encrypted request/response bodies.

    Every request/response body is compared against the `in`/`out` data of the
    recorded cryptographic operations (`self.cryptography_operations`). When an
    operation matches closely enough, the decrypted bytes are handed to `Payload`
    to update the HAR request/response, and a `_decryption` metadata dictionary
    is stored next to it.
    """

    ID: ClassVar = "pirogue_decryption"

    MIN_LEN_ENCRYPTED_BYTES: int = (
        8  # at least 16 bytes for AES encrypted data for instance
    )
    MIN_LEN_DECRYPTED_BYTES: int = 2  # e.g. '[]' or '{}'

    MAX_SIZE_DIFF_FRACTION: float = (
        0.5  # do NOT keep best match if abs. size difference is > 50% of original size
    )

    def __init__(self, har_data: dict, input_data_file: Path) -> None:
        """Initialize the enrichment and expose the loaded operations.

        Both arguments are forwarded unchanged to the base class.
        NOTE(review): `self.input_data` is presumably populated by the base class
        from `input_data_file`, as a list of operation dicts -- confirm in `HarEnrichment`.
        """
        super().__init__(har_data, input_data_file)
        self.cryptography_operations: list[dict] = self.input_data  # type: ignore

    @staticmethod
    def _is_ignored_algorithm(algo: str) -> bool:
        """Return True if `algo` (case-insensitive) starts with an ignored prefix.

        Ignored algorithms are message digests, MACs and signatures, which could
        lead to false positives when matching payloads.
        """
        # cf. https://developer.android.com/reference/java/security/MessageDigest
        # cf. https://developer.android.com/reference/java/security/Signature
        # cf. https://developer.android.com/reference/kotlin/javax/crypto/Mac
        return algo.upper().startswith(
            (
                "SHA",
                "MD5",
                "DSA",
                "ECDSA",
                "ED25519",
                "EDDSA",
                "NONEWITH",
                "HMAC",
                "PBEWITHHMAC",
                "AESCMAC",
            )
        )

    @staticmethod
    def _is_asymmetrical_encryption(algo: str) -> bool:
        """Return True if `algo` (case-insensitive) starts with "RSA"."""
        # cf. https://developer.android.com/reference/javax/crypto/Cipher
        return algo.upper().startswith("RSA")

    def _find_decrypted_data(
        self, encrypted_payload: bytes, encrypted_data_parameter_name: FlowDirection
    ) -> dict:
        """Find the decrypted data matching the given raw (already decoded) payload.

        Args:
            encrypted_payload: bytes of the HAR body to look up.
            encrypted_data_parameter_name: key of the operation data holding the
                encrypted bytes: "out" for a request, "in" for a response.

        Returns:
            An empty dict when nothing acceptable matches, else the enrichment
            metadata plus a temporary `decryptedBytes` entry that the caller is
            expected to pop before storing the dict in the HAR.

        Raises:
            ValueError: if `encrypted_data_parameter_name` is neither "in" nor "out",
                or if the hexadecimal decrypted data of the best match is malformed.
        """
        # Fail fast
        if encrypted_data_parameter_name not in {"in", "out"}:
            raise ValueError(f"Invalid {encrypted_data_parameter_name=}")

        # Ignore too short payloads to avoid false positives and collisions
        len_encrypted_payload = len(encrypted_payload)
        if len_encrypted_payload < self.MIN_LEN_ENCRYPTED_BYTES:
            return {}

        best_match: dict = {}
        best_abs_size_diff: float | None = None  # in number of bytes (absolute)
        hex_encrypted_payload = encrypted_payload.hex()

        # If it's a request, the encrypted data is the output ('out') of the cryptographic primitive,
        # but it is the input ('in') for a response
        decrypted_data_parameter_name = (
            "out" if encrypted_data_parameter_name == "in" else "in"
        )

        for operation in self.cryptography_operations:
            # <!> both encrypted and decrypted data are hexadecimal strings in the operation data
            op_data = operation.get("data", {})

            # Skip digests, MACs and signatures.
            # NOTE(review): the algorithm name was only looked up at the top level of the
            # operation here, whereas the reported `algorithm` comes from `data.alg`;
            # both locations are accepted for filtering -- confirm against the dump schema.
            op_algo = operation.get("alg") or op_data.get("alg") or ""
            if self._is_ignored_algorithm(op_algo):
                continue

            op_hex_encrypted_data = op_data.get(encrypted_data_parameter_name, "")
            op_hex_decrypted_data = op_data.get(decrypted_data_parameter_name, "")

            # Skip operations whose data is missing, empty or too tiny (2 hex digits per byte)
            len_op_encrypted_data = len(op_hex_encrypted_data) // 2
            len_op_decrypted_data = len(op_hex_decrypted_data) // 2
            if (
                len_op_encrypted_data < self.MIN_LEN_ENCRYPTED_BYTES
                or len_op_decrypted_data < self.MIN_LEN_DECRYPTED_BYTES
            ):
                continue

            # Check if the encrypted data is in the payload or vice versa
            if (
                op_hex_encrypted_data not in hex_encrypted_payload
                and hex_encrypted_payload not in op_hex_encrypted_data
            ):
                continue

            # Compute the size difference between the operation data and the actual payload
            # (operation size is mean of encrypted and decrypted sizes by default,
            # only encrypted size for asymmetric cipher).
            # Minimum size difference is the best match; the first one wins on ties.
            len_op = (
                len_op_encrypted_data
                if self._is_asymmetrical_encryption(op_algo)
                else (len_op_encrypted_data + len_op_decrypted_data) / 2
            )
            abs_diff = abs(len_encrypted_payload - len_op)
            if best_abs_size_diff is None or abs_diff < best_abs_size_diff:
                best_abs_size_diff = abs_diff
                best_match = {
                    "operation": operation,
                    "hex_decrypted_data": op_hex_decrypted_data,
                    "size_diff_encrypted": len_op_encrypted_data
                    - len_encrypted_payload,
                    "size_diff_decrypted": len_op_decrypted_data
                    - len_encrypted_payload,
                }

        # No candidate at all
        if not best_match:
            return {}

        # The best match was found, check that it is close enough in size
        assert best_abs_size_diff is not None
        best_abs_size_diff_frac = best_abs_size_diff / len_encrypted_payload
        logger.debug(
            f"Decrypted content found: abs. size difference = {best_abs_size_diff_frac:.1%} of encrypted size"
        )

        if best_abs_size_diff_frac > self.MAX_SIZE_DIFF_FRACTION:
            logger.debug("Ignoring decrypted content since abs. size diff is too big")
            return {}

        # Prepare the enrichment data
        operation = best_match["operation"]
        op_data = operation["data"]
        decrypted_bytes = bytes.fromhex(best_match["hex_decrypted_data"])

        return {
            "pid": operation.get("pid", ""),
            "process": operation.get("process", ""),
            "timestamp": operation.get("timestamp", 0.0) / 1000.0,  # Convert to seconds
            "primitiveParameters": {
                "algorithm": op_data.get("alg", ""),
                "key": op_data.get("key", ""),
                "iv": op_data.get("iv", ""),
            },
            "originalBase64Content": b64encode(encrypted_payload).decode("ascii"),
            "sizeDiffEncrypted": int(best_match["size_diff_encrypted"]),
            "sizeDiffDecrypted": int(best_match["size_diff_decrypted"]),
            # temp key for data exchange, not stored in HAR
            "decryptedBytes": decrypted_bytes,
        }

    @staticmethod
    def _get_bytes_possibly_from_base64(content: dict[str, str]) -> bytes | None:
        """Return the bytes of a HAR content-like dict, or None if unavailable.

        - No `text` at all (optional in HAR, e.g. `postData` carrying `params`
          or an empty response body): None.
        - `encoding == "base64"`: strict standard base64 decoding of `text`.
        - Otherwise `text` is tentatively decoded with `robust_b64decode`,
          and None is returned when that raises ValueError or UnicodeError.
        """
        text = content.get("text")
        if text is None:
            return None
        if content.get("encoding") == "base64":
            # always valid standard base64
            return b64decode(text, validate=True)
        try:
            # possibly in base64 (various forms...)
            return robust_b64decode(text)
        except (ValueError, UnicodeError):
            return None

    @classmethod
    def _get_request_bytes_and_mime(cls, request: dict) -> tuple[bytes | None, str]:
        """Return `(body bytes or None, mime type)` for a HAR request.

        `postData` takes precedence over `_content`; `(None, "")` is returned
        when the request has neither. A missing `mimeType` yields "".
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        for key in ("postData", "_content"):
            if key in request:
                body = request[key]
                return cls._get_bytes_possibly_from_base64(body), body.get(
                    "mimeType", ""
                )
        return None, ""

    @classmethod
    def _get_response_bytes_and_mime(cls, response: dict) -> tuple[bytes | None, str]:
        """Return `(body bytes or None, mime type)` for a HAR response.

        `(None, "")` is returned when the response has no `content`.
        A missing `mimeType` yields "".
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        if "content" in response:
            body = response["content"]
            return cls._get_bytes_possibly_from_base64(body), body.get("mimeType", "")
        return None, ""

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Attach decryption data to the request and response of `har_entry`, in place.

        For each side with a non-empty body for which a matching operation is found,
        the decrypted bytes are passed to `Payload(...).update_har_request` /
        `update_har_response`, and the remaining metadata is stored under `_decryption`.
        """
        # Process the request data and attach the decryption data if found
        # (for a request, the encrypted bytes are the output of the primitive)
        request = har_entry["request"]
        req_bytes, req_mimetype = self._get_request_bytes_and_mime(request)
        if req_bytes:
            enrichment_data = self._find_decrypted_data(req_bytes, "out")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_request(
                    request, req_mimetype
                )
                request["_decryption"] = enrichment_data

        # Process the response data and attach the decryption data if found
        # (for a response, the encrypted bytes are the input of the primitive)
        response = har_entry["response"]
        resp_bytes, resp_mimetype = self._get_response_bytes_and_mime(response)
        if resp_bytes:
            enrichment_data = self._find_decrypted_data(resp_bytes, "in")
            if enrichment_data:
                Payload(enrichment_data.pop("decryptedBytes")).update_har_response(
                    response, resp_mimetype
                )
                response["_decryption"] = enrichment_data