Source code for pcapng_utils.pcapng_to_har

  1#!/usr/bin/env python3
  2import json
  3import logging
  4from dataclasses import dataclass, field, KW_ONLY
  5from pathlib import Path
  6from typing import Any, Annotated, Literal, get_args
  7
  8import tyro
  9
 10from pcapng_utils.tshark import Tshark, NetworkTrafficDump
 11from pcapng_utils.har.pirogue_enrichment import HarEnrichment, Stacktrace, ContentDecryption
 12
 13
# Values treated as "true" by `PcapngToHar.output_raw_tshark`: when `tshark_out`
# equals one of them, the raw tshark dump path is derived from the output path
# instead of being given explicitly.
TrueValueType = Literal[1, "1", True, "true", "True", "TRUE"]

# Default enrichment filenames, looked up in the directory of the input .pcapng
# when no explicit enrichment file is provided.
PIROGUE_SOCKET_TRACES_FNAME = "socket_trace.json"
PIROGUE_CRYPTO_TRACES_FNAME = "aes_info.json"

# Empty-string literal accepted by the enrichment file options; an empty value
# makes `enrich_har_with_io` skip the corresponding enrichment.
EmptyStr = Literal['']
 20
 21
# NOTE: this dataclass is handed to `tyro.cli`, which derives the command-line
# interface from its fields. The class docstring and the string literal placed
# under each field act as user-facing help text, so they are kept verbatim.
@dataclass(frozen=True)
class PcapngToHar:
    """CLI script for converting .pcapng file to .har file using tshark"""

    input: Annotated[Path, tyro.conf.arg(aliases=("-i",))]
    """Path to input .pcapng"""

    output: Annotated[Path | None, tyro.conf.arg(aliases=("-o",), metavar="PATH")] = None
    """Path to output .har, INPUT.har if unset"""

    _: KW_ONLY

    tshark_out: Annotated[TrueValueType | str | None, tyro.conf.arg(aliases=("-ot",), metavar="PATH|1")] = None
    """Path to raw tshark output as .json optional, if `ot=1` -> OUTPUT.json"""

    tshark: Tshark = field(default_factory=Tshark)
    """Configuration for tshark wrapper"""

    ensure_ascii: bool = False
    """Whether to escape non-ASCII symbols in HAR output"""

    # Arguments for enriching the HAR data

    time_shift: Annotated[float | None, tyro.conf.arg(metavar="SECONDS")] = None
    """
    Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.
    Positive means network traffic timestamps (Pirogue date) were earlier than socket operations timestamps (phone date).

    When keeping default and Pirogue INPUT_DIR/experiment.json file is present under same directory than input .pcapng,
    this time-shift will be deduced from the recorded difference between device and network `start_capture_time`
    """

    socket_operations_file: Annotated[EmptyStr | Path | None, tyro.conf.arg(aliases=("-sf",), metavar="PATH")] = None
    """Path to the socket operations data file generated by Pirogue, INPUT_DIR/socket_trace.json if unset"""

    cryptography_operations_file: Annotated[EmptyStr | Path | None, tyro.conf.arg(aliases=("-cf",), metavar="PATH")] = None
    """Path to the cryptography data file generated by Pirogue, INPUT_DIR/aes_info.json if unset"""

    force: Annotated[bool, tyro.conf.arg(aliases=("-f",))] = False
    """Whether to overwrite output if it exists"""

    verbose: Annotated[int, tyro.conf.UseCounterAction, tyro.conf.arg(aliases=("-v",))] = 0
    """Increase logging verbosity"""

    def configure_logging(self) -> None:
        """Set up the root logger and per-package log levels from `verbose`.

        Every level starts at WARNING and is lowered per `-v` occurrence,
        never going below DEBUG: by 10 for "pcapng_utils", by 7 for the root
        logger (everything else) and by 3 for "communityid".
        """
        third_party_level = max(logging.DEBUG, logging.WARNING - 7 * self.verbose)
        self_level = max(logging.DEBUG, logging.WARNING - 10 * self.verbose)
        logging.basicConfig(
            format="%(asctime)s [%(levelname)s | %(name)s] %(message)s",
            level=third_party_level,
        )
        logging.getLogger("pcapng_utils").setLevel(self_level)
        # NOTE(review): the enrichment classes are imported from
        # `pcapng_utils.har.pirogue_enrichment`; confirm that this logger name
        # matches the one actually used by those modules.
        logging.getLogger("pcapng_utils.pirogue_enrichment").setLevel(third_party_level + 1)  # too verbose
        logging.getLogger("communityid").setLevel(max(logging.DEBUG, logging.WARNING - 3 * self.verbose))  # too verbose

    @classmethod
    def cli(cls) -> None:
        """Parse the command line into an instance, configure logging and run the conversion."""
        cfg = tyro.cli(
            cls,
            config=(tyro.conf.FlagCreatePairsOff, tyro.conf.OmitArgPrefixes, tyro.conf.DisallowNone)
        )
        cfg.configure_logging()
        cfg.run()

    @property
    def output_raw_tshark(self) -> Path | Literal[True] | None:
        """Normalized form of `tshark_out`.

        Returns:
            `None` when no raw tshark dump was requested, `True` when
            `tshark_out` is one of the `TrueValueType` values, otherwise
            `tshark_out` converted to a `Path`.

        Raises:
            ValueError: if `tshark_out` is set but empty.
        """
        if self.tshark_out is None:
            return None
        if self.tshark_out in get_args(TrueValueType):
            return True
        if not self.tshark_out:
            # Explicit check rather than `assert`: assertions are stripped
            # under `python -O`, which would silently turn "" into Path("").
            raise ValueError(
                f"tshark_out must be a non-empty path or a true-like value, got {self.tshark_out!r}"
            )
        return Path(self.tshark_out)  # type: ignore

    def run(self, **json_dump_kws: Any) -> None:
        """Run the conversion described by this configuration.

        Args:
            **json_dump_kws: extra keyword arguments forwarded to
                `pcapng_to_har`; an explicit `ensure_ascii` given here takes
                precedence over the `ensure_ascii` field.

        Raises:
            RuntimeError: on any failure; the message is the resolved input
                path and the original exception is chained as the cause.
        """
        # Merge instead of passing `ensure_ascii=` next to `**json_dump_kws`,
        # so that an explicit override does not fail as a duplicate keyword.
        dump_kws = {"ensure_ascii": self.ensure_ascii} | json_dump_kws
        try:
            pcapng_to_har(
                self.input,
                self.output,
                tshark=self.tshark,
                output_raw_tshark=self.output_raw_tshark,
                socket_operations_file=self.socket_operations_file,
                cryptography_operations_file=self.cryptography_operations_file,
                overwrite=self.force,
                systematic_time_shift=self.time_shift,
                **dump_kws,
            )
        except Exception as e:
            raise RuntimeError(self.input.resolve().as_posix()) from e
111 112
def pcapng_to_har(
    input_file: Path,
    output_file: Path | None = None,
    *,
    tshark: Tshark | None = None,
    output_raw_tshark: Path | Literal[True] | None = None,
    socket_operations_file: Path | str | Literal[False] | None = None,
    cryptography_operations_file: Path | str | Literal[False] | None = None,
    overwrite: bool = False,
    systematic_time_shift: float | None = None,  # for stacktrace enrichment only
    **json_dump_kws: Any,
) -> None:
    """Convert .pcapng file to .har file using tshark

    Args:
        input_file: path to the input .pcapng.
        output_file: path to the output .har; defaults to `input_file` with
            a ".har" suffix.
        tshark: tshark wrapper to use; a default `Tshark()` is created if unset.
        output_raw_tshark: where to also dump the raw tshark packets as JSON;
            `True` means `output_file` with a ".json" suffix, `None` disables it.
        socket_operations_file: socket operations file used for the
            `Stacktrace` enrichment; `None` falls back to
            `PIROGUE_SOCKET_TRACES_FNAME` next to the input, a falsy value
            (False or "") skips the enrichment.
        cryptography_operations_file: same as above for the
            `ContentDecryption` enrichment, with `PIROGUE_CRYPTO_TRACES_FNAME`
            as fallback.
        overwrite: whether existing output files may be replaced.
        systematic_time_shift: forwarded to the `Stacktrace` enrichment only.
        **json_dump_kws: forwarded to `json.dump` for the HAR, on top of the
            defaults `indent=2, ensure_ascii=False, allow_nan=False`.

    Raises:
        ValueError: if input, output and raw tshark output are not all distinct.
        FileExistsError: if an output already exists and `overwrite` is False.
    """
    logger = logging.getLogger("pcapng_utils.pcapng_to_har")
    if output_file is None:
        output_file = input_file.with_suffix(".har")

    if output_raw_tshark is True:
        output_raw_tshark = output_file.with_suffix(".json")

    # Explicit check rather than `assert`: assertions are stripped under
    # `python -O`, and a clash could then overwrite the input capture.
    if len({input_file, output_file, output_raw_tshark}) != 3:
        raise ValueError(
            f"Input, output and raw tshark output paths must all be distinct: {input_file.resolve()}"
        )
    if not overwrite:  # fail fast
        if output_raw_tshark is not None and output_raw_tshark.exists():
            raise FileExistsError(output_raw_tshark)
        if output_file.exists():
            raise FileExistsError(output_file)

    if tshark is None:
        tshark = Tshark()  # default executable path

    # "x" makes the open itself fail if the file appeared since the check above
    write_mode = "w" if overwrite else "x"

    # Load & parse the traffic from the PCAPNG file
    tshark_out = tshark.load_traffic(input_file)
    logger.info(f"Successfully run tshark on {input_file} -> metadata={tshark_out.metadata}")
    if output_raw_tshark:
        # UTF-8 is forced because non-ASCII characters are written unescaped
        # (`ensure_ascii=False`); the locale default encoding may not be able
        # to represent them.
        with output_raw_tshark.open(write_mode, encoding="utf-8") as fp:
            json.dump(tshark_out.list_packets, fp, indent=2, ensure_ascii=False)
        logger.info(f"Successfully wrote tshark raw output in {output_raw_tshark}")

    traffic = NetworkTrafficDump(tshark_out)
    traffic.parse_traffic()

    # Get the output HAR data (without enrichment)
    har_data = traffic.to_har()

    # Add stacktrace information to the HAR
    enrich_har_with_io(
        har_data,
        Stacktrace,
        input_file.parent,
        socket_operations_file,
        PIROGUE_SOCKET_TRACES_FNAME,
        logger,
        systematic_time_shift=systematic_time_shift,
    )

    # Add content decryption to the HAR
    enrich_har_with_io(
        har_data,
        ContentDecryption,
        input_file.parent,
        cryptography_operations_file,
        PIROGUE_CRYPTO_TRACES_FNAME,
        logger,
    )

    # Save the enriched HAR data (caller-provided options win over the defaults)
    json_dump_kws = {"indent": 2, "ensure_ascii": False, "allow_nan": False} | json_dump_kws
    with output_file.open(write_mode, encoding="utf-8") as f:
        json.dump(har_data, f, **json_dump_kws)

    logger.info(f"The HAR has been saved in {output_file}")
184 185
def enrich_har_with_io(
    har_data: dict[str, Any],
    enricher: type[HarEnrichment],
    input_dir: Path,
    input_enrichment_file: Path | str | Literal[False] | None,
    default_enrichment_filename: str,
    logger: logging.Logger,
    **enrich_params: Any,
) -> bool:
    """Resolve the enrichment file and, if any, run `enricher` on `har_data`.

    Args:
        har_data: HAR data handed to the enricher.
        enricher: enrichment class, instantiated as
            `enricher(har_data, file, **enrich_params)` and then `.enrich()`-ed.
        input_dir: directory in which the default enrichment file is looked up.
        input_enrichment_file: explicit enrichment file; `None` means
            `input_dir / default_enrichment_filename` (skipped if that is not
            a file), a falsy value (False or "") disables the enrichment.
        default_enrichment_filename: filename used when no file is given.
        logger: logger receiving the outcome message.
        **enrich_params: extra keyword arguments for the enricher constructor.

    Returns:
        False when the enrichment was skipped, otherwise the value returned
        by the enricher's `enrich()`.
    """
    if input_enrichment_file is None:  # use default Pirogue path
        input_enrichment_file = input_dir / default_enrichment_filename
        if not input_enrichment_file.is_file():
            return False
    elif not input_enrichment_file:  # False or empty string
        return False
    else:
        input_enrichment_file = Path(input_enrichment_file)

    has_been_enriched = enricher(har_data, input_enrichment_file, **enrich_params).enrich()
    # Only claim success when the enricher reports it, so that logs do not
    # state that an enrichment happened when it did not.
    if has_been_enriched:
        logger.info(f"The HAR has been enriched with {enricher.ID} data from {input_enrichment_file}")
    else:
        logger.info(f"The HAR has NOT been enriched with {enricher.ID} data from {input_enrichment_file}")

    return has_been_enriched


# Script entry point: build the configuration from the command line and run it.
if __name__ == "__main__":
    PcapngToHar.cli()