pcapng_utils.har.pirogue_enrichment package

Submodules

pcapng_utils.har.pirogue_enrichment.base module

class pcapng_utils.har.pirogue_enrichment.base.HarEnrichment(har_data, input_data_file)[source]

Bases: ABC

enrich()[source]

Enrich, in-place, the HAR data with input-data.

Return type:

bool

abstractmethod enrich_entry(har_entry)[source]

Enrich, in-place, one entry of the HAR data with input-data.

ID: ClassVar[str]

pcapng_utils.har.pirogue_enrichment.decryption module

class pcapng_utils.har.pirogue_enrichment.decryption.ContentDecryption(har_data, input_data_file)[source]

Bases: HarEnrichment

enrich_entry(har_entry)[source]

Enrich, in-place, one entry of the HAR data with input-data.

ID: ClassVar = 'decryption'
MAX_SIZE_DIFF_FRACTION: float = 0.5
MIN_LEN_DECRYPTED_BYTES: int = 2
MIN_LEN_ENCRYPTED_BYTES: int = 8

pcapng_utils.har.pirogue_enrichment.sorted_list module

cf. https://docs.python.org/3.11/library/bisect.html#searching-sorted-lists

pcapng_utils.har.pirogue_enrichment.sorted_list.get_closest_in_window(lst, key, rel_window)[source]

Find element with closest key in provided relative window of keys (inclusive)

Return type:

tuple[int, _KT, Any] | None

pcapng_utils.har.pirogue_enrichment.sorted_list.get_gt(lst, key)[source]

Find leftmost element whose key is greater than key

Return type:

tuple[int, _KT, Any] | None

pcapng_utils.har.pirogue_enrichment.sorted_list.get_le(lst, key)[source]

Find rightmost element whose key is less than or equal to key

Return type:

tuple[int, _KT, Any] | None

pcapng_utils.har.pirogue_enrichment.stacktrace module

class pcapng_utils.har.pirogue_enrichment.stacktrace.HAREntryMetadata(community_id: str, direction: Literal['in', 'out'], timestamp: float, entry_id: str, is_http2: bool)[source]

Bases: object

community_id: str
direction: Literal['in', 'out']
entry_id: str
is_http2: bool
timestamp: float
class pcapng_utils.har.pirogue_enrichment.stacktrace.SocketTraceData[source]

Bases: TypedDict

communityId: str
destIp: str
destPort: int
localIp: str
localPort: int
pid: int
process: str
socketEventType: str
socketType: Literal['tcp', 'tcp6', 'udp', 'udp6']
stack: list[dict]
timestamp: float
class pcapng_utils.har.pirogue_enrichment.stacktrace.Stacktrace(har_data, input_data_file, *, systematic_time_shift=0.0, time_window_requests=(-5.0, 2.0), time_window_responses=(-2.0, 5.0))[source]

Bases: HarEnrichment

enrich_entry(har_entry)[source]

Enrich the HAR data with the stacktraces information

COMMUNITY_ID: ClassVar = CommunityID(v=1,seed=0,base64=True)
DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {'communityId', 'destIp', 'destPort', 'localIp', 'localPort'}
ID: ClassVar = 'stacktrace'
KEYS_PREFIX: ClassVar[str] = ''
systematic_time_shift

Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.

Indeed socket operations timestamps come from phone date, whereas network traffic timestamps come from Pirogue date, which may be desynchronized.

Positive means network traffic timestamps (Pirogue) were earlier than socket operations timestamps (phone).

time_windows: dict[Literal['in', 'out'], tuple[float, float]]

Tolerances (in seconds) regarding chronology of socket operations compared to network traffic (per flow direction).

  • For outbound network traffic, the socket operation shall be in the past, or a very very close future,

  • For inbound network traffic, it is the opposite.

pcapng_utils.har.pirogue_enrichment.stacktrace.empty_time_sorted_list_of_stack_traces()[source]

pcapng_utils.har.pirogue_enrichment.types module

pcapng_utils.har.pirogue_enrichment.types.Timestamp

timestamp (in seconds)

pcapng_utils.har.pirogue_enrichment.utils module

pcapng_utils.har.pirogue_enrichment.utils.clean_prefixed_ip_address(ip_address)[source]
Return type:

str

pcapng_utils.har.pirogue_enrichment.utils.keys_to_camel_case(obj, *, prefix='')[source]

Recursively rename all keys of dictionaries within object with camel case (optionally prefixed).

Return type:

_T

pcapng_utils.har.pirogue_enrichment.utils.robust_b64decode(b64_str, *, altchars=None)[source]

Robustly decode some base64 data (standard, URL-safe, fixed width with new lines, without padding, …)

Return type:

bytes

pcapng_utils.har.pirogue_enrichment.utils.to_camel_case(s)[source]
Return type:

str

Module contents

class pcapng_utils.har.pirogue_enrichment.ContentDecryption(har_data, input_data_file)[source]

Bases: HarEnrichment

enrich_entry(har_entry)[source]

Enrich, in-place, one entry of the HAR data with input-data.

ID: ClassVar = 'decryption'
MAX_SIZE_DIFF_FRACTION: float = 0.5
MIN_LEN_DECRYPTED_BYTES: int = 2
MIN_LEN_ENCRYPTED_BYTES: int = 8
can_enrich: bool
cryptography_operations: list[dict]
input_data: Any | None
input_data_hash: str | None
class pcapng_utils.har.pirogue_enrichment.HarEnrichment(har_data, input_data_file)[source]

Bases: ABC

enrich()[source]

Enrich, in-place, the HAR data with input-data.

Return type:

bool

abstractmethod enrich_entry(har_entry)[source]

Enrich, in-place, one entry of the HAR data with input-data.

ID: ClassVar[str]
can_enrich: bool
input_data: Any | None
input_data_hash: str | None
class pcapng_utils.har.pirogue_enrichment.Stacktrace(har_data, input_data_file, *, systematic_time_shift=0.0, time_window_requests=(-5.0, 2.0), time_window_responses=(-2.0, 5.0))[source]

Bases: HarEnrichment

enrich_entry(har_entry)[source]

Enrich the HAR data with the stacktraces information

COMMUNITY_ID: ClassVar = CommunityID(v=1,seed=0,base64=True)
DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {'communityId', 'destIp', 'destPort', 'localIp', 'localPort'}
ID: ClassVar = 'stacktrace'
KEYS_PREFIX: ClassVar[str] = ''
can_enrich: bool
input_data: Any | None
input_data_hash: str | None
paired_socket_traces: dict[tuple[str, Literal['in', 'out'], int], HAREntryMetadata]
socket_traces_map: Mapping[tuple[str, Literal['in', 'out']], SortedKeyList]
systematic_time_shift

Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.

Indeed socket operations timestamps come from phone date, whereas network traffic timestamps come from Pirogue date, which may be desynchronized.

Positive means network traffic timestamps (Pirogue) were earlier than socket operations timestamps (phone).

time_windows: dict[Literal['in', 'out'], tuple[float, float]]

Tolerances (in seconds) regarding chronology of socket operations compared to network traffic (per flow direction).

  • For outbound network traffic, the socket operation shall be in the past, or a very very close future,

  • For inbound network traffic, it is the opposite.