1import binascii
2import json
3from datetime import datetime, timezone
4from hashlib import sha1
5from collections.abc import Sequence, Mapping
6from typing import Optional, Any
7
8from .types import DictLayers, TsharkRaw
9
10
[docs]
def get_layers_mapping(traffic: Sequence[DictLayers]) -> Mapping[int, DictLayers]:
    """
    Index the layers of every packet by frame number, in a single pass.

    The frame number is read from the "frame.number" entry of the "frame" layer and
    converted with `int()`. Packets lacking that entry (or carrying a negative number)
    are left out of the result. Meeting the same frame number twice trips an assertion.
    """
    by_frame: dict[int, DictLayers] = {}
    for packet in traffic:
        frame_layer = packet.get("frame", {})
        number = int(frame_layer.get("frame.number", -1))
        if number < 0:
            # no usable frame number: this packet cannot be indexed
            continue
        assert number not in by_frame, number
        by_frame[number] = packet
    return by_frame
22
23
[docs]
def get_tshark_bytes_from_raw(r: Optional[TsharkRaw]) -> bytes:
    """
    Decode a '*_raw' field (produced with the '-x' flag) into the bytes it describes.

    Format of such a field: [hexa: str, *sizes: int]; exactly 5 items are required here.
    Only the leading hexadecimal string is used. A missing field (`None`) gives `b""`.
    """
    if r is not None:
        assert isinstance(r, list) and len(r) == 5, r
        hex_string = r[0]
        assert isinstance(hex_string, str) and hex_string.isascii(), hex_string
        return binascii.unhexlify(hex_string)
    return b""
34
35
[docs]
def har_entry_with_common_fields(har_entry: dict[str, Any]) -> dict[str, Any]:
    """
    Complete the provided HAR entry with the fields common to all entries.

    A new dict is returned, holding in this order:
    - `_sha1Id` (non-standard): SHA-1 hex digest of the entry serialized as JSON with
      sorted keys; it serves both as entry identifier + easy changes-tracker across
      different releases of this software;
    - `startedDateTime`: the entry's `_timestamp` as an ISO 8601 date in UTC;
    - every field of the provided entry (taking precedence over the two fields above);
    - `time`: sum of the entry's `timings`, values equal to -1 excluded, rounded to
      2 decimals;
    - `cache`: always an empty dict.
    """
    # The serialization parameters are part of the identifier: altering any of them
    # would change the `_sha1Id` of every entry.
    canonical_json = json.dumps(
        har_entry, sort_keys=True, indent=0, ensure_ascii=True, allow_nan=False
    )
    digest = sha1(canonical_json.encode("ascii")).hexdigest()

    started = datetime.fromtimestamp(har_entry["_timestamp"], timezone.utc)
    started_iso = started.isoformat()

    # -1 is skipped rather than summed (in HAR, it denotes a timing that does not apply)
    applicable_timings = [
        duration for duration in har_entry["timings"].values() if duration != -1
    ]
    total_time = sum(applicable_timings)

    leading_fields = {"_sha1Id": digest, "startedDateTime": started_iso}
    trailing_fields = {
        "time": round(total_time, 2),
        "cache": {},  # not handled by this software
    }
    return {**leading_fields, **har_entry, **trailing_fields}