1from abc import ABC, abstractmethod
2from functools import cached_property
3from dataclasses import dataclass
4from collections.abc import Sequence
5from typing import ClassVar, Any
6
7from ...payload import Payload
8from ..types import HarEntry, DictLayers
9from ..utils import get_layers_mapping, get_tshark_bytes_from_raw, har_entry_with_common_fields
10
11HTTP_METHODS = {'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS', 'CONNECT', 'TRACE'}
12
13
14def _get_raw_headers(http_layer: dict[str, Any], direction: str) -> list[bytes]:
15 raw_headers = http_layer.get(f"http.{direction}.line_raw")
16 if not raw_headers:
17 return []
18 if isinstance(http_layer[f"http.{direction}.line"], str): # only 1 header (dirty structure)
19 raw_headers = [raw_headers]
20 return [get_tshark_bytes_from_raw(h) for h in raw_headers]
21
22
[docs]
@dataclass(frozen=True)
class HttpRequestResponse(ABC):
    """
    Common base of the HTTP request and HTTP response wrappers.

    An instance holds the layers of a single packet and exposes the values read from them (frame, IP, TCP
    and HTTP layers) through properties.
    """
    packet: DictLayers

    # content type reported when the packet has a payload but no `http.content_type` entry
    FALLBACK_CONTENT_TYPE: ClassVar[str] = 'application/octet-stream'

    @property
    def frame_nb(self) -> int:
        """Frame number of the packet (useful for debugging with Wireshark)."""
        frame_layer = self.packet['frame']
        return int(frame_layer['frame.number'])

    @property
    def community_id(self) -> str:
        """Value of the ``communityid`` entry of the packet."""
        return self.packet['communityid']

    @cached_property
    def ip_version_and_layer(self) -> tuple[str, dict[str, Any]]:
        """
        Find which IP layer the packet carries.

        :return: tuple with the layer keyword (``"ip"`` or ``"ipv6"``) and the matching layer
        """
        has_v4 = "ip" in self.packet
        has_v6 = "ipv6" in self.packet
        # exactly one of the two IP layers is expected
        assert has_v4 != has_v6, self
        layer_name = "ipv6" if has_v6 else "ip"
        return layer_name, self.packet[layer_name]

    def _ip_field(self, suffix: str) -> str:
        """Read ``<ip keyword>.<suffix>`` from the IP layer of the packet."""
        layer_name, layer = self.ip_version_and_layer
        return layer[f"{layer_name}.{suffix}"]

    @property
    def src_host(self) -> str:
        """Source host, as found in the IP layer."""
        return self._ip_field("src_host")

    @property
    def dst_host(self) -> str:
        """Destination host, as found in the IP layer."""
        return self._ip_field("dst_host")

    @property
    def src_ip(self) -> str:
        """Source address, as found in the IP layer."""
        return self._ip_field("src")

    @property
    def dst_ip(self) -> str:
        """Destination address, as found in the IP layer."""
        return self._ip_field("dst")

    @property
    def src_port(self) -> int:
        """Source port of the TCP layer."""
        tcp_layer = self.packet['tcp']
        return int(tcp_layer['tcp.srcport'])

    @property
    def dst_port(self) -> int:
        """Destination port of the TCP layer."""
        tcp_layer = self.packet['tcp']
        return int(tcp_layer['tcp.dstport'])

    @property
    def http_layer(self) -> dict[str, Any]:
        """The ``http`` layer of the packet."""
        return self.packet['http']

    @property
    @abstractmethod
    def raw_headers(self) -> Sequence[bytes]:
        """Raw header lines of the message; provided by the concrete subclasses."""

    @property
    def header_length(self) -> int:
        """Total number of bytes of all the raw header lines."""
        joined = b''.join(self.raw_headers)
        return len(joined)

    @property
    def content_type(self) -> str:
        """
        Content type of the payload.

        :return: an empty string when the payload is falsy, otherwise the ``http.content_type`` value
            (``FALLBACK_CONTENT_TYPE`` when that entry is absent)
        """
        if not self.payload:
            return ''
        value: str | list[str] = self.http_layer.get('http.content_type', self.FALLBACK_CONTENT_TYPE)
        # we take last value when multiple values
        return value[-1] if isinstance(value, list) else value

    @cached_property
    def payload(self) -> Payload:
        """
        Build the payload of the message from the raw data of the HTTP layer.

        :return: a ``Payload`` built from ``http.file_data_raw`` or, when that entry is absent, from the
            ``data_raw`` value of the first "content-encoded entity body" sub-layer
        """
        raw = self.http_layer.get('http.file_data_raw')
        if raw is None:
            # handle tshark error during decompression
            encoded_body = next(
                (
                    value
                    for key, value in self.http_layer.items()
                    if key.lower().startswith('content-encoded entity body ') and isinstance(value, dict)
                ),
                None,
            )
            if encoded_body is not None:
                raw = encoded_body['data_raw']
        return Payload(get_tshark_bytes_from_raw(raw))

    @property
    def content_length(self) -> int:
        """Size of the payload, as reported by ``Payload.size``."""
        return self.payload.size

    @property
    def timestamp(self) -> float:
        """Epoch time of the frame."""
        frame_layer = self.packet['frame']
        return float(frame_layer['frame.time_epoch'])

    @property
    def common_har_props(self) -> dict[str, Any]:
        """HAR properties shared by the request and the response entries."""
        # NOTE(review): `headers` is not defined in the visible part of this class; presumably it is
        # provided elsewhere in the class - confirm.
        props: dict[str, Any] = {'cookies': [], 'headers': self.headers}
        props['headersSize'] = self.header_length
        props['bodySize'] = self.content_length
        props['_timestamp'] = self.timestamp
        props['_rawFramesNumbers'] = [self.frame_nb]  # always 1 frame in HTTP1
        source = {'ip': self.src_ip, 'host': self.src_host, 'port': self.src_port}
        destination = {'ip': self.dst_ip, 'host': self.dst_host, 'port': self.dst_port}
        props['_communication'] = {'src': source, 'dst': destination}
        return props
155
156
[docs]
@dataclass(frozen=True)
class HttpRequest(HttpRequestResponse):
    """
    Wrapper around the packet of an HTTP request.
    """

    @property
    def raw_headers(self) -> list[bytes]:
        """Raw header lines of the request."""
        return _get_raw_headers(self.http_layer, 'request')

    @cached_property
    def http_version_method(self) -> tuple[str, str]:
        """
        Read the HTTP version & method from the packet data.

        The values come from the first sub-layer (dict value) of the HTTP layer that holds an
        ``http.request.version`` entry.

        :return: tuple with HTTP version & method (``('HTTP/1.1', '')`` when no such sub-layer exists)
        """
        request_line = next(
            (
                entry
                for entry in self.http_layer.values()
                if isinstance(entry, dict) and 'http.request.version' in entry
            ),
            None,
        )
        if request_line is None:
            return 'HTTP/1.1', ''
        version = request_line['http.request.version']
        assert version.startswith('HTTP/1.'), version
        method = request_line['http.request.method']
        assert method in HTTP_METHODS, method
        return version, method

    @property
    def sending_duration(self) -> float:
        """``frame.time_delta`` of the frame (0 when absent) multiplied by 1000, rounded to 2 decimals."""
        delta = float(self.packet['frame'].get('frame.time_delta', 0))
        return round(1000 * delta, 2)

    def to_har(self) -> dict[str, Any]:
        """
        Export the HTTP request in HTTP Archive (HAR) format.

        :return: the HTTP request in HAR format
        """
        version, method = self.http_version_method
        har: dict[str, Any] = {
            'method': method,
            'url': self.uri,
            'queryString': [],
            'httpVersion': version,
        }
        har.update(self.common_har_props)
        # the payload only contributes to the entry when the request has a body
        if self.content_length:
            self.payload.update_har_request(har, self.content_type)
        return har

    @property
    def uri(self) -> str:
        """Value of the ``http.request.full_uri`` entry of the HTTP layer."""
        return self.http_layer['http.request.full_uri']
206
207
[docs]
@dataclass(frozen=True)
class HttpResponse(HttpRequestResponse):
    """
    Wrapper around the packet of an HTTP response.
    """

    @property
    def raw_headers(self) -> list[bytes]:
        """Raw header lines of the response."""
        return _get_raw_headers(self.http_layer, 'response')

    @cached_property
    def http_version_status_code_message(self) -> tuple[str, int, str]:
        """
        Read the HTTP version & status code & message from the packet data.

        The values come from the first sub-layer (dict value) of the HTTP layer that holds an
        ``http.response.version`` entry.

        :return: tuple with HTTP version, status code and message (``('HTTP/1.1', 0, '')`` when no such
            sub-layer exists)
        """
        status_line = next(
            (
                entry
                for entry in self.http_layer.values()
                if isinstance(entry, dict) and 'http.response.version' in entry
            ),
            None,
        )
        if status_line is None:
            return 'HTTP/1.1', 0, ''
        version = status_line['http.response.version']
        assert version.startswith('HTTP/1.'), version
        return version, int(status_line['http.response.code']), status_line['http.response.code.desc']

    def to_har(self) -> dict[str, Any]:
        """
        Export the HTTP response in HTTP Archive (HAR) format.

        :return: the HTTP response in HAR format
        """
        version, code, message = self.http_version_status_code_message
        har: dict[str, Any] = {
            'status': code,
            'statusText': message,
            'redirectURL': '',
            'httpVersion': version,
        }
        har.update(self.common_har_props)
        # unlike the request, the payload is always given the chance to update the entry
        self.payload.update_har_response(har, self.content_type)
        return har

    @property
    def receiving_duration(self) -> float:
        """``http.time`` of the HTTP layer (0 when absent) multiplied by 1000, rounded to 2 decimals."""
        elapsed = float(self.http_layer.get('http.time', 0))
        return round(1000 * elapsed, 2)
250
251
[docs]
class HttpConversation:
    """
    An HTTP conversation: one request together with its response.
    """
    def __init__(self, request_layers: DictLayers, response_layers: DictLayers):
        self.request = HttpRequest(request_layers)
        self.response = HttpResponse(response_layers)

    @property
    def community_id(self) -> str:
        """
        Community ID of the conversation, taken from the request.

        :return: the community ID of the request
        :raises AssertionError: if the response has a community ID that differs from the request one
        """
        request_cid = self.request.community_id
        try:
            response_cid = self.response.community_id
        except KeyError:  # buggy/incomplete response may not have `community_id` but OK
            return request_cid
        assert request_cid == response_cid, (request_cid, response_cid)
        return request_cid

    @property
    def waiting_duration(self) -> float:
        """Difference between the response and request timestamps, times 1000, rounded to 2 decimals."""
        elapsed = self.response.timestamp - self.request.timestamp
        return round(1000 * elapsed, 2)

    def to_har(self) -> dict[str, Any]:
        """
        Export the HTTP conversation in HTTP Archive (HAR) format.

        :return: the HTTP conversation (request and response) in HAR format
        """
        entry: dict[str, Any] = {'_timestamp': self.request.timestamp}
        entry['timings'] = {
            'send': self.request.sending_duration,
            'wait': self.waiting_duration,
            'receive': self.response.receiving_duration,
        }
        entry['serverIPAddress'] = self.request.dst_ip
        entry['_communityId'] = self.community_id
        entry['request'] = self.request.to_har()
        entry['response'] = self.response.to_har()
        return har_entry_with_common_fields(entry)
290
291
[docs]
class Http1Traffic:
    """
    Class to represent HTTP1 network traffic.

    This class is the entry point for parsing HTTP1 network traffic.

    The format of JSON data from tshark is as follows for a single HTTP request:

    - `GET /spi/v2/platforms/ HTTP/1.1\\r\\n`: Contains the HTTP method, URI, and version.
    - `http.request.version`: The HTTP version used.
    - `http.request.line`: A list of HTTP headers sent with the request.
    - `http.host`: The Host header value.
    - `http.request.full_uri`: The full URI including the scheme (e.g., https).
    - `http.request_number`: The request number.
    - `http.response_in`: The response number associated with this request.

    The format of JSON data from tshark is as follows for a single HTTP response:

    - `HTTP/1.1 200 OK\\r\\n`: Contains the HTTP version, status code, and status phrase.
    - `http.content_type`: The Content-Type header value.
    - `http.response.line`: A list of HTTP headers sent with the response.
    - `http.content_encoding`: The Content-Encoding header value.
    - `http.response_number`: The response number.
    - `http.time`: The time taken for the response.
    - `http.request_in`: The request number associated with this response.
    - `http.response_for.uri`: The URI for which this response is generated.
    - `http.file_data_raw`: The data in hexadecimal format (requires -x flag).
    """
    def __init__(self, traffic: Sequence[DictLayers]):
        self.traffic = traffic
        self.conversations: list[HttpConversation] = []
        # the conversations are extracted as soon as the object is created
        self.parse_traffic()

    def parse_traffic(self) -> None:
        """
        Parse the HTTP network traffic and extract the request-response pairs.

        Each HTTP request and its associated HTTP response are identified with these steps:

        1. Iterate through packets: every packet of `traffic` is visited.
        2. Check protocols: the packet is kept only if `http` is listed in its `frame.protocols` field and
           an `http` layer is present.
        3. Identify http requests: the packet is kept only if its `http` layer holds both the `http.request`
           and `http.response_in` keys.
        4. Find associated response: the response packet is looked up in the mapping returned by
           `get_layers_mapping`, using the `http.response_in` value converted to an integer.
        5. Create conversation: an `HttpConversation` object is created with the request and response
           packets and appended to the `conversations` list.
        """
        packets_by_number = get_layers_mapping(self.traffic)

        for layers in self.traffic:
            protocols = layers['frame']['frame.protocols'].split(':')
            # happens that both 'http' & 'http2' are in `protocols`
            # but only 'http2' is in layers
            carries_http = 'http' in protocols and 'http' in layers
            if not carries_http:
                continue
            http_layer = layers['http']
            is_answered_request = 'http.request' in http_layer and 'http.response_in' in http_layer
            if not is_answered_request:
                continue
            # This is a request
            response_number = int(http_layer['http.response_in'])
            response_layers = packets_by_number[response_number]
            self.conversations.append(HttpConversation(layers, response_layers))

    def get_har_entries(self) -> list[HarEntry]:
        """
        Export the HTTP network traffic in HTTP Archive (HAR) format.

        :return: the HTTP network traffic in HAR format (one entry per conversation)
        """
        return [conversation.to_har() for conversation in self.conversations]
364 return entries