1#!/usr/bin/env python3
2import json
3import logging
4import platform
5from dataclasses import dataclass, field, KW_ONLY
6from pathlib import Path
7from typing import Any, Annotated, Literal, get_args
8
9import tyro
10
11from pcapng_utils.tshark import Tshark, NetworkTrafficDump
12from pcapng_utils.har.pirogue_enrichment import HarEnrichment, Stacktrace, ContentDecryption
13
14DEFAULT_TSHARK_PATH = {
15 "Linux": "/usr/bin/tshark",
16 "Darwin": "/Applications/Wireshark.app/Contents/MacOS/tshark",
17}.get(platform.system())
18
19TrueValueType = Literal[1, "1", True, "true", "True", "TRUE"]
20
21
@dataclass(frozen=True)
class PcapngToHar:
    """CLI script for converting .pcapng file to .har file using tshark"""

    # NOTE(review): the bare strings following each field are presumably picked
    # up by tyro as the CLI help text -- keep their wording in sync with the CLI.

    input: Annotated[Path, tyro.conf.arg(aliases=("-i",))]
    """Path to input .pcapng"""

    output: Annotated[Path | None, tyro.conf.arg(aliases=("-o",), metavar="PATH")] = None
    """Path to output .har, INPUT.har if unset"""

    # Every field declared below this marker is keyword-only.
    _: KW_ONLY

    tshark_out: Annotated[TrueValueType | str | None, tyro.conf.arg(aliases=("-ot",), metavar="PATH|1")] = None
    """Path to raw tshark output as .json optional, if `ot=1` -> OUTPUT.json"""

    # Arguments for enriching the HAR data

    time_shift: Annotated[float, tyro.conf.arg(metavar="SECONDS")] = 0.0
    """
    Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.
    Positive means network traffic timestamps (Pirogue date) were earlier than socket operations timestamps (phone date).
    """

    socket_operations_file: Annotated[Path | None, tyro.conf.arg(aliases=("-sf",), metavar="PATH")] = None
    """Path to the socket operations data file generated by Pirogue, INPUT_DIR/socket_trace.json if unset"""

    cryptography_operations_file: Annotated[Path | None, tyro.conf.arg(aliases=("-cf",), metavar="PATH")] = None
    """Path to the cryptography data file generated by Pirogue, INPUT_DIR/aes_info.json if unset"""

    # The platform default is only offered when it is known AND present on disk;
    # otherwise the field has no default and must be supplied by the caller.
    tshark: str = (
        field(default=DEFAULT_TSHARK_PATH) if DEFAULT_TSHARK_PATH and Path(DEFAULT_TSHARK_PATH).exists() else field()
    )
    """Path/command for tshark executable"""

    force: Annotated[bool, tyro.conf.arg(aliases=("-f",))] = False
    """Whether to overwrite output if it exists"""

    verbose: Annotated[bool, tyro.conf.arg(aliases=("-v",))] = False
    """Activate verbose logging"""

    @classmethod
    def cli(cls) -> None:
        """Parse the command line into a configuration, set up logging and run it.

        Logging is configured at DEBUG level when ``verbose`` is set, and at
        WARNING level otherwise.
        """
        cfg = tyro.cli(cls, config=(tyro.conf.FlagCreatePairsOff,))
        logging.basicConfig(
            format="%(asctime)s [%(levelname)s | %(name)s] %(message)s",
            level=logging.DEBUG if cfg.verbose else logging.WARNING,
        )
        cfg.run()

    @property
    def output_raw_tshark(self) -> Path | Literal[True] | None:
        """Normalized form of ``tshark_out``.

        Returns:
            ``None`` when ``tshark_out`` is unset, ``True`` when it is one of
            the ``TrueValueType`` values, and ``Path(tshark_out)`` otherwise.

        Raises:
            ValueError: if ``tshark_out`` is set but empty/falsy (e.g. ``""``).
        """
        requested = self.tshark_out
        if requested is None:
            return None
        if requested in get_args(TrueValueType):
            return True
        if not requested:
            # Explicit check instead of `assert`, which is stripped under `python -O`
            # and would let an empty string silently become Path("").
            raise ValueError("tshark_out must be a non-empty path or a true-like flag such as 1")
        return Path(requested)  # type: ignore[arg-type]

    def run(self, **json_dump_kws: Any) -> None:
        """Run the conversion described by this configuration.

        Args:
            **json_dump_kws: extra keyword arguments forwarded to ``pcapng_to_har``.

        Raises:
            RuntimeError: wrapping any exception raised during the conversion;
                its argument is the resolved input path, and the original
                exception is chained as the cause.
        """
        try:
            pcapng_to_har(
                self.input,
                self.output,
                tshark=Tshark(self.tshark),
                output_raw_tshark=self.output_raw_tshark,
                socket_operations_file=self.socket_operations_file,
                cryptography_operations_file=self.cryptography_operations_file,
                overwrite=self.force,
                systematic_time_shift=self.time_shift,
                **json_dump_kws,
            )
        except Exception as e:
            # Tag the failure with the offending input file while keeping the cause.
            raise RuntimeError(self.input.resolve()) from e
95
96
def pcapng_to_har(
    input_file: Path,
    output_file: Path | None = None,
    *,
    tshark: Tshark | None = None,
    output_raw_tshark: Path | Literal[True] | None = None,
    socket_operations_file: Path | None = None,
    cryptography_operations_file: Path | None = None,
    overwrite: bool = False,
    systematic_time_shift: float = 0.0,  # for stacktrace enrichment only
    **json_dump_kws: Any,
) -> None:
    """Convert .pcapng file to .har file using tshark

    Args:
        input_file: path to the input .pcapng file.
        output_file: path of the HAR to write; defaults to ``input_file`` with
            a ``.har`` suffix.
        tshark: tshark wrapper to use; a default ``Tshark()`` is built if None.
        output_raw_tshark: where to dump the raw tshark packets as JSON.
            ``True`` means ``output_file`` with a ``.json`` suffix, ``None``
            disables the dump.
        socket_operations_file: file for the stacktrace enrichment; when None,
            ``socket_trace.json`` next to the input is used if it exists.
        cryptography_operations_file: file for the content decryption
            enrichment; when None, ``aes_info.json`` next to the input is used
            if it exists.
        overwrite: allow replacing existing output files.
        systematic_time_shift: forwarded to the stacktrace enrichment only.
        **json_dump_kws: overrides for the ``json.dump`` call writing the HAR
            (defaults: ``indent=2, ensure_ascii=True, allow_nan=False``).

    Raises:
        ValueError: if input, output and raw output do not all designate
            different files.
        FileExistsError: if an output already exists and ``overwrite`` is False.
    """
    logger = logging.getLogger("pcapng_to_har")
    if output_file is None:
        output_file = input_file.with_suffix(".har")

    if output_raw_tshark is True:
        output_raw_tshark = output_file.with_suffix(".json")

    # Refuse to clobber the input (or to write both outputs to the same file).
    # This is an explicit check rather than an `assert` so it survives `python -O`,
    # and paths are resolved so that relative/absolute spellings of the same
    # file are detected as well.
    involved_paths = [input_file, output_file]
    if output_raw_tshark is not None:
        involved_paths.append(output_raw_tshark)
    if len({path.resolve() for path in involved_paths}) != len(involved_paths):
        raise ValueError(
            f"Input, output and raw tshark output must be distinct files: {input_file.resolve()}"
        )

    if not overwrite:  # fail fast
        if output_raw_tshark is not None and output_raw_tshark.exists():
            raise FileExistsError(output_raw_tshark)
        if output_file.exists():
            raise FileExistsError(output_file)

    if tshark is None:
        tshark = Tshark()  # default executable path

    # "x" makes the open itself fail if the file appeared since the check above.
    open_mode = "w" if overwrite else "x"

    # Load & parse the traffic from the PCAPNG file
    tshark_out = tshark.load_traffic(input_file)
    logger.debug(f"Successfully run tshark: metadata={tshark_out.metadata}")
    if output_raw_tshark is not None:
        # Explicit UTF-8: with ensure_ascii=False, non-ASCII characters are
        # written verbatim and would fail under a non-UTF-8 locale encoding.
        with output_raw_tshark.open(open_mode, encoding="utf-8") as fp:
            json.dump(tshark_out.list_packets, fp, indent=2, ensure_ascii=False)
        logger.info(f"Successfully wrote tshark raw output in {output_raw_tshark}")

    traffic = NetworkTrafficDump(tshark_out)
    traffic.parse_traffic()

    # Get the output HAR data (without enrichment)
    har_data = traffic.to_har()

    # Add stacktrace information to the HAR
    enrich_har_with_io(
        har_data,
        Stacktrace,
        input_file.parent,
        socket_operations_file,
        "socket_trace.json",
        logger,
        systematic_time_shift=systematic_time_shift,
    )

    # Add content decryption to the HAR
    enrich_har_with_io(
        har_data,
        ContentDecryption,
        input_file.parent,
        cryptography_operations_file,
        "aes_info.json",
        logger,
    )

    # Save the enriched HAR data (caller-provided options take precedence)
    dump_options = {"indent": 2, "ensure_ascii": True, "allow_nan": False} | json_dump_kws
    with output_file.open(open_mode, encoding="utf-8") as f:
        json.dump(har_data, f, **dump_options)

    logger.info(f"The HAR has been saved in {output_file}")
168
169
def enrich_har_with_io(
    har_data: dict[str, Any],
    enricher: type[HarEnrichment],
    input_dir: Path,
    input_enrichment_file: Path | str | None,
    default_enrichment_file: str,
    logger: logging.Logger,
    **enrich_params: Any,
) -> bool:
    """Run one enrichment on the HAR data, locating its input file first.

    Args:
        har_data: HAR data handed to the enricher.
        enricher: enrichment class; it is instantiated as
            ``enricher(har_data, path, **enrich_params)`` and its ``enrich()``
            method is called.
        input_dir: directory in which the default enrichment file is looked up.
        input_enrichment_file: explicit enrichment file; when None the default
            ``input_dir / default_enrichment_file`` is used instead.
        default_enrichment_file: file name of the default enrichment file.
        logger: logger receiving the outcome messages.
        **enrich_params: extra keyword arguments for the enricher constructor.

    Returns:
        False if no explicit file was given and the default one does not exist
        (the enricher is not run at all); otherwise the value returned by
        ``enrich()``.
    """
    if input_enrichment_file is None:  # use default Pirogue path
        enrichment_path = input_dir / default_enrichment_file
        if not enrichment_path.is_file():
            # The default file is optional: its absence just skips this enrichment.
            logger.debug(f"No {enricher.ID} data found in {enrichment_path}, skipping enrichment")
            return False
    else:
        # An explicitly requested file is used as-is, without existence check.
        enrichment_path = Path(input_enrichment_file)

    has_been_enriched = enricher(har_data, enrichment_path, **enrich_params).enrich()
    # NOTE(review): a falsy result is taken to mean "nothing was enriched" --
    # confirm against HarEnrichment.enrich().
    if has_been_enriched:
        logger.info(f"The HAR has been enriched with {enricher.ID} data from {enrichment_path}")
    else:
        logger.info(f"The HAR has not been enriched with {enricher.ID} data from {enrichment_path}")

    return has_been_enriched
191
192
if __name__ == "__main__":
    # Script entry point: parse the command line and run the conversion.
    PcapngToHar.cli()