1# SPDX-FileCopyrightText: 2024 Pôle d'Expertise de la Régulation Numérique - PEReN <contact@peren.gouv.fr>
2# SPDX-License-Identifier: MIT
3
4import logging
5from pathlib import Path
6from operator import itemgetter
7from collections import defaultdict
8from collections.abc import Mapping
9from dataclasses import dataclass
10from typing import TypedDict, Literal, ClassVar, Any
11
12import communityid
13from sortedcontainers import SortedKeyList
14
15from .base import HarEnrichment
16from .types import CommunityID, Timestamp, FlowDirection
17from .utils import keys_to_camel_case, clean_prefixed_ip_address
18from .sorted_list import get_closest_in_window
19
20logger = logging.getLogger('enrichment')
21
22
[docs]
# Cleaned socket operation trace, as produced by `Stacktrace._get_clean_stacktrace`
# (camel-cased keys, timestamp converted from milliseconds to seconds, Community ID attached).
# NOTE: the `stack` key may be missing from some traces (see `Stacktrace._compact_stack_trace`).
SocketTraceData = TypedDict(
    'SocketTraceData',
    {
        'timestamp': Timestamp,  # seconds
        'process': str,
        'pid': int,
        'stack': list[dict],
        'socketEventType': str,
        'localIp': str,
        'localPort': int,
        'destIp': str,
        'destPort': int,
        'socketType': Literal['tcp', 'tcp6', 'udp', 'udp6'],
        'communityId': CommunityID,
    },
)
35
36
[docs]
def empty_time_sorted_list_of_stack_traces():
    """
    Build a new, empty `SortedKeyList` of socket traces ordered by their ``timestamp`` key.

    Used as the default factory of ``Stacktrace.socket_traces_map``.
    """
    by_timestamp = itemgetter('timestamp')
    return SortedKeyList(key=by_timestamp)
39
40
[docs]
@dataclass(frozen=True)
class HAREntryMetadata:
    """
    Immutable description of one direction (request or response) of a HAR entry,
    used to look up the socket operation that matches it.
    """

    # Community ID of the network flow the HAR entry belongs to
    community_id: CommunityID
    # 'out' for the request, 'in' for the response
    direction: FlowDirection
    # timestamp of the HAR request/response (its out-of-specs `_timestamp` field)
    timestamp: Timestamp
    # `_sha1Id` field of the HAR entry (mostly useful when debugging)
    entry_id: str
    # whether the HAR entry uses HTTP/2 (several streams may then share a single socket operation)
    is_http2: bool
49
[docs]
class Stacktrace(HarEnrichment):
    """
    Enrich HAR entries with the stacktraces of the matching socket operations.

    Each HAR request (resp. response) is paired with a socket write/sendto (resp. read/recvfrom)
    operation of the same flow (Community ID). The chosen operation is the one whose timestamp is
    the closest to the (time-shifted) HAR timestamp, within a per-direction tolerance window.
    Its stacktrace is attached under the out-of-specs `_stacktrace` key of the request (resp. response).
    """

    ID: ClassVar = 'stacktrace'

    # Shared Community ID calculator (default settings)
    COMMUNITY_ID: ClassVar = communityid.CommunityID()

    # Optional prefix for the camel-cased socket trace keys (see `_get_clean_stacktrace`)
    KEYS_PREFIX: ClassVar[str] = ''

    # Socket trace keys that are NOT exported in the `_stacktrace` HAR field
    DO_NOT_EXPORT_STACKTRACE_KEYS: ClassVar = {
        # redundant: the flow is already identified on the HAR entry (e.g. by its `_communityId`)
        'communityId',
        'destIp',
        'destPort',
        'localIp',
        'localPort',
    }

    # Flow direction of the HTTP message matched with each socket event type of interest:
    # write operations carry requests (out), read operations carry responses (in).
    # Any other socket event type is ignored.
    _SOCKET_EVENT_TYPE_TO_FLOW_DIRECTION: ClassVar[dict[str, FlowDirection]] = {
        'write': 'out',
        'sendto': 'out',
        'read': 'in',
        'recvfrom': 'in',
    }

    # Upper-cased HAR `httpVersion` spellings denoting HTTP/2 (producers differ: `HTTP/2`, `http/2.0`, `h2`...)
    _HTTP2_VERSIONS: ClassVar[frozenset[str]] = frozenset({'HTTP/2', 'HTTP/2.0', 'H2'})

    def __init__(
        self,
        har_data: dict,
        input_data_file: Path,
        *,
        systematic_time_shift: float = 0.0,
        time_window_requests: tuple[float, float] = (-5.0, 2.0),
        time_window_responses: tuple[float, float] = (-2.0, 5.0),
    ) -> None:
        """
        :param har_data: HAR data to enrich.
        :param input_data_file: socket traces file, handed over to :class:`HarEnrichment`
            (which presumably loads it into ``self.input_data``).
        :param systematic_time_shift: see :attr:`systematic_time_shift`.
        :param time_window_requests: tolerance window (seconds) for requests (`out` direction).
        :param time_window_responses: tolerance window (seconds) for responses (`in` direction).
        """
        super().__init__(har_data, input_data_file)
        self.socket_traces_map: Mapping[tuple[CommunityID, FlowDirection], SortedKeyList] = defaultdict(empty_time_sorted_list_of_stack_traces)
        self.paired_socket_traces: dict[tuple[CommunityID, FlowDirection, int], HAREntryMetadata] = {}

        self.time_windows: dict[FlowDirection, tuple[float, float]] = {
            'out': time_window_requests,
            'in': time_window_responses,
        }
        """
        Tolerances (in seconds) regarding chronology of socket operations compared to network traffic (per flow direction).

        - For outbound network traffic, the socket operation shall be in the past, or in the very near future,
        - For inbound network traffic, it is the opposite.
        """

        self.systematic_time_shift = systematic_time_shift
        """
        Systematic time shift in seconds between socket operations timestamps vs. network traffic timestamps.

        Indeed socket operations timestamps come from phone date, whereas network traffic timestamps come
        from Pirogue date, which may be desynchronized.

        Positive means network traffic timestamps (Pirogue) were earlier than socket operations timestamps (phone).
        """

        if self.can_enrich:
            self._preprocess_socket_traces()

    @classmethod
    def _attach_community_id_to_stacktrace(cls, socket_trace_data: dict) -> None:
        """Compute and append in-place the Community ID to the given stacktrace"""
        src_ip = clean_prefixed_ip_address(socket_trace_data['localIp'])
        src_port = socket_trace_data['localPort']
        dst_ip = clean_prefixed_ip_address(socket_trace_data['destIp'])
        dst_port = socket_trace_data['destPort']
        # Prepare the Community ID template based on the protocol (`tcp`/`tcp6` vs. `udp`/`udp6`)
        if 'tcp' in socket_trace_data['socketType']:
            tpl = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dst_port)
        else:
            tpl = communityid.FlowTuple.make_udp(src_ip, dst_ip, src_port, dst_port)
        # Attach the Community ID
        socket_trace_data['communityId'] = cls.COMMUNITY_ID.calc(tpl)

    @classmethod
    def _get_clean_stacktrace(cls, stacktrace: dict) -> SocketTraceData:
        """
        Get a clean stacktrace object by removing unnecessary fields,
        renaming keys in camel case (with optional prefix) and ensuring the
        timestamp is in seconds (instead of milliseconds).

        Side-effects free (on the given raw stacktrace).
        """
        clean_trace_data = keys_to_camel_case({
            'timestamp': stacktrace['timestamp'] / 1000.,
            'process': stacktrace['process'],
            **stacktrace['data'],
        }, prefix=cls.KEYS_PREFIX)
        cls._attach_community_id_to_stacktrace(clean_trace_data)
        return clean_trace_data  # type: ignore[return-value]

    def _preprocess_socket_traces(self) -> None:
        """Create the mapping of socket traces (by community ID + flow direction) to efficiently attach them afterwards."""
        assert isinstance(self.input_data, list), type(self.input_data)
        for raw_stack_trace in self.input_data:
            clean_stack_trace = self._get_clean_stacktrace(raw_stack_trace)
            # Use read operations on the socket when dealing with a response (in), write operations otherwise (out)
            flow_dir = self._SOCKET_EVENT_TYPE_TO_FLOW_DIRECTION.get(clean_stack_trace['socketEventType'])
            if flow_dir is None:
                continue
            # TODO: check that timestamp != of others?
            self.socket_traces_map[(clean_stack_trace['communityId'], flow_dir)].add(clean_stack_trace)

    def _find_best_stacktrace(self, har: HAREntryMetadata) -> SocketTraceData | None:
        """
        Find the socket trace matching the community ID and direction of the given HAR entry
        whose timestamp is the closest to the HAR entry timestamp (shifted by ``systematic_time_shift``),
        within the tolerance window configured for this direction (see ``time_windows``).

        With the default windows, the socket operation is mostly expected in the past for `out`
        (requests) and mostly in the future for `in` (responses).

        A socket operation already paired with another HAR entry is still returned (with a warning
        unless both entries share the same timestamp, i.e. HTTP/2 streams of the same network frame).

        :return: the matching socket trace, or None if none was found (a warning is logged).
        """
        matching_traces = self.socket_traces_map.get((har.community_id, har.direction))
        if not matching_traces:
            logger.warning(f'No socket operation has been found for {har}')
            return None
        har_timestamp = har.timestamp + self.systematic_time_shift
        if (closest := get_closest_in_window(matching_traces, har_timestamp, self.time_windows[har.direction])) is None:
            socket_chronology = 'just before' if har.direction == 'out' else 'just after'
            logger.warning(f'No socket operation has been found {socket_chronology} {har}')
            return None
        closest_socket_data: SocketTraceData
        closest_socket_ix, closest_socket_timestamp, closest_socket_data = closest
        current_delta_sec = har_timestamp - closest_socket_timestamp
        # Socket operations are identified by their index in the (static, once preprocessed) sorted list
        pairing_key = (har.community_id, har.direction, closest_socket_ix)
        already_paired_har = self.paired_socket_traces.get(pairing_key)
        if already_paired_har is not None:
            if already_paired_har.timestamp == har.timestamp:
                # OK: multiple HTTP2 streams in 1 network frame (and thus 1 socket call)
                assert already_paired_har.is_http2 and har.is_http2, (har, already_paired_har)
            else:
                # we could raise but this happens in real life...
                # TODO? find best OVERALL allocations of socket operations instead of FIFO?
                logger.warning(
                    f'Pairing {har} with socket operation @ {closest_socket_timestamp:.3f}, '
                    f'but it is also paired with {already_paired_har}...'
                )
        self.paired_socket_traces[pairing_key] = har
        logger.debug(f'Stacktrace found with ∆t={current_delta_sec * 1000:.1f}ms for {har}')
        return closest_socket_data

    @staticmethod
    def _compact_stack_trace(stack_trace: SocketTraceData) -> list[str] | None:
        """
        Compact the stacktrace for convenience: the de-duplicated classes of the stack,
        in order of first appearance, or None if the trace has no `stack`.
        """
        if 'stack' not in stack_trace:  # this happens...
            return None
        # dict keys keep insertion order (guaranteed since Python 3.7)
        return list(dict.fromkeys(call['class'] for call in stack_trace['stack']))

    @classmethod
    def _is_http2(cls, http_version: str | None) -> bool:
        """
        Whether the given HAR `httpVersion` value denotes HTTP/2.

        HAR producers spell it differently (`HTTP/2`, `HTTP/2.0`, `http/2.0`, `h2`...),
        hence the case-insensitive comparison against known spellings.
        """
        return (http_version or '').strip().upper() in cls._HTTP2_VERSIONS

    def _enrich_directed_entry(
        self, har_entry: dict[str, Any], community_id: CommunityID, direction: FlowDirection, *, har_entry_id: str
    ) -> None:
        """
        Attach the stacktrace to the given HAR directed entry (either request or response), in-place.

        :raises ValueError: if `direction` is neither `in` nor `out`.
        """
        # Fail first
        if direction not in ('in', 'out'):
            raise ValueError(f'Invalid communication direction: {direction}')
        # <!> we always expect the `har_entry` to have out-of-specs `_timestamp: float` key
        # but it may be None (missing response)
        if har_entry['_timestamp'] is None:
            return
        stack_trace = self._find_best_stacktrace(
            HAREntryMetadata(
                community_id,
                direction,
                Timestamp(har_entry['_timestamp']),
                # useful metadata when debugging
                har_entry_id,
                self._is_http2(har_entry['httpVersion']),
            )
        )
        # Attach the stacktrace to the HAR entry if found
        if stack_trace:
            har_entry['_stacktrace'] = (
                # `stack` key always exported (None when the socket trace has none)
                {'stack': None}
                | {k: v for k, v in stack_trace.items() if k not in self.DO_NOT_EXPORT_STACKTRACE_KEYS}
                | {'compact': self._compact_stack_trace(stack_trace)}
            )

    def enrich_entry(self, har_entry: dict[str, Any]) -> None:
        """Enrich the HAR data with the stacktraces information"""
        # <!> we expect our out-of-specs fields: _communityId & _sha1Id & _timestamp
        community_id = har_entry['_communityId']
        har_entry_id = har_entry['_sha1Id']
        self._enrich_directed_entry(har_entry['request'], community_id, direction='out', har_entry_id=har_entry_id)
        self._enrich_directed_entry(har_entry['response'], community_id, direction='in', har_entry_id=har_entry_id)