1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
2# SPDX-FileContributor: u039b <git@0x39b.fr>
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5
6import logging
7import time
8from typing import Any, Dict
9
10import requests
11from requests.auth import HTTPBasicAuth
12
13from mongoose.forward.base import BaseForwarder, BaseFormatter
14from mongoose.models.configuration import WebhookForwarderConfiguration
15
16logger = logging.getLogger(__name__)
17
18
57
58
[docs]
59class WebhookForwarder(BaseForwarder):
60 """Forwards network events to a remote webhook asynchronously.
61
62 Subscribes to specified topics and sends data to a configured URL using HTTP POST.
63 It manages its own background worker thread for non-blocking operation and
64 supports various authentication methods and retry logic.
65
66 Notes:
67 - **Authentication**: Supports Basic Auth, Bearer tokens, and custom headers.
68 Credentials should be provided via `WebhookConfiguration` using `SecretStr`
69 to prevent accidental leakage in logs.
70 - **SSL/TLS**: SSL certificate verification is enabled by default (`verify_ssl=True`).
71 It is highly recommended to keep this enabled in production.
72 - **Sensitive data**: While Mongoose normalizes data, users should ensure the
73 webhook endpoint is secured (HTTPS) as network event data may contain
74 sensitive infrastructure information (IPs, ports, protocol details).
75 - **Isolation**: Each forwarder uses a dedicated `requests.Session` for
76 connection pooling and to keep authentication headers isolated.
77 - **Forwarding modes**:
78 - `immediate`: Sends data as soon as it is received.
79 - `bulk`: Accumulates data up to `bulk_size` or until a timeout occurs.
80 - `periodic`: Sends data at a fixed `periodic_interval` at a maximum `periodic_rate`.
81 """
82
[docs]
83 def __init__(self, config: WebhookForwarderConfiguration):
84 """Initialize the WebhookForwarder.
85
86 Args:
87 config: A WebhookConfiguration instance defining the destination and security settings.
88 """
89 super().__init__(topics=config.topics)
90 self.config = config
91 self.formatter = WebhookFormatter()
92 self._session = self._setup_session()
93 self._buffer = []
94
95 def _setup_session(self) -> requests.Session:
96 """Set up a requests Session with authentication and security settings.
97
98 Configures the session with custom headers, User-Agent, and applies
99 SSL verification and authentication based on the configuration.
100
101 Returns:
102 A configured requests.Session object.
103 """
104 session = requests.Session()
105 session.verify = self.config.verify_ssl
106 session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"})
107 session.headers.update(self.config.headers)
108 self._apply_authentication(session)
109 return session
110
111 def _apply_authentication(self, session: requests.Session):
112 """Apply configured authentication to the session.
113
114 Based on `config.auth_type`, this method sets up HTTP Basic Auth,
115 Bearer Token in the Authorization header, or a custom API key header.
116
117 Args:
118 session: The requests.Session to configure.
119 """
120 if not self.config.auth_token:
121 return
122
123 token = self.config.auth_token.get_secret_value()
124
125 if self.config.auth_type == "basic":
126 username, password = token.split(":", 1)
127 session.auth = HTTPBasicAuth(username, password)
128 elif self.config.auth_type == "bearer":
129 session.headers["Authorization"] = f"Bearer {token}"
130 elif self.config.auth_type == "header":
131 session.headers[self.config.auth_header_name] = token
132
[docs]
133 def start(self):
134 """Start the forwarder worker thread.
135
136 Launches a background daemon thread that subscribes to the configured
137 topics and processes incoming events.
138 """
139 super().start()
140 logger.info(f"Forwarding to {self.config.url} [mode={self.config.mode}]")
141
[docs]
142 def match_filters(self, data):
143 """Check if data matches any configured filters.
144 Filters are applied using OR logic: if any filter matches, the data passes.
145 If no filters are configured, all data is allowed through.
146
147 Args:
148 data: The data object to check against configured filters.
149
150 Returns:
151 True if data passes filtering (matches at least one filter or no filters configured),
152 False otherwise.
153 """
154 matches = len(self.config.filters) == 0
155 for f in self.config.filters:
156 if f.matches(data):
157 matches = True
158 return matches
159
160 def _run(self):
161 """Main worker loop to process messages and forward them.
162
163 Overrides BaseForwarder._run to support different forwarding modes.
164 """
165 last_periodic_send = time.time()
166
167 while not self.processing_queue.processing_stopped() and self.config.enable:
168 try:
169 if self.config.mode == "immediate":
170 data = self.queue.get(timeout=1.0)
171 if data is not None:
172 if self.match_filters(data):
173 self.forward(data)
174 self.queue.task_done()
175 elif self.config.mode == "bulk":
176 try:
177 data = self.queue.get(timeout=1.0)
178 if data is not None:
179 if self.match_filters(data):
180 self._buffer.append(data)
181 if len(self._buffer) >= self.config.bulk_size:
182 self._flush_buffer()
183 self.queue.task_done()
184 except (Exception,): # timeout
185 if self._buffer:
186 self._flush_buffer()
187 elif self.config.mode == "periodic":
188 now = time.time()
189 # Collect data from queue
190 try:
191 while len(self._buffer) < self.config.periodic_rate * 2: # Limit buffer growth
192 data = self.queue.get_nowait()
193 if data is not None:
194 if self.match_filters(data):
195 self._buffer.append(data)
196 self.queue.task_done()
197 except (Exception,): # empty queue
198 pass
199
200 if now - last_periodic_send >= self.config.periodic_interval:
201 if self._buffer:
202 # Send up to periodic_rate items
203 to_send = self._buffer[: self.config.periodic_rate]
204 self._buffer = self._buffer[self.config.periodic_rate :]
205 self._forward_batch(to_send)
206 last_periodic_send = now
207
208 time.sleep(0.1) # Avoid tight loop
209
210 except Exception as e:
211 import queue as q
212
213 if isinstance(e, q.Empty):
214 continue
215 logger.error(f"Error in {self.__class__.__name__} worker: {e}")
216
217 # Final flush
218 if self._buffer:
219 self._flush_buffer()
220
221 def _flush_buffer(self):
222 """Send all data in the buffer."""
223 if not self._buffer:
224 return
225 self._forward_batch(self._buffer)
226 self._buffer = []
227
228 def _forward_batch(self, batch: list):
229 """Format and send a batch of data.
230
231 Args:
232 batch: List of data objects to forward.
233 """
234 payloads = []
235 for data in batch:
236 payload = self.formatter.format(data)
237 if payload.get("error") != "formatting_failed":
238 payloads.append(payload)
239
240 if not payloads:
241 return
242
243 # If it's a single item and NOT periodic/bulk, we might want to send it as is,
244 # but for consistency and simplicity in bulk/periodic we send it as a list if there are multiple.
245 # However, many webhooks expect a single object or a list.
246 data_to_send = payloads if len(payloads) > 1 or self.config.mode != "immediate" else payloads[0]
247
248 for attempt in range(self.config.retry_count + 1):
249 try:
250 self._send_payload(data_to_send)
251 logger.debug(f"Successfully forwarded {len(payloads)} items to {self.config.url}")
252 return
253 except requests.exceptions.RequestException as e:
254 if not self._should_retry(e, attempt):
255 break
256 time.sleep(self.config.retry_delay)
257 self.disable()
258
[docs]
259 def disable(self):
260 """Disable the forwarder."""
261 self.config.enable = False
262 self.processing_queue.unsubscribe(self.subscriber_id)
263
[docs]
264 def forward(self, data: Any):
265 """Send formatted data to the webhook URL with retries.
266
267 Orchestrates the formatting of data and the retry loop for delivery.
268
269 Args:
270 data: The data to forward (Pydantic model instance).
271 """
272
273 self._forward_batch([data])
274
275 def _send_payload(self, payload: Dict[str, Any]):
276 """Execute the HTTP POST request.
277
278 Args:
279 payload: The dictionary to send as JSON.
280
281 Raises:
282 requests.exceptions.RequestException: If the request fails or returns an error status.
283 """
284 response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout)
285 response.raise_for_status()
286
287 def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool:
288 """Determine if a request should be retried based on the exception and attempt count.
289
290 Args:
291 exception: The exception encountered during the request.
292 attempt: The current attempt number (starting at 0).
293
294 Returns:
295 True if the request should be retried, False otherwise.
296 """
297 should_retry = True
298 if response := getattr(exception, "response", None):
299 if 400 <= response.status_code < 500:
300 should_retry = False
301 logger.error(f"Client error ({response.status_code}) when forwarding to {self.config.url}: {exception}")
302
303 if should_retry and attempt < self.config.retry_count:
304 logger.warning(
305 f"Attempt {attempt + 1} failed to forward to {self.config.url}: {exception}. "
306 f"Retrying in {self.config.retry_delay}s..."
307 )
308 return True
309
310 if should_retry:
311 logger.error(
312 f"Failed to forward data to {self.config.url} after {self.config.retry_count + 1} attempts: {exception}"
313 )
314 return False