Source code for pcapng_utils.tshark.utils

 1import binascii
 2import json
 3from datetime import datetime, timezone
 4from hashlib import sha1
 5from collections.abc import Sequence, Mapping
 6from typing import Optional, Any
 7
 8from .types import DictLayers, TsharkRaw
 9
10
def get_layers_mapping(traffic: Sequence[DictLayers]) -> Mapping[int, DictLayers]:
    """
    Index the given packets' layers by frame number, in a single pass.

    The frame number is read from ``layers["frame"]["frame.number"]`` and converted with ``int``.
    Entries without that field (or with a negative number) are left out of the result.
    Each frame number is expected to occur at most once (checked with an assertion).
    """
    by_frame: dict[int, DictLayers] = {}
    for packet_layers in traffic:
        frame_info = packet_layers.get("frame", {})
        number = int(frame_info.get("frame.number", -1))
        if number < 0:
            # no usable frame number for this entry: nothing to index
            continue
        assert number not in by_frame, number
        by_frame[number] = packet_layers
    return by_frame
22 23
def get_tshark_bytes_from_raw(r: Optional[TsharkRaw]) -> bytes:
    """
    Decode the hexadecimal payload of a '*_raw' field (as produced with the '-x' flag).

    Such a field has the shape: [hexa: str, *sizes: int] (5 items in total).
    Only the leading hexadecimal string is used; `None` yields empty bytes.
    """
    if r is not None:
        assert isinstance(r, list), r
        assert len(r) == 5, r
        hex_digits = r[0]
        assert isinstance(hex_digits, str) and hex_digits.isascii(), hex_digits
        return binascii.unhexlify(hex_digits)
    return b""
34 35
def har_entry_with_common_fields(har_entry: dict[str, Any]) -> dict[str, Any]:
    """
    Build a new HAR entry made of the provided one plus the fields common to all entries.

    Added fields:

    - `_sha1Id` (non-standard): SHA-1 hex digest of the canonical JSON dump of the provided entry;
      it is meant to serve both as an entry identifier and as an easy changes-tracker
      across different releases of this software
    - `startedDateTime`: ISO-8601 rendering (in UTC) of the entry's `_timestamp`
    - `time`: sum of the entry's `timings` values (those equal to -1 are skipped), rounded to 2 decimals
    - `cache`: always an empty mapping

    The provided entry is not modified. Its own keys take precedence over
    `_sha1Id` & `startedDateTime`, whereas `time` & `cache` are always overwritten.
    """
    # The serialization settings determine the digest: they must be kept as is
    canonical_json = json.dumps(
        har_entry, allow_nan=False, ensure_ascii=True, indent=0, sort_keys=True
    )
    digest = sha1(canonical_json.encode("ascii")).hexdigest()

    started_at = datetime.fromtimestamp(har_entry["_timestamp"], tz=timezone.utc)

    applicable_durations = [
        duration for duration in har_entry["timings"].values() if duration != -1
    ]
    total_duration = sum(applicable_durations)

    enriched: dict[str, Any] = {
        "_sha1Id": digest,
        "startedDateTime": started_at.isoformat(),
    }
    enriched.update(har_entry)
    enriched["time"] = round(total_duration, 2)
    enriched["cache"] = {}  # not handled by this software
    return enriched