Source code for pcapng_utils.har.pirogue_enrichment.stacktrace

  1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
  2# SPDX-License-Identifier: MIT
  3
  4import logging
  5import json
  6from pathlib import Path
  7from operator import itemgetter
  8from collections import defaultdict
  9from collections.abc import Mapping
 10from dataclasses import dataclass
 11from typing import TypedDict, Literal, ClassVar, Any
 12
 13import communityid
 14from sortedcontainers import SortedKeyList
 15
 16from .base import HarEnrichment
 17from .types import CommunityID, Timestamp, FlowDirection
 18from .utils import keys_to_camel_case, clean_prefixed_ip_address
 19from .sorted_list import get_closest_in_window
 20
 21
# Module-level logger; dotted name mirrors the package path so it participates
# in the application's hierarchical logging configuration
logger = logging.getLogger("pcapng_utils.pirogue_enrichment.stacktrace")
 24
class SocketTraceData(TypedDict):
    """
    A single cleaned socket-operation event captured on the device, with its call stack.

    Built by `Stacktrace._get_clean_stacktrace` (keys camel-cased, timestamp converted
    to seconds, Community ID attached).
    """
    timestamp: Timestamp  # seconds
    process: str  # process name on the device
    pid: int
    stack: list[dict]  # call-stack frames; each frame has at least a 'class' key
    socketEventType: str  # e.g. 'read', 'write', 'sendto', 'recvfrom'
    localIp: str
    localPort: int
    destIp: str
    destPort: int
    socketType: Literal['tcp', 'tcp6', 'udp', 'udp6']
    communityId: CommunityID  # computed from the 4-tuple above + protocol
def empty_time_sorted_list_of_stack_traces():
    """Build an empty ``SortedKeyList`` that keeps stacktraces ordered by their 'timestamp' value.

    Used as the ``defaultdict`` factory for the socket-traces mapping.
    """
    return SortedKeyList(key=lambda stack_trace: stack_trace['timestamp'])
@dataclass(frozen=True)
class HAREntryMetadata:
    """Minimal metadata of a directed HAR entry (request or response), used to pair it with a socket trace."""
    community_id: CommunityID
    direction: FlowDirection  # 'out' for requests, 'in' for responses
    timestamp: Timestamp  # same unit as socket-trace timestamps (seconds)
    entry_id: str  # `_sha1Id` field
    is_http2: bool  # whether the entry's `httpVersion` is HTTP/2
[docs] 52def compute_time_shift_from_pirogue_experiment_metadata(pirogue_dir: Path) -> float | None: 53 experiment_path = pirogue_dir / "experiment.json" 54 if not experiment_path.is_file(): 55 return None 56 meta = json.loads(experiment_path.read_text()) 57 return (meta["device"]["start_capture_time"] - meta["network"]["start_capture_time"]) / 1000.0 # in seconds
58 59
class Stacktrace(HarEnrichment):
    """
    Enrich HAR entries with device socket-operation stacktraces.

    Socket traces are matched to requests/responses by Community ID + flow
    direction, then by timestamp proximity (within a configurable window,
    after correcting a systematic device-vs-Pirogue clock shift).
    """

    # Identifier of this enrichment
    ID: ClassVar = "pirogue_stacktrace"

    # Single Community ID calculator reused for all stacktraces
    COMMUNITY_ID: ClassVar = communityid.CommunityID()

    # Optional prefix applied when camel-casing stacktrace keys (empty by default)
    KEYS_PREFIX: ClassVar[str] = ''

    # Keys stripped from the exported `_stacktrace` object
    DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {
        # redundant
        'communityId',
        'destIp',
        'destPort',
        'localIp',
        'localPort',
    }

    def __init__(
        self,
        har_data: dict,
        input_data_file: Path,
        *,
        systematic_time_shift: float | None = None,
        time_window_requests: tuple[float, float] = (-5.0, 2.0),
        time_window_responses: tuple[float, float] = (-2.0, 5.0),
    ) -> None:
        """
        :param har_data: HAR data (dict), enriched in-place by `enrich_entry` calls
        :param input_data_file: JSON file of socket traces recorded on the device
        :param systematic_time_shift: clock offset in seconds between device and Pirogue;
            when None, auto-computed from `experiment.json` next to the input file
        :param time_window_requests: (min, max) tolerated socket-vs-request time delta, in seconds
        :param time_window_responses: (min, max) tolerated socket-vs-response time delta, in seconds
        """
        super().__init__(har_data, input_data_file)
        # (community ID, flow direction) -> socket traces sorted by timestamp
        self.socket_traces_map: Mapping[tuple[CommunityID, FlowDirection], SortedKeyList] = defaultdict(empty_time_sorted_list_of_stack_traces)
        # (community ID, flow direction, socket trace index) -> HAR entry already paired with it
        self.paired_socket_traces: dict[tuple[CommunityID, FlowDirection, int], HAREntryMetadata] = {}

        self.time_windows: dict[FlowDirection, tuple[float, float]] = {
            'out': time_window_requests,
            'in': time_window_responses,
        }
        """
        Tolerances (in seconds) regarding chronology of socket operations compared to network traffic (per flow direction).

        - For outbound network traffic, the socket operation shall be in the past, or a very very close future,
        - For inbound network traffic, it is the opposite.
        """

        if systematic_time_shift is None:
            systematic_time_shift = compute_time_shift_from_pirogue_experiment_metadata(input_data_file.parent)
        self.systematic_time_shift = systematic_time_shift or 0.0
        """
        Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.

        Indeed socket operations timestamps come from phone date, whereas network traffic timestamps come
        from Pirogue date, which may be desynchronized.

        Positive means network traffic timestamps (Pirogue) were earlier than socket operations timestamps (phone).
        """

        if self.can_enrich:
            self._preprocess_socket_traces()

    @classmethod
    def _attach_community_id_to_stacktrace(cls, socket_trace_data: dict) -> None:
        """Compute and append in-place the Community ID to the given stacktrace"""
        src_ip = clean_prefixed_ip_address(socket_trace_data['localIp'])
        src_port = socket_trace_data['localPort']
        dst_ip = clean_prefixed_ip_address(socket_trace_data['destIp'])
        dst_port = socket_trace_data['destPort']
        # Prepare the Community ID template based on the protocol
        if 'tcp' in socket_trace_data['socketType']:  # matches both 'tcp' and 'tcp6'
            tpl = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dst_port)
        else:
            tpl = communityid.FlowTuple.make_udp(src_ip, dst_ip, src_port, dst_port)
        # Attach the Community ID
        socket_trace_data['communityId'] = cls.COMMUNITY_ID.calc(tpl)

    @classmethod
    def _get_clean_stacktrace(cls, stacktrace: dict) -> SocketTraceData:
        """
        Get a clean stacktrace object by removing unnecessary fields,
        renaming keys in camel case (with optional prefix) and ensuring the
        timestamp is in seconds (instead of milliseconds).

        Side-effects free.
        """
        clean_trace_data = keys_to_camel_case({
            'timestamp': stacktrace['timestamp'] / 1000.,  # ms -> s
            'process': stacktrace['process'],
            **stacktrace['data'],
        }, prefix=cls.KEYS_PREFIX)
        cls._attach_community_id_to_stacktrace(clean_trace_data)
        return clean_trace_data  # type: ignore[return-value]

    def _preprocess_socket_traces(self) -> None:
        """Create the mapping of socket traces (by community ID + flow direction) to efficiently attach them afterwards."""
        assert isinstance(self.input_data, list), type(self.input_data)
        for raw_stack_trace in self.input_data:
            clean_stack_trace = self._get_clean_stacktrace(raw_stack_trace)
            socket_type = clean_stack_trace['socketEventType']
            # Use read operations on the socket when dealing with a response (in), write operations otherwise (out)
            flow_dir: FlowDirection | None = 'out' if socket_type in {'write', 'sendto'} else 'in' if socket_type in {'read', 'recvfrom'} else None
            if flow_dir is None:
                # other socket event types (e.g. connect/close) are irrelevant for pairing
                continue
            # TODO: check that timestamp != of others?
            self.socket_traces_map[(clean_stack_trace['communityId'], flow_dir)].add(clean_stack_trace)

    def _find_best_stacktrace(self, har: HAREntryMetadata) -> SocketTraceData | None:
        r"""
        Find the stacktrace with the closest\* timestamp to the given one matching the community ID

        \* (in the past if direction is `out`, in the future if direction was `in`)
        """
        matching_traces = self.socket_traces_map.get((har.community_id, har.direction))
        if not matching_traces:
            logger.warning(f'No socket operation has been found for {har}')
            return None
        # Correct the HAR timestamp so both sides are on the device clock
        har_timestamp = har.timestamp + self.systematic_time_shift
        if (closest := get_closest_in_window(matching_traces, har_timestamp, self.time_windows[har.direction])) is None:
            socket_chronology = 'just before' if har.direction == 'out' else 'just after'
            logger.warning(f'No socket operation has been found {socket_chronology} {har}')
            return None
        closest_socket_data: SocketTraceData
        closest_socket_ix, closest_socket_timestamp, closest_socket_data = closest
        current_delta_sec = har_timestamp - closest_socket_timestamp
        # Track which HAR entry each socket trace got paired with, to detect double-pairing
        pairing_key = (har.community_id, har.direction, closest_socket_ix)
        already_paired_har = self.paired_socket_traces.get(pairing_key)
        if already_paired_har is not None:
            if already_paired_har.timestamp == har.timestamp:
                # OK: multiple HTTP2 streams in 1 network frame (and thus 1 socket call)
                # It may also VERY RARELY happen in HTTP1 so we do NOT raise
                assert already_paired_har.is_http2 is har.is_http2, (har, already_paired_har)  # consistent HTTP version
                if not har.is_http2:
                    logger.warning(f"{har} getting paired with already paired {already_paired_har}")
            else:
                # we could raise but this happens in real life...
                # TODO? find best OVERALL allocations of socket operations instead of FIFO?
                logger.warning(
                    f'Pairing {har} with socket operation @ {closest_socket_timestamp:.3f}, '
                    f'but it is also paired with {already_paired_har}...'
                )
        self.paired_socket_traces[pairing_key] = har
        logger.debug(f'Stacktrace found with ∆t={current_delta_sec * 1000:.1f}ms for {har}')
        return closest_socket_data

    @staticmethod
    def _compact_stack_trace(stack_trace: SocketTraceData) -> list[str] | None:
        """Compact the stacktrace for convenience"""
        if 'stack' not in stack_trace:  # this happens...
            return None
        # order of dictionary keys is officially guaranteed since Python >= 3.7
        # (dict used as an ordered set: deduplicate class names, keep first-seen order)
        return list({call['class']: 0 for call in stack_trace['stack']})

    def _enrich_directed_entry(
        self, har_entry: dict[str, Any], community_id: CommunityID, direction: FlowDirection, *, har_entry_id: str
    ) -> None:
        """Attach the stacktrace to the given HAR directed entry (either request or response), in-place"""
        # Fail first
        if direction not in ('in', 'out'):
            raise ValueError(f'Invalid communication direction: {direction}')
        # <!> we always expect the `har_entry` to have out-of-specs `_timestamp: float` key
        # but it may be None (missing response)
        if har_entry['_timestamp'] is None:
            return
        stack_trace = self._find_best_stacktrace(
            HAREntryMetadata(
                community_id,
                direction,
                Timestamp(har_entry['_timestamp']),
                # useful metadata when debugging
                har_entry_id,
                har_entry['httpVersion'] == 'HTTP/2',
            )
        )
        # Attach the stacktrace to the HAR entry if found
        if stack_trace:
            # 'stack' first defaulted to None, then overridden if present; redundant keys dropped
            har_entry['_stacktrace'] = {'stack': None} | {
                k: v for k, v in stack_trace.items() if k not in self.DO_NOT_EXPORT_STACKTRACE_KEYS
            } | {'compact': self._compact_stack_trace(stack_trace)}

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Enrich the HAR data with the stacktraces information"""
        # <!> we expect our out-of-specs fields: _communityId & _sha1Id & _timestamp
        community_id = har_entry['_communityId']
        har_entry_id = har_entry['_sha1Id']
        self._enrich_directed_entry(har_entry['request'], community_id, direction='out', har_entry_id=har_entry_id)
        self._enrich_directed_entry(har_entry['response'], community_id, direction='in', har_entry_id=har_entry_id)