1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
2# SPDX-License-Identifier: MIT
3
4import logging
5import json
6from pathlib import Path
7from operator import itemgetter
8from collections import defaultdict
9from collections.abc import Mapping
10from dataclasses import dataclass
11from typing import TypedDict, Literal, ClassVar, Any
12
13import communityid
14from sortedcontainers import SortedKeyList
15
16from .base import HarEnrichment
17from .types import CommunityID, Timestamp, FlowDirection
18from .utils import keys_to_camel_case, clean_prefixed_ip_address
19from .sorted_list import get_closest_in_window
20
21
22logger = logging.getLogger("pcapng_utils.pirogue_enrichment.stacktrace")
23
24
# Shape of one cleaned socket-trace record (as returned by `Stacktrace._get_clean_stacktrace`).
# Functional TypedDict syntax keeps the camel-cased runtime keys explicit.
SocketTraceData = TypedDict('SocketTraceData', {
    'timestamp': Timestamp,  # seconds
    'process': str,
    'pid': int,
    'stack': list[dict],
    'socketEventType': str,
    'localIp': str,
    'localPort': int,
    'destIp': str,
    'destPort': int,
    'socketType': Literal['tcp', 'tcp6', 'udp', 'udp6'],
    'communityId': CommunityID,
})
37
38
def empty_time_sorted_list_of_stack_traces():
    """Factory for an empty `SortedKeyList` that keeps stack traces sorted by their 'timestamp' key."""
    return SortedKeyList(key=lambda stack_trace: stack_trace['timestamp'])
41
42
@dataclass(frozen=True)
class HAREntryMetadata:
    """Hashable identification/metadata of one directed HAR entry (request or response)."""
    community_id: CommunityID  # Community ID of the underlying network flow
    direction: FlowDirection  # 'out' for requests, 'in' for responses
    timestamp: Timestamp  # directed entry timestamp (out-of-specs `_timestamp` field), in seconds
    entry_id: str  # `_sha1Id` field
    is_http2: bool  # whether the entry's `httpVersion` is 'HTTP/2'
50
51
58
59
class Stacktrace(HarEnrichment):
    """
    HAR enrichment attaching socket-operation stacktraces to HAR entries.

    Stacktraces are paired with HAR requests/responses via the flow Community ID,
    the flow direction and timestamp proximity (within configurable time windows).
    """

    # Identifier of this enrichment
    ID: ClassVar = "pirogue_stacktrace"

    # Shared Community ID calculator used to fingerprint network flows
    COMMUNITY_ID: ClassVar = communityid.CommunityID()

    # Optional prefix applied when camel-casing the stacktrace keys
    KEYS_PREFIX: ClassVar[str] = ''

    # Keys not copied into the exported `_stacktrace` object
    DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {
        # redundant (already carried by the HAR entry / Community ID)
        'communityId',
        'destIp',
        'destPort',
        'localIp',
        'localPort',
    }
76
    def __init__(
        self,
        har_data: dict,
        input_data_file: Path,
        *,
        systematic_time_shift: float | None = None,
        time_window_requests: tuple[float, float] = (-5.0, 2.0),
        time_window_responses: tuple[float, float] = (-2.0, 5.0),
    ) -> None:
        """
        :param har_data: parsed HAR data, enriched in-place by this class
        :param input_data_file: path to the file containing the raw socket traces
        :param systematic_time_shift: systematic shift (seconds) between socket & network timestamps;
            when None, it is computed from the Pirogue experiment metadata in the input file's directory
        :param time_window_requests: accepted (min, max) time deltas (seconds) when pairing requests
        :param time_window_responses: accepted (min, max) time deltas (seconds) when pairing responses
        """
        super().__init__(har_data, input_data_file)
        # Socket traces indexed by (community ID, flow direction); each bucket is kept sorted by timestamp
        self.socket_traces_map: Mapping[tuple[CommunityID, FlowDirection], SortedKeyList] = defaultdict(empty_time_sorted_list_of_stack_traces)
        # Bookkeeping of which HAR entry each socket trace (identified by its index in its bucket) got paired with
        self.paired_socket_traces: dict[tuple[CommunityID, FlowDirection, int], HAREntryMetadata] = {}

        self.time_windows: dict[FlowDirection, tuple[float, float]] = {
            'out': time_window_requests,
            'in': time_window_responses,
        }
        """
        Tolerances (in seconds) regarding chronology of socket operations compared to network traffic (per flow direction).

        - For outbound network traffic, the socket operation shall be in the past, or a very very close future,
        - For inbound network traffic, it is the opposite.
        """

        if systematic_time_shift is None:
            systematic_time_shift = compute_time_shift_from_pirogue_experiment_metadata(input_data_file.parent)
        self.systematic_time_shift = systematic_time_shift or 0.0
        """
        Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.

        Indeed socket operations timestamps come from phone date, whereas network traffic timestamps come
        from Pirogue date, which may be desynchronized.

        Positive means network traffic timestamps (Pirogue) were earlier than socket operations timestamps (phone).
        """

        # NOTE(review): `can_enrich` is set by the `HarEnrichment` base class — presumably
        # False when input data is missing/invalid; confirm against `.base`
        if self.can_enrich:
            self._preprocess_socket_traces()
115
116 @classmethod
117 def _attach_community_id_to_stacktrace(cls, socket_trace_data: dict) -> None:
118 """Compute and append in-place the Community ID to the given stacktrace"""
119 src_ip = clean_prefixed_ip_address(socket_trace_data['localIp'])
120 src_port = socket_trace_data['localPort']
121 dst_ip = clean_prefixed_ip_address(socket_trace_data['destIp'])
122 dst_port = socket_trace_data['destPort']
123 # Prepare the Community ID template based on the protocol
124 if 'tcp' in socket_trace_data['socketType']:
125 tpl = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dst_port)
126 else:
127 tpl = communityid.FlowTuple.make_udp(src_ip, dst_ip, src_port, dst_port)
128 # Attach the Community ID
129 socket_trace_data['communityId'] = cls.COMMUNITY_ID.calc(tpl)
130
131 @classmethod
132 def _get_clean_stacktrace(cls, stacktrace: dict) -> SocketTraceData:
133 """
134 Get a clean stacktrace object by removing unnecessary fields,
135 renaming keys in camel case (with optional prefix) and ensuring the
136 timestamp is in seconds (instead of milliseconds).
137
138 Side-effects free.
139 """
140 clean_trace_data = keys_to_camel_case({
141 'timestamp': stacktrace['timestamp'] / 1000.,
142 'process': stacktrace['process'],
143 **stacktrace['data'],
144 }, prefix=cls.KEYS_PREFIX)
145 cls._attach_community_id_to_stacktrace(clean_trace_data)
146 return clean_trace_data # type: ignore[return-value]
147
148 def _preprocess_socket_traces(self) -> None:
149 """Create the mapping of socket traces (by community ID + flow direction) to efficiently attach them afterwards."""
150 assert isinstance(self.input_data, list), type(self.input_data)
151 for raw_stack_trace in self.input_data:
152 clean_stack_trace = self._get_clean_stacktrace(raw_stack_trace)
153 socket_type = clean_stack_trace['socketEventType']
154 # Use read operations on the socket when dealing with a response (in), write operations otherwise (out)
155 flow_dir: FlowDirection | None = 'out' if socket_type in {'write', 'sendto'} else 'in' if socket_type in {'read', 'recvfrom'} else None
156 if flow_dir is None:
157 continue
158 # TODO: check that timestamp != of others?
159 self.socket_traces_map[(clean_stack_trace['communityId'], flow_dir)].add(clean_stack_trace)
160
    def _find_best_stacktrace(self, har: HAREntryMetadata) -> SocketTraceData | None:
        r"""
        Find the stacktrace with the closest\* timestamp to the given one matching the community ID

        \* (in the past if direction is `out`, in the future if direction was `in`)

        Returns None (with a warning logged) when no socket operation matches the flow
        or none falls within the configured time window for this direction.
        """
        matching_traces = self.socket_traces_map.get((har.community_id, har.direction))
        if not matching_traces:
            logger.warning(f'No socket operation has been found for {har}')
            return None
        # align the network (Pirogue) timestamp onto the socket-operation (phone) clock
        har_timestamp = har.timestamp + self.systematic_time_shift
        if (closest := get_closest_in_window(matching_traces, har_timestamp, self.time_windows[har.direction])) is None:
            socket_chronology = 'just before' if har.direction == 'out' else 'just after'
            logger.warning(f'No socket operation has been found {socket_chronology} {har}')
            return None
        closest_socket_data: SocketTraceData
        closest_socket_ix, closest_socket_timestamp, closest_socket_data = closest
        current_delta_sec = har_timestamp - closest_socket_timestamp
        # the bucket index identifies the socket trace: detect whether it was already paired to another HAR entry
        pairing_key = (har.community_id, har.direction, closest_socket_ix)
        already_paired_har = self.paired_socket_traces.get(pairing_key)
        if already_paired_har is not None:
            if already_paired_har.timestamp == har.timestamp:
                # OK: multiple HTTP2 streams in 1 network frame (and thus 1 socket call)
                # It may also VERY RARELY happen in HTTP1 so we do NOT raise
                assert already_paired_har.is_http2 is har.is_http2, (har, already_paired_har) # consistent HTTP version
                if not har.is_http2:
                    logger.warning(f"{har} getting paired with already paired {already_paired_har}")
            else:
                # we could raise but this happens in real life...
                # TODO? find best OVERALL allocations of socket operations instead of FIFO?
                logger.warning(
                    f'Pairing {har} with socket operation @ {closest_socket_timestamp:.3f}, '
                    f'but it is also paired with {already_paired_har}...'
                )
        # record (or overwrite) the pairing, so future duplicates can be detected
        self.paired_socket_traces[pairing_key] = har
        logger.debug(f'Stacktrace found with ∆t={current_delta_sec * 1000:.1f}ms for {har}')
        return closest_socket_data
198
199 @staticmethod
200 def _compact_stack_trace(stack_trace: SocketTraceData) -> list[str] | None:
201 """Compact the stacktrace for convenience"""
202 if 'stack' not in stack_trace: # this happens...
203 return None
204 # order of dictionary keys is officially guaranteed since Python >= 3.7
205 return list({call['class']: 0 for call in stack_trace['stack']})
206
207 def _enrich_directed_entry(
208 self, har_entry: dict[str, Any], community_id: CommunityID, direction: FlowDirection, *, har_entry_id: str
209 ) -> None:
210 """Attach the stacktrace to the given HAR directed entry (either request or response), in-place"""
211 # Fail first
212 if direction not in ('in', 'out'):
213 raise ValueError(f'Invalid communication direction: {direction}')
214 # <!> we always expect the `har_entry` to have out-of-specs `_timestamp: float` key
215 # but it may be None (missing response)
216 if har_entry['_timestamp'] is None:
217 return
218 stack_trace = self._find_best_stacktrace(
219 HAREntryMetadata(
220 community_id,
221 direction,
222 Timestamp(har_entry['_timestamp']),
223 # useful metadata when debugging
224 har_entry_id,
225 har_entry['httpVersion'] == 'HTTP/2',
226 )
227 )
228 # Attach the stacktrace to the HAR entry if found
229 if stack_trace:
230 har_entry['_stacktrace'] = {'stack': None} | {
231 k: v for k, v in stack_trace.items() if k not in self.DO_NOT_EXPORT_STACKTRACE_KEYS
232 } | {'compact': self._compact_stack_trace(stack_trace)}
233
234 def enrich_entry(self, har_entry: dict[str, Any]) -> None:
235 """Enrich the HAR data with the stacktraces information"""
236 # <!> we expect our out-of-specs fields: _communityId & _sha1Id & _timestamp
237 community_id = har_entry['_communityId']
238 har_entry_id = har_entry['_sha1Id']
239 self._enrich_directed_entry(har_entry['request'], community_id, direction='out', har_entry_id=har_entry_id)
240 self._enrich_directed_entry(har_entry['response'], community_id, direction='in', har_entry_id=har_entry_id)