1#!/usr/bin/env python3
2import json
3import logging
4from dataclasses import dataclass, field, KW_ONLY
5from pathlib import Path
6from typing import Any, Annotated, Literal, get_args
7
8import tyro
9
10from pcapng_utils.tshark import Tshark, NetworkTrafficDump
11from pcapng_utils.har.pirogue_enrichment import HarEnrichment, Stacktrace, ContentDecryption
12
13
# Values of `PcapngToHar.tshark_out` that `PcapngToHar.output_raw_tshark` maps to `True`,
# i.e. "derive the raw tshark dump path from the output path" instead of naming one.
TrueValueType = Literal[1, "1", True, "true", "True", "TRUE"]

# Default enrichment file names, looked up next to the input .pcapng when no explicit file is given.
PIROGUE_SOCKET_TRACES_FNAME = "socket_trace.json"
PIROGUE_CRYPTO_TRACES_FNAME = "aes_info.json"

# Type of the empty string, accepted by the enrichment file options of `PcapngToHar`.
EmptyStr = Literal['']
20
21
[docs]
# Immutable (frozen) bundle of the command-line options; `cli()` builds it through `tyro.cli`.
# NOTE(review): the bare strings under each field are presumably surfaced by tyro as --help text,
# so they are kept verbatim here — confirm before rewording any of them.
@dataclass(frozen=True)
class PcapngToHar:
    """CLI script for converting .pcapng file to .har file using tshark"""

    input: Annotated[Path, tyro.conf.arg(aliases=("-i",))]
    """Path to input .pcapng"""

    output: Annotated[Path | None, tyro.conf.arg(aliases=("-o",), metavar="PATH")] = None
    """Path to output .har, INPUT.har if unset"""

    # Every field declared after this marker is keyword-only in the generated `__init__`.
    _: KW_ONLY

    # Either a path, or one of the `TrueValueType` literals (resolved by `output_raw_tshark`).
    tshark_out: Annotated[TrueValueType | str | None, tyro.conf.arg(aliases=("-ot",), metavar="PATH|1")] = None
    """Path to raw tshark output as .json optional, if `ot=1` -> OUTPUT.json"""

    # `default_factory` so that each instance gets its own `Tshark()` built with default arguments.
    tshark: Tshark = field(default_factory=Tshark)
    """Configuration for tshark wrapper"""

    ensure_ascii: bool = False
    """Whether to escape non-ASCII symbols in HAR output"""

    # Arguments for enriching the HAR data

    time_shift: Annotated[float | None, tyro.conf.arg(metavar="SECONDS")] = None
    """
    Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.
    Positive means network traffic timestamps (Pirogue date) were earlier than socket operations timestamps (phone date).

    When keeping default and Pirogue INPUT_DIR/experiment.json file is present under same directory than input .pcapng,
    this time-shift will be deduced from the recorded difference between device and network `start_capture_time`
    """

    # For both enrichment files: `None` selects the default file name inside the input directory,
    # while an empty string is falsy and makes `enrich_har_with_io` skip that enrichment.
    socket_operations_file: Annotated[EmptyStr | Path | None, tyro.conf.arg(aliases=("-sf",), metavar="PATH")] = None
    """Path to the socket operations data file generated by Pirogue, INPUT_DIR/socket_trace.json if unset"""

    cryptography_operations_file: Annotated[EmptyStr | Path | None, tyro.conf.arg(aliases=("-cf",), metavar="PATH")] = None
    """Path to the cryptography data file generated by Pirogue, INPUT_DIR/aes_info.json if unset"""

    force: Annotated[bool, tyro.conf.arg(aliases=("-f",))] = False
    """Whether to overwrite output if it exists"""

    # Counter-style flag: declared with `tyro.conf.UseCounterAction`, starting from 0.
    verbose: Annotated[int, tyro.conf.UseCounterAction, tyro.conf.arg(aliases=("-v",))] = 0
    """Increase logging verbosity"""
65
76
[docs]
77 @classmethod
78 def cli(cls) -> None:
79 cfg = tyro.cli(
80 cls,
81 config=(tyro.conf.FlagCreatePairsOff, tyro.conf.OmitArgPrefixes, tyro.conf.DisallowNone)
82 )
83 cfg.configure_logging()
84 cfg.run()
85
86 @property
87 def output_raw_tshark(self) -> Path | Literal[True] | None:
88 if self.tshark_out is None:
89 return None
90 if self.tshark_out in get_args(TrueValueType):
91 return True
92 assert self.tshark_out
93 return Path(self.tshark_out) # type: ignore
94
[docs]
95 def run(self, **json_dump_kws: Any) -> None:
96 try:
97 pcapng_to_har(
98 self.input,
99 self.output,
100 tshark=self.tshark,
101 output_raw_tshark=self.output_raw_tshark,
102 socket_operations_file=self.socket_operations_file,
103 cryptography_operations_file=self.cryptography_operations_file,
104 overwrite=self.force,
105 systematic_time_shift=self.time_shift,
106 ensure_ascii=self.ensure_ascii,
107 **json_dump_kws,
108 )
109 except Exception as e:
110 raise RuntimeError(self.input.resolve().as_posix()) from e
111
112
[docs]
def pcapng_to_har(
    input_file: Path,
    output_file: Path | None = None,
    *,
    tshark: Tshark | None = None,
    output_raw_tshark: Path | Literal[True] | None = None,
    socket_operations_file: Path | str | Literal[False] | None = None,
    cryptography_operations_file: Path | str | Literal[False] | None = None,
    overwrite: bool = False,
    systematic_time_shift: float | None = None,  # for stacktrace enrichment only
    **json_dump_kws: Any,
) -> None:
    """Convert .pcapng file to .har file using tshark

    Args:
        input_file: Path of the .pcapng capture to convert.
        output_file: Path of the HAR to write; `input_file` with a ".har" suffix when `None`.
        tshark: tshark wrapper to use; a default `Tshark()` is created when `None`.
        output_raw_tshark: Where to dump the raw tshark packets as JSON. `True` means
            `output_file` with a ".json" suffix, `None` means no dump.
        socket_operations_file: Enrichment file handed to the `Stacktrace` enricher through
            `enrich_har_with_io` (default file name: `PIROGUE_SOCKET_TRACES_FNAME`).
        cryptography_operations_file: Enrichment file handed to the `ContentDecryption` enricher
            through `enrich_har_with_io` (default file name: `PIROGUE_CRYPTO_TRACES_FNAME`).
        overwrite: When false, existing outputs are never replaced.
        systematic_time_shift: Forwarded to the stacktrace enrichment only.
        **json_dump_kws: Overrides for the `json.dump` call writing the HAR; the defaults are
            `indent=2`, `ensure_ascii=False` and `allow_nan=False`.

    Raises:
        AssertionError: if input, output and raw tshark output are not three distinct values.
        FileExistsError: if `overwrite` is false and one of the outputs already exists.
    """
    logger = logging.getLogger("pcapng_utils.pcapng_to_har")
    if output_file is None:
        output_file = input_file.with_suffix(".har")

    if output_raw_tshark is True:
        output_raw_tshark = output_file.with_suffix(".json")

    # `None` counts as its own value here, so this holds as soon as the actual paths differ
    assert len({input_file, output_file, output_raw_tshark}) == 3, input_file.resolve()
    if not overwrite:  # fail fast
        if output_raw_tshark is not None and output_raw_tshark.exists():
            raise FileExistsError(output_raw_tshark)
        if output_file.exists():
            raise FileExistsError(output_file)

    # "x" (exclusive creation) still guards against a file appearing after the check above
    write_mode = "w" if overwrite else "x"

    if tshark is None:
        tshark = Tshark()  # default executable path

    # Load & parse the traffic from the PCAPNG file
    tshark_out = tshark.load_traffic(input_file)
    logger.info(f"Successfully run tshark on {input_file} -> metadata={tshark_out.metadata}")
    if output_raw_tshark:
        # Explicit UTF-8: non-ASCII characters are written as-is (`ensure_ascii=False`), so relying
        # on the locale default encoding could raise UnicodeEncodeError or yield non-UTF-8 JSON.
        with output_raw_tshark.open(write_mode, encoding="utf-8") as fp:
            json.dump(tshark_out.list_packets, fp, indent=2, ensure_ascii=False)
        logger.info(f"Successfully wrote tshark raw output in {output_raw_tshark}")

    traffic = NetworkTrafficDump(tshark_out)
    traffic.parse_traffic()

    # Get the output HAR data (without enrichment)
    har_data = traffic.to_har()

    # Add stacktrace information to the HAR
    enrich_har_with_io(
        har_data,
        Stacktrace,
        input_file.parent,
        socket_operations_file,
        PIROGUE_SOCKET_TRACES_FNAME,
        logger,
        systematic_time_shift=systematic_time_shift,
    )

    # Add content decryption to the HAR
    enrich_har_with_io(
        har_data,
        ContentDecryption,
        input_file.parent,
        cryptography_operations_file,
        PIROGUE_CRYPTO_TRACES_FNAME,
        logger,
    )

    # Save the enriched HAR data (caller-provided keywords win over the defaults)
    json_dump_kws = {"indent": 2, "ensure_ascii": False, "allow_nan": False} | json_dump_kws
    # Same encoding concern as for the raw tshark dump: the default is `ensure_ascii=False`
    with output_file.open(write_mode, encoding="utf-8") as f:
        json.dump(har_data, f, **json_dump_kws)

    logger.info(f"The HAR has been saved in {output_file}")
184
185
[docs]
def enrich_har_with_io(
    har_data: dict[str, Any],
    enricher: type[HarEnrichment],
    input_dir: Path,
    input_enrichment_file: Path | str | Literal[False] | None,
    default_enrichment_filename: str,
    logger: logging.Logger,
    **enrich_params: Any,
) -> bool:
    """Run one enricher on `har_data`, after resolving which enrichment file to use.

    Args:
        har_data: HAR data handed to the enricher.
        enricher: Class called as `enricher(har_data, path, **enrich_params)`; its `enrich()`
            result is returned and its `ID` attribute is used in the log message.
        input_dir: Directory in which the default enrichment file is looked up.
        input_enrichment_file: `None` to use `input_dir / default_enrichment_filename`
            (silently skipped if that is not a file), `False` or an empty string to skip the
            enrichment, anything else is taken as the path of the enrichment file.
        default_enrichment_filename: File name used when `input_enrichment_file` is `None`.
        logger: Logger receiving the message emitted once the enricher has run.
        **enrich_params: Extra keyword arguments for the enricher constructor.

    Returns:
        `False` when the enrichment was skipped, otherwise the value returned by `enrich()`.
    """
    if input_enrichment_file is None:  # use default Pirogue path
        enrichment_path = input_dir / default_enrichment_filename
        if not enrichment_path.is_file():
            return False
    elif input_enrichment_file:
        enrichment_path = Path(input_enrichment_file)
    else:  # False or empty string
        return False

    enrichment = enricher(har_data, enrichment_path, **enrich_params)
    outcome = enrichment.enrich()
    logger.info(f"The HAR has been enriched with {enricher.ID} data from {enrichment_path}")

    return outcome
209
210
# Script entry point: when executed directly, parse the command line and run the conversion.
if __name__ == "__main__":
    PcapngToHar.cli()