Source code for pcapng_utils.payload

 1import base64
 2from dataclasses import dataclass
 3from functools import cached_property
 4from hashlib import sha1
 5from typing import TypedDict, NotRequired, Literal, Self, Any
 6
 7ALLOWED_NON_PRINTABLE_CHARS = str.maketrans("", "", "\t\n\r")
 8
 9
[docs] 10class HARPayloadDict(TypedDict): 11 size: int 12 text: str 13 encoding: NotRequired[Literal["base64"]]
14 15
[docs] 16@dataclass(frozen=True, repr=False) 17class Payload: 18 """Representation of either bytes, possibly representing UTF8 plain-text (useful for HAR export).""" 19 20 bytes_: bytes = b"" 21
[docs] 22 @cached_property 23 def size(self) -> int: 24 return len(self.bytes_) # <!> len('€') == 1 != len('€'.encode()) == 3
25
[docs] 26 @cached_property 27 def sha1(self) -> str: 28 return sha1(self.bytes_).hexdigest()
29 30 def __bool__(self) -> bool: 31 return bool(self.bytes_) 32 33 def __repr__(self) -> str: 34 if not self: 35 return "Payload(size=0)" 36 return f"Payload(size={self.size}, sha1={self.sha1})" 37
[docs] 38 @classmethod 39 def concat(cls, *payloads: Self) -> Self: 40 """Concatenate all payloads in order.""" 41 concat_bytes = b"".join(p.bytes_ for p in payloads) # can't use `sum` here 42 return cls(concat_bytes)
43
[docs] 44 def to_har_dict(self) -> HARPayloadDict: 45 """Serialize content, with HAR formalism (cf. remarks in `update_har_request`).""" 46 try: 47 plain_txt = self.bytes_.decode() 48 assert plain_txt.translate(ALLOWED_NON_PRINTABLE_CHARS).isprintable() 49 return { 50 "size": self.size, 51 "text": plain_txt, 52 } 53 except Exception: # noqa 54 pass 55 return { 56 "size": self.size, 57 "text": base64.b64encode(self.bytes_).decode("ascii"), 58 "encoding": "base64", 59 }
60
[docs] 61 def update_har_request(self, request_entry: dict[str, Any], mimetype: str) -> None: 62 """Complete entry.request in-place 63 64 In specs, `size` & `encoding` are not supported for `postData`, 65 so we shall use the `httptoolkit` standard to store non-printable request data, 66 in the dedicated `_content` field + `_requestBodyStatus: 'discarded:not-representable'` 67 68 We remove any original request data keys prior to filling with new ones 69 """ 70 # clean-up request entry first 71 request_entry.pop("postData", None) 72 request_entry.pop("_content", None) 73 request_entry.pop("_requestBodyStatus", None) 74 # fill with new data 75 har_payload = self.to_har_dict() 76 if "encoding" in har_payload: 77 request_entry["_requestBodyStatus"] = "discarded:not-representable" 78 request_entry["_content"] = { 79 "mimeType": mimetype, # addition to httptoolkit specs, for consistence 80 **har_payload, 81 } 82 else: 83 request_entry["postData"] = { 84 "mimeType": mimetype, 85 "params": [], # mandatory in specs 86 **har_payload, 87 # size is not in specs... 88 "_size": har_payload["size"], 89 } 90 del request_entry["postData"]["size"]
91
[docs] 92 def update_har_response( 93 self, response_entry: dict[str, Any], mimetype: str 94 ) -> None: 95 """Complete entry.response in-place""" 96 response_entry["content"] = { 97 "mimeType": mimetype, 98 **self.to_har_dict(), 99 }