Source code for pcapng_utils.har.pirogue_enrichment.decryption

  1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
  2# SPDX-License-Identifier: MIT
  3
  4import logging
  5from pathlib import Path
  6from typing import ClassVar, Any
  7from base64 import b64decode, b64encode
  8
  9from pcapng_utils.payload import Payload
 10from .base import HarEnrichment
 11from .types import FlowDirection
 12from .utils import robust_b64decode
 13
 14logger = logging.getLogger("enrichment")
 15
 16
class ContentDecryption(HarEnrichment):
    """Attach decrypted content to HAR entries using logged cryptographic operations.

    For each request/response body, the logged operations are searched for one whose
    encrypted side overlaps the body. When a close enough match is found, the decrypted
    bytes are handed to `Payload.update_har_request` / `Payload.update_har_response`
    and a `_decryption` dictionary describing the operation is stored on the
    request/response.
    """

    ID: ClassVar = "decryption"

    # Payloads / operations with fewer encrypted bytes are skipped to avoid false positives
    # (AES encrypted data is at least 16 bytes for instance)
    MIN_LEN_ENCRYPTED_BYTES: int = 8
    MIN_LEN_DECRYPTED_BYTES: int = 2  # e.g. '[]' or '{}'

    # do NOT keep best match if abs. size difference is > 50% of original size
    MAX_SIZE_DIFF_FRACTION: float = 0.5

    def __init__(self, har_data: dict, input_data_file: Path) -> None:
        """Initialise the enrichment and expose the input data as cryptographic operations.

        NOTE(review): `self.input_data` is provided by the base class, presumably loaded
        from `input_data_file` - confirm against `HarEnrichment`.
        """
        super().__init__(har_data, input_data_file)
        self.cryptography_operations: list[dict] = self.input_data  # type: ignore

    @staticmethod
    def _is_ignored_algorithm(algo: str) -> bool:
        """Return True if `algo` starts (case-insensitively) with an ignored prefix.

        Message digests, MACs and signatures are ignored since they could lead to
        false positives.
        """
        # cf. https://developer.android.com/reference/java/security/MessageDigest
        # cf. https://developer.android.com/reference/java/security/Signature
        # cf. https://developer.android.com/reference/kotlin/javax/crypto/Mac
        ignored_prefixes = (
            "SHA",
            "MD5",
            "DSA",
            "ECDSA",
            "ED25519",
            "EDDSA",
            "NONEWITH",
            "HMAC",
            "PBEWITHHMAC",
            "AESCMAC",
        )
        return algo.upper().startswith(ignored_prefixes)

    @staticmethod
    def _is_asymmetrical_encryption(algo: str) -> bool:
        """Return True if `algo` starts (case-insensitively) with 'RSA'."""
        # cf. https://developer.android.com/reference/javax/crypto/Cipher
        return algo.upper().startswith("RSA")

    @staticmethod
    def _get_operation_algorithm(operation: dict) -> str:
        """Return the algorithm name of an operation, or '' if none is found.

        The name is looked up in the `data` sub-dictionary first (where key, IV and
        in/out buffers are read from), then at the top level of the operation, so that
        filtering and reporting always agree on the same value.
        """
        op_data = operation.get("data") or {}
        return op_data.get("alg") or operation.get("alg") or ""

    @staticmethod
    def _decode_hex(hex_data: object) -> bytes | None:
        """Decode a hexadecimal string into bytes, or return None if it is missing/invalid."""
        if not isinstance(hex_data, str):
            return None
        try:
            return bytes.fromhex(hex_data)  # accepts both lower and upper case digits
        except ValueError:
            return None

    def _find_decrypted_data(
        self, encrypted_payload: bytes, encrypted_data_parameter_name: FlowDirection
    ) -> dict:
        """Find the logged operation whose encrypted data best matches the given payload.

        Args:
            encrypted_payload: raw (already base64-decoded) bytes of the HAR body.
            encrypted_data_parameter_name: side of the cryptographic primitive that holds
                the encrypted data: 'out' for a request, 'in' for a response.

        Returns:
            An empty dict when nothing acceptable matches, otherwise the enrichment data,
            including a temporary `decryptedBytes` key that the caller is expected to pop.

        Raises:
            ValueError: if `encrypted_data_parameter_name` is neither 'in' nor 'out'.
        """
        # Fail fast
        if encrypted_data_parameter_name not in {"in", "out"}:
            raise ValueError(f"Invalid {encrypted_data_parameter_name=}")

        # Ignore tiny payloads to avoid false positives and collisions
        len_encrypted_payload = len(encrypted_payload)
        if len_encrypted_payload < self.MIN_LEN_ENCRYPTED_BYTES:
            return {}

        # If it's a request, the encrypted data is the output ('out') of the cryptographic primitive,
        # but in input ('in') for a response
        decrypted_data_parameter_name = (
            "out" if encrypted_data_parameter_name == "in" else "in"
        )

        best_match: dict = {}
        best_abs_size_diff = float("inf")  # in number of bytes (absolute)

        for operation in self.cryptography_operations:
            # Skip signatures, digests and MACs
            op_algo = self._get_operation_algorithm(operation)
            if self._is_ignored_algorithm(op_algo):
                continue

            # <!> both encrypted and decrypted data are hexadecimal strings in the operation data;
            # they are compared as bytes so that a match is always byte-aligned and case-insensitive
            op_data = operation.get("data") or {}
            op_encrypted = self._decode_hex(op_data.get(encrypted_data_parameter_name))
            op_decrypted = self._decode_hex(op_data.get(decrypted_data_parameter_name))
            if op_encrypted is None or op_decrypted is None:
                continue

            # Skip operations with missing/empty/too tiny data
            len_op_encrypted_data = len(op_encrypted)
            len_op_decrypted_data = len(op_decrypted)
            if (
                len_op_encrypted_data < self.MIN_LEN_ENCRYPTED_BYTES
                or len_op_decrypted_data < self.MIN_LEN_DECRYPTED_BYTES
            ):
                continue

            # Check if the encrypted data is in the payload or vice versa
            if not (
                op_encrypted in encrypted_payload or encrypted_payload in op_encrypted
            ):
                continue

            # Compute the size difference between the operation data and the actual payload
            # (operation size is mean of encrypted and decrypted sizes by default,
            # only encrypted size for asymmetric cipher).
            # Minimum size difference is the best match; the first one wins on ties.
            len_op = (
                len_op_encrypted_data
                if self._is_asymmetrical_encryption(op_algo)
                else (len_op_encrypted_data + len_op_decrypted_data) / 2
            )
            abs_diff = abs(len_encrypted_payload - len_op)
            if abs_diff < best_abs_size_diff:
                best_abs_size_diff = abs_diff
                best_match = {
                    "operation": operation,
                    "data": op_data,
                    "algorithm": op_algo,
                    "decrypted_bytes": op_decrypted,
                    "size_diff_encrypted": len_op_encrypted_data
                    - len_encrypted_payload,
                    "size_diff_decrypted": len_op_decrypted_data
                    - len_encrypted_payload,
                }

        if not best_match:
            return {}

        # Payload length is >= MIN_LEN_ENCRYPTED_BYTES here, hence non-zero
        best_abs_size_diff_frac = best_abs_size_diff / len_encrypted_payload
        logger.debug(
            f"Decrypted content found: abs. size difference = {best_abs_size_diff_frac:.1%} of encrypted size"
        )

        if best_abs_size_diff_frac > self.MAX_SIZE_DIFF_FRACTION:
            logger.debug("Ignoring decrypted content since abs. size diff is too big")
            return {}

        # The best match was found, prepare the enrichment data
        operation = best_match["operation"]
        op_data = best_match["data"]

        return {
            "pid": operation.get("pid", ""),
            "process": operation.get("process", ""),
            "timestamp": operation.get("timestamp", 0.0) / 1000.0,  # Convert to seconds
            "primitiveParameters": {
                "algorithm": best_match["algorithm"],
                "key": op_data.get("key", ""),
                "iv": op_data.get("iv", ""),
            },
            "originalBase64Content": b64encode(encrypted_payload).decode("ascii"),
            "sizeDiffEncrypted": int(best_match["size_diff_encrypted"]),
            "sizeDiffDecrypted": int(best_match["size_diff_decrypted"]),
            # temp key for data exchange, not stored in HAR
            "decryptedBytes": best_match["decrypted_bytes"],
        }

    @staticmethod
    def _get_bytes_possibly_from_base64(content: dict[str, str]) -> bytes | None:
        """Return the raw bytes of a HAR content object, or None if they cannot be obtained.

        None is returned when the content has no `text`, or when the text is not flagged
        as base64 and `robust_b64decode` rejects it.
        """
        text = content.get("text")
        if text is None:
            # a HAR body may legitimately carry no text at all
            return None
        if content.get("encoding") == "base64":
            # always valid standard base64
            return b64decode(text, validate=True)
        try:
            # possibly in base64 (various forms...)
            return robust_b64decode(text)
        except (ValueError, UnicodeError):
            return None

    @classmethod
    def _get_request_bytes_and_mime(cls, request: dict) -> tuple[bytes | None, str]:
        """Return the request body bytes (or None) and its MIME type ('' if unknown).

        `postData` takes precedence over `_content` when both are present.
        """
        # <!> the sender may base64-encode the bytes himself beforehand
        for key in ("postData", "_content"):
            if key in request:
                body = request[key]
                return (
                    cls._get_bytes_possibly_from_base64(body),
                    body.get("mimeType", ""),
                )
        return None, ""

    @classmethod
    def _get_response_bytes_and_mime(cls, response: dict) -> tuple[bytes | None, str]:
        """Return the response body bytes (or None) and its MIME type ('' if unknown)."""
        # <!> the sender may base64-encode the bytes himself beforehand
        if "content" in response:
            body = response["content"]
            return cls._get_bytes_possibly_from_base64(body), body.get("mimeType", "")
        return None, ""

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Attach decryption data to the request and response of `har_entry`, in place.

        Nothing is changed for a request/response without usable body bytes or without
        an acceptable matching operation.
        """
        # Process the request data and attach the decryption data if found
        request = har_entry["request"]
        req_bytes, req_mimetype = self._get_request_bytes_and_mime(request)
        if req_bytes:
            # for a request, the encrypted data is the output of the primitive
            enrichment_data = self._find_decrypted_data(req_bytes, "out")
            if enrichment_data:
                decrypted_bytes = enrichment_data.pop("decryptedBytes")
                Payload(decrypted_bytes).update_har_request(request, req_mimetype)
                request["_decryption"] = enrichment_data

        # Process the response data and attach the decryption data if found
        response = har_entry["response"]
        resp_bytes, resp_mimetype = self._get_response_bytes_and_mime(response)
        if resp_bytes:
            # for a response, the encrypted data is the input of the primitive
            enrichment_data = self._find_decrypted_data(resp_bytes, "in")
            if enrichment_data:
                decrypted_bytes = enrichment_data.pop("decryptedBytes")
                Payload(decrypted_bytes).update_har_response(response, resp_mimetype)
                response["_decryption"] = enrichment_data