Source code for pcapng_utils.pcapng_to_har

  1#!/usr/bin/env python3
  2import json
  3import logging
  4import platform
  5from dataclasses import dataclass, field, KW_ONLY
  6from pathlib import Path
  7from typing import Any, Annotated, Literal, get_args
  8
  9import tyro
 10
 11from pcapng_utils.tshark import Tshark, NetworkTrafficDump
 12from pcapng_utils.har.pirogue_enrichment import HarEnrichment, Stacktrace, ContentDecryption
 13
# Conventional location of the tshark executable for the running platform.
# Evaluates to None when platform.system() is neither "Linux" nor "Darwin".
DEFAULT_TSHARK_PATH = {
    "Linux": "/usr/bin/tshark",
    "Darwin": "/Applications/Wireshark.app/Contents/MacOS/tshark",
}.get(platform.system())

# Values that mean "yes, write the raw tshark output next to the HAR output"
# (as opposed to an explicit path) when given as `tshark_out`.
TrueValueType = Literal[1, "1", True, "true", "True", "TRUE"]
 20
 21
@dataclass(frozen=True)
class PcapngToHar:
    """CLI script for converting .pcapng file to .har file using tshark"""

    input: Annotated[Path, tyro.conf.arg(aliases=("-i",))]
    """Path to input .pcapng"""

    output: Annotated[Path | None, tyro.conf.arg(aliases=("-o",), metavar="PATH")] = None
    """Path to output .har, INPUT.har if unset"""

    _: KW_ONLY

    tshark_out: Annotated[TrueValueType | str | None, tyro.conf.arg(aliases=("-ot",), metavar="PATH|1")] = None
    """Path to raw tshark output as .json optional, if `ot=1` -> OUTPUT.json"""

    # Arguments for enriching the HAR data

    time_shift: Annotated[float, tyro.conf.arg(metavar="SECONDS")] = 0.0
    """
    Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.
    Positive means network traffic timestamps (Pirogue date) were earlier than socket operations timestamps (phone date).
    """

    socket_operations_file: Annotated[Path | None, tyro.conf.arg(aliases=("-sf",), metavar="PATH")] = None
    """Path to the socket operations data file generated by Pirogue, INPUT_DIR/socket_trace.json if unset"""

    cryptography_operations_file: Annotated[Path | None, tyro.conf.arg(aliases=("-cf",), metavar="PATH")] = None
    """Path to the cryptography data file generated by Pirogue, INPUT_DIR/aes_info.json if unset"""

    tshark: str = (
        # Only defaulted when the platform's conventional executable actually exists;
        # otherwise the field has no default and must be supplied explicitly.
        field(default=DEFAULT_TSHARK_PATH) if DEFAULT_TSHARK_PATH and Path(DEFAULT_TSHARK_PATH).exists() else field()
    )
    """Path/command for tshark executable"""

    force: Annotated[bool, tyro.conf.arg(aliases=("-f",))] = False
    """Whether to overwrite output if it exists"""

    verbose: Annotated[bool, tyro.conf.arg(aliases=("-v",))] = False
    """Activate verbose logging"""

    @classmethod
    def cli(cls) -> None:
        """Parse the command line with tyro, configure logging, then run the conversion.

        Logging is set to DEBUG when `verbose` is set, WARNING otherwise.
        """
        cfg = tyro.cli(cls, config=(tyro.conf.FlagCreatePairsOff,))
        logging.basicConfig(
            format="%(asctime)s [%(levelname)s | %(name)s] %(message)s",
            level=logging.DEBUG if cfg.verbose else logging.WARNING,
        )
        cfg.run()

    @property
    def output_raw_tshark(self) -> Path | Literal[True] | None:
        """Normalized form of `tshark_out` as expected by `pcapng_to_har`.

        Returns:
            None when `tshark_out` is unset, True when it is one of the
            `TrueValueType` values, otherwise `tshark_out` converted to a Path.

        Raises:
            ValueError: if `tshark_out` is set but empty/falsy (e.g. ``""``).
        """
        raw = self.tshark_out
        if raw is None:
            return None
        if raw in get_args(TrueValueType):
            return True
        if not raw:
            # Explicit check rather than `assert`: assertions are stripped under
            # `python -O`, which would turn "" into Path("") (the current directory).
            raise ValueError(f"tshark_out must be a non-empty path or a true-like value, got {raw!r}")
        return Path(raw)  # type: ignore

    def run(self, **json_dump_kws: Any) -> None:
        """Run the conversion described by this configuration.

        Args:
            **json_dump_kws: forwarded to `pcapng_to_har` (which forwards them to `json.dump`
                when writing the HAR).

        Raises:
            RuntimeError: wraps any exception raised during the conversion; its argument is
                the resolved input path and the original exception is chained as the cause.
        """
        try:
            pcapng_to_har(
                self.input,
                self.output,
                tshark=Tshark(self.tshark),
                output_raw_tshark=self.output_raw_tshark,
                socket_operations_file=self.socket_operations_file,
                cryptography_operations_file=self.cryptography_operations_file,
                overwrite=self.force,
                systematic_time_shift=self.time_shift,
                **json_dump_kws,
            )
        except Exception as e:
            raise RuntimeError(self.input.resolve()) from e
95 96
def pcapng_to_har(
    input_file: Path,
    output_file: Path | None = None,
    *,
    tshark: Tshark | None = None,
    output_raw_tshark: Path | Literal[True] | None = None,
    socket_operations_file: Path | None = None,
    cryptography_operations_file: Path | None = None,
    overwrite: bool = False,
    systematic_time_shift: float = 0.0,  # for stacktrace enrichment only
    **json_dump_kws: Any,
) -> None:
    """Convert .pcapng file to .har file using tshark

    Args:
        input_file: path to the input .pcapng capture.
        output_file: path of the HAR to write; `input_file` with a ``.har`` suffix if None.
        tshark: `Tshark` instance used to load the traffic; a default `Tshark()` if None.
        output_raw_tshark: where to dump the raw tshark packets as JSON; True means
            `output_file` with a ``.json`` suffix, None disables the dump.
        socket_operations_file: file used for the `Stacktrace` enrichment; when None,
            ``socket_trace.json`` next to `input_file` is used if it exists.
        cryptography_operations_file: file used for the `ContentDecryption` enrichment;
            when None, ``aes_info.json`` next to `input_file` is used if it exists.
        overwrite: when False, existing output files are never overwritten.
        systematic_time_shift: passed to the `Stacktrace` enrichment only.
        **json_dump_kws: overrides for the `json.dump` call writing the HAR
            (defaults: ``indent=2, ensure_ascii=True, allow_nan=False``).

    Raises:
        ValueError: if input, output and raw tshark output do not all designate different files.
        FileExistsError: if an output file already exists and `overwrite` is False.
    """
    logger = logging.getLogger("pcapng_to_har")
    if output_file is None:
        output_file = input_file.with_suffix(".har")

    if output_raw_tshark is True:
        output_raw_tshark = output_file.with_suffix(".json")

    # All involved files must be distinct, otherwise we could clobber the input capture
    # or one output with the other. This is an explicit check (not an `assert`, which is
    # stripped under `python -O`) on resolved paths, so that different spellings of the
    # same file (relative vs. absolute, symlinks) are detected as well.
    involved_paths = [input_file, output_file]
    if output_raw_tshark is not None:
        involved_paths.append(output_raw_tshark)
    if len({path.resolve() for path in involved_paths}) != len(involved_paths):
        raise ValueError(
            f"Input, output and raw tshark output must be distinct files: {input_file.resolve()}"
        )

    if not overwrite:  # fail fast
        if output_raw_tshark is not None and output_raw_tshark.exists():
            raise FileExistsError(output_raw_tshark)
        if output_file.exists():
            raise FileExistsError(output_file)

    if tshark is None:
        tshark = Tshark()  # default executable path

    # "x" makes the write itself fail if the file appeared since the fail-fast check
    write_mode = "w" if overwrite else "x"

    # Load & parse the traffic from the PCAPNG file
    tshark_out = tshark.load_traffic(input_file)
    logger.debug(f"Successfully run tshark: metadata={tshark_out.metadata}")
    if output_raw_tshark is not None:
        # Non-ASCII characters are written verbatim (ensure_ascii=False), so the encoding
        # must be pinned to UTF-8 rather than left to the locale default.
        with output_raw_tshark.open(write_mode, encoding="utf-8") as fp:
            json.dump(tshark_out.list_packets, fp, indent=2, ensure_ascii=False)
        logger.info(f"Successfully wrote tshark raw output in {output_raw_tshark}")

    traffic = NetworkTrafficDump(tshark_out)
    traffic.parse_traffic()

    # Get the output HAR data (without enrichment)
    har_data = traffic.to_har()

    # Add stacktrace information to the HAR
    enrich_har_with_io(
        har_data,
        Stacktrace,
        input_file.parent,
        socket_operations_file,
        "socket_trace.json",
        logger,
        systematic_time_shift=systematic_time_shift,
    )

    # Add content decryption to the HAR
    enrich_har_with_io(
        har_data,
        ContentDecryption,
        input_file.parent,
        cryptography_operations_file,
        "aes_info.json",
        logger,
    )

    # Save the enriched HAR data (caller-provided keywords take precedence over defaults)
    json_dump_kws = {"indent": 2, "ensure_ascii": True, "allow_nan": False} | json_dump_kws
    # UTF-8 is pinned here too since callers may pass ensure_ascii=False
    with output_file.open(write_mode, encoding="utf-8") as f:
        json.dump(har_data, f, **json_dump_kws)

    logger.info(f"The HAR has been saved in {output_file}")
168 169
def enrich_har_with_io(
    har_data: dict[str, Any],
    enricher: type[HarEnrichment],
    input_dir: Path,
    input_enrichment_file: Path | str | None,
    default_enrichment_file: str,
    logger: logging.Logger,
    **enrich_params: Any,
) -> bool:
    """Run one enrichment on the HAR data, locating its input file first.

    Args:
        har_data: HAR data handed to the enricher.
        enricher: enrichment class; it is instantiated with the HAR data, the
            enrichment file path and `enrich_params`, then its `enrich()` is called.
        input_dir: directory in which the default enrichment file is looked up.
        input_enrichment_file: explicit enrichment file, or None to use the default one.
        default_enrichment_file: file name looked up in `input_dir` when no explicit
            file is given.
        logger: logger receiving the message emitted after the enrichment ran.
        **enrich_params: extra keyword arguments for the enricher constructor.

    Returns:
        False when no explicit file was given and the default one is not a file
        (nothing is run in that case), otherwise whatever `enrich()` returned.
    """
    if input_enrichment_file is not None:
        # An explicitly requested file is used as is, without checking that it exists
        enrichment_path = Path(input_enrichment_file)
    else:
        # use default Pirogue path, silently skipping the enrichment if it is absent
        enrichment_path = input_dir / default_enrichment_file
        if not enrichment_path.is_file():
            return False

    enrichment = enricher(har_data, enrichment_path, **enrich_params)
    outcome = enrichment.enrich()
    logger.info(f"The HAR has been enriched with {enricher.ID} data from {enrichment_path}")

    return outcome
if __name__ == "__main__":
    # Executed as a script: parse the command line and run the conversion.
    PcapngToHar.cli()