Source code for pcapng_utils.har.pirogue_enrichment.base

 1import os
 2import json
 3from abc import ABC, abstractmethod
 4from hashlib import file_digest
 5from pathlib import Path
 6from typing import ClassVar, Any
 7
 8HASH_ALGO = "sha1"
 9ENRICHMENT_KEY = f"enrichment_files_{HASH_ALGO}"
10
11
[docs] 12class HarEnrichment(ABC): 13 ID: ClassVar[str] 14 15 def __init__(self, har_data: dict, input_data_file: Path) -> None: 16 self.har_data = har_data 17 self.input_data_file = input_data_file 18 self.input_data_hash: str | None = None 19 self.can_enrich: bool = False 20 self.input_data: Any | None = None 21 22 if not self.har_data or input_data_file.resolve().as_posix() == os.devnull: 23 return 24 25 with self.input_data_file.open("rb") as f: 26 self.input_data_hash = file_digest(f, HASH_ALGO).hexdigest() 27 f.seek(0) # reset file stream to the beginning 28 try: 29 self.input_data = json.load(f) 30 self.can_enrich = bool(self.input_data) 31 except Exception as e: 32 raise ValueError( 33 f"Invalid input file format for {self.ID}: {input_data_file}" 34 ) from e 35
[docs] 36 def enrich(self) -> bool: 37 """Enrich, in-place, the HAR data with input-data.""" 38 if not self.can_enrich: 39 return False 40 meta: dict = self.har_data["log"]["creator"]["_metadata"] 41 meta.setdefault(ENRICHMENT_KEY, {}) 42 if self.ID in meta[ENRICHMENT_KEY]: 43 raise ValueError(f"{self.ID} enrichment already performed") 44 assert self.input_data_hash is not None 45 meta[ENRICHMENT_KEY][self.ID] = self.input_data_hash 46 for entry in self.har_data["log"]["entries"]: 47 self.enrich_entry(entry) 48 return True
49
[docs] 50 @abstractmethod 51 def enrich_entry(self, har_entry: dict[str, Any]) -> None: 52 """Enrich, in-place, one entry of the HAR data with input-data."""