Source code for pcapng_utils.har.pirogue_enrichment.stacktrace

# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
# SPDX-License-Identifier: MIT

import logging
from pathlib import Path
from operator import itemgetter
from collections import defaultdict
from collections.abc import Mapping
from dataclasses import dataclass
from typing import TypedDict, Literal, ClassVar, Any

import communityid
from sortedcontainers import SortedKeyList

from .base import HarEnrichment
from .types import CommunityID, Timestamp, FlowDirection
from .utils import keys_to_camel_case, clean_prefixed_ip_address
from .sorted_list import get_closest_in_window

logger = logging.getLogger('enrichment')
class SocketTraceData(TypedDict):
    timestamp: Timestamp  # seconds
    process: str
    pid: int
    stack: list[dict]
    socketEventType: str
    localIp: str
    localPort: int
    destIp: str
    destPort: int
    socketType: Literal['tcp', 'tcp6', 'udp', 'udp6']
    communityId: CommunityID
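# Illustrative example of a record conforming to SocketTraceData, once the timestamp has
# been converted to seconds and the Community ID attached (all values are hypothetical;
# the Community ID digest is elided):
#
#     {
#         'timestamp': 1717000000.123,
#         'process': 'com.example.app',
#         'pid': 1234,
#         'stack': [{'class': 'java.net.SocketOutputStream'}, ...],
#         'socketEventType': 'write',
#         'localIp': '10.0.0.2',
#         'localPort': 49152,
#         'destIp': '203.0.113.7',
#         'destPort': 443,
#         'socketType': 'tcp',
#         'communityId': '1:...',
#     }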
def empty_time_sorted_list_of_stack_traces():
    return SortedKeyList(key=itemgetter('timestamp'))
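# Behaviour of the resulting container (illustrative snippet, not part of the module):
#
#     traces = empty_time_sorted_list_of_stack_traces()
#     traces.add({'timestamp': 12.0, 'socketEventType': 'read'})
#     traces.add({'timestamp': 3.5, 'socketEventType': 'write'})
#     [t['timestamp'] for t in traces]   # -> [3.5, 12.0]: sorted regardless of insertion order
#
# Keeping each per-flow list sorted by timestamp is what lets `get_closest_in_window`
# locate the nearest socket operation efficiently (presumably by bisection) instead of
# scanning every trace.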
@dataclass(frozen=True)
class HAREntryMetadata:
    community_id: CommunityID
    direction: FlowDirection
    timestamp: Timestamp
    entry_id: str  # `_sha1Id` field
    is_http2: bool
class Stacktrace(HarEnrichment):

    ID: ClassVar = 'stacktrace'

    COMMUNITY_ID: ClassVar = communityid.CommunityID()

    KEYS_PREFIX: ClassVar[str] = ''

    DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {
        # redundant
        'communityId',
        'destIp',
        'destPort',
        'localIp',
        'localPort',
    }

    def __init__(
        self,
        har_data: dict,
        input_data_file: Path,
        *,
        systematic_time_shift: float = 0.0,
        time_window_requests: tuple[float, float] = (-5.0, 2.0),
        time_window_responses: tuple[float, float] = (-2.0, 5.0),
    ) -> None:
        super().__init__(har_data, input_data_file)
        self.socket_traces_map: Mapping[tuple[CommunityID, FlowDirection], SortedKeyList] = defaultdict(empty_time_sorted_list_of_stack_traces)
        self.paired_socket_traces: dict[tuple[CommunityID, FlowDirection, int], HAREntryMetadata] = {}

        self.time_windows: dict[FlowDirection, tuple[float, float]] = {
            'out': time_window_requests,
            'in': time_window_responses,
        }
        """
        Tolerances (in seconds) on the chronology of socket operations relative to network traffic (per flow direction).

        - For outbound network traffic, the socket operation shall lie in the past, or in a very close future;
        - for inbound network traffic, it is the opposite.
        """

        self.systematic_time_shift = systematic_time_shift
        """
        Systematic time shift, in seconds, between socket-operation timestamps and network-traffic timestamps.

        Socket-operation timestamps come from the phone clock, whereas network-traffic timestamps come
        from the Pirogue clock, and the two clocks may be desynchronized.

        Positive means the network-traffic timestamps (Pirogue) are earlier than the socket-operation timestamps (phone).
        """

        if self.can_enrich:
            self._preprocess_socket_traces()

    @classmethod
    def _attach_community_id_to_stacktrace(cls, socket_trace_data: dict) -> None:
        """Compute and attach in-place the Community ID of the given stacktrace"""
        src_ip = clean_prefixed_ip_address(socket_trace_data['localIp'])
        src_port = socket_trace_data['localPort']
        dst_ip = clean_prefixed_ip_address(socket_trace_data['destIp'])
        dst_port = socket_trace_data['destPort']
        # Build the Community ID flow tuple matching the protocol
        if 'tcp' in socket_trace_data['socketType']:
            tpl = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dst_port)
        else:
            tpl = communityid.FlowTuple.make_udp(src_ip, dst_ip, src_port, dst_port)
        # Attach the Community ID
        socket_trace_data['communityId'] = cls.COMMUNITY_ID.calc(tpl)
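    # Illustrative use of the Community ID computation above (made-up endpoints):
    #
    #     cid = communityid.CommunityID()
    #     tpl = communityid.FlowTuple.make_tcp('10.0.0.2', '203.0.113.7', 49152, 443)
    #     cid.calc(tpl)   # -> a string such as '1:...', identical for both directions of the flow
    #
    # The same Community ID is attached to each HAR entry elsewhere in the pipeline (its
    # `_communityId` field), which is what makes socket traces and HTTP entries joinable.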
    @classmethod
    def _get_clean_stacktrace(cls, stacktrace: dict) -> SocketTraceData:
        """
        Get a clean stacktrace object by removing unnecessary fields,
        renaming keys in camel case (with an optional prefix) and ensuring the
        timestamp is in seconds (instead of milliseconds).

        Free of side effects.
        """
        clean_trace_data = keys_to_camel_case({
            'timestamp': stacktrace['timestamp'] / 1000.,
            'process': stacktrace['process'],
            **stacktrace['data'],
        }, prefix=cls.KEYS_PREFIX)
        cls._attach_community_id_to_stacktrace(clean_trace_data)
        return clean_trace_data  # type: ignore[return-value]

    def _preprocess_socket_traces(self) -> None:
        """Create the mapping of socket traces (keyed by community ID + flow direction) to efficiently attach them afterwards."""
        assert isinstance(self.input_data, list), type(self.input_data)
        for raw_stack_trace in self.input_data:
            clean_stack_trace = self._get_clean_stacktrace(raw_stack_trace)
            socket_type = clean_stack_trace['socketEventType']
            # Responses (in) are matched with read operations on the socket, requests (out) with write operations
            flow_dir: FlowDirection | None = 'out' if socket_type in {'write', 'sendto'} else 'in' if socket_type in {'read', 'recvfrom'} else None
            if flow_dir is None:
                continue
            # TODO: check that the timestamp differs from the others?
            self.socket_traces_map[(clean_stack_trace['communityId'], flow_dir)].add(clean_stack_trace)

    def _find_best_stacktrace(self, har: HAREntryMetadata) -> SocketTraceData | None:
        r"""
        Find the stacktrace matching the community ID whose timestamp is closest\* to the given one

        \* (in the past if direction is `out`, in the future if direction is `in`)
        """
        matching_traces = self.socket_traces_map.get((har.community_id, har.direction))
        if not matching_traces:
            logger.warning(f'No socket operation has been found for {har}')
            return None
        har_timestamp = har.timestamp + self.systematic_time_shift
        if (closest := get_closest_in_window(matching_traces, har_timestamp, self.time_windows[har.direction])) is None:
            socket_chronology = 'just before' if har.direction == 'out' else 'just after'
            logger.warning(f'No socket operation has been found {socket_chronology} {har}')
            return None
        closest_socket_data: SocketTraceData
        closest_socket_ix, closest_socket_timestamp, closest_socket_data = closest
        current_delta_sec = har_timestamp - closest_socket_timestamp
        pairing_key = (har.community_id, har.direction, closest_socket_ix)
        already_paired_har = self.paired_socket_traces.get(pairing_key)
        if already_paired_har is not None:
            if already_paired_har.timestamp == har.timestamp:
                # OK: multiple HTTP/2 streams in a single network frame (and thus a single socket call)
                assert already_paired_har.is_http2 and har.is_http2, (har, already_paired_har)
            else:
                # we could raise, but this happens in real life...
                # TODO? find the best OVERALL allocation of socket operations instead of FIFO?
                logger.warning(
                    f'Pairing {har} with socket operation @ {closest_socket_timestamp:.3f}, '
                    f'but it is also paired with {already_paired_har}...'
                )
        self.paired_socket_traces[pairing_key] = har
        logger.debug(f'Stacktrace found with ∆t={current_delta_sec * 1000:.1f}ms for {har}')
        return closest_socket_data
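    # Worked example of the pairing tolerance above (illustrative numbers, assuming the time
    # window is interpreted as offsets around the shifted HAR timestamp): with
    # systematic_time_shift = +1.5 s, an outbound HAR request captured at t = 100.0 s
    # (Pirogue clock) is looked up at 101.5 s (phone clock); with the default 'out' window
    # of (-5.0, 2.0), any 'write'/'sendto' trace of the same flow timestamped between
    # 96.5 s and 103.5 s is eligible, and `get_closest_in_window` picks the closest one.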
    @staticmethod
    def _compact_stack_trace(stack_trace: SocketTraceData) -> list[str] | None:
        """Compact the stacktrace for convenience"""
        if 'stack' not in stack_trace:  # this happens...
            return None
        # insertion order of dict keys is guaranteed since Python 3.7, so this deduplicates the caller classes while preserving call order
        return list({call['class']: 0 for call in stack_trace['stack']})

    def _enrich_directed_entry(
        self, har_entry: dict[str, Any], community_id: CommunityID, direction: FlowDirection, *, har_entry_id: str
    ) -> None:
        """Attach the stacktrace to the given directed HAR entry (either request or response), in-place"""
        # Fail first
        if direction not in ('in', 'out'):
            raise ValueError(f'Invalid communication direction: {direction}')
        # <!> we always expect `har_entry` to have the out-of-specs `_timestamp: float` key,
        # but it may be None (missing response)
        if har_entry['_timestamp'] is None:
            return
        stack_trace = self._find_best_stacktrace(
            HAREntryMetadata(
                community_id,
                direction,
                Timestamp(har_entry['_timestamp']),
                # useful metadata when debugging
                har_entry_id,
                har_entry['httpVersion'] == 'HTTP/2',
            )
        )
        # Attach the stacktrace to the HAR entry if found
        if stack_trace:
            har_entry['_stacktrace'] = {'stack': None} | {
                k: v for k, v in stack_trace.items() if k not in self.DO_NOT_EXPORT_STACKTRACE_KEYS
            } | {'compact': self._compact_stack_trace(stack_trace)}
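    # Illustrative shape of the attached `_stacktrace` field (hypothetical values): the paired
    # socket trace minus the redundant addressing keys, plus the compacted list of caller classes:
    #
    #     har_entry['_stacktrace'] == {
    #         'stack': [{'class': 'java.net.SocketOutputStream'}, ...],  # or None if absent
    #         'timestamp': 1717000000.123,
    #         'process': 'com.example.app',
    #         'pid': 1234,
    #         'socketEventType': 'write',
    #         'socketType': 'tcp',
    #         'compact': ['java.net.SocketOutputStream', 'com.example.app.HttpClient'],
    #     }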
    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Enrich the given HAR entry with the stacktrace information"""
        # <!> we expect our out-of-specs fields: _communityId & _sha1Id & _timestamp
        community_id = har_entry['_communityId']
        har_entry_id = har_entry['_sha1Id']
        self._enrich_directed_entry(har_entry['request'], community_id, direction='out', har_entry_id=har_entry_id)
        self._enrich_directed_entry(har_entry['response'], community_id, direction='in', har_entry_id=har_entry_id)
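if __name__ == '__main__':
    # Illustrative usage sketch, not part of the original module. It assumes a HAR file whose
    # entries already carry the out-of-spec `_communityId`, `_sha1Id` and `_timestamp` fields
    # added earlier in the pcapng -> HAR pipeline, and a socket-trace JSON dump in the format
    # expected by `HarEnrichment`. Both file names are hypothetical.
    import json

    har_data = json.loads(Path('har.json').read_text())
    enrichment = Stacktrace(har_data, Path('socket_traces.json'), systematic_time_shift=0.0)
    if enrichment.can_enrich:
        for har_entry in har_data['log']['entries']:
            enrichment.enrich_entry(har_entry)
    Path('har.enriched.json').write_text(json.dumps(har_data, indent=2))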