Source code for mongoose.forward.webhook

  1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
  2# SPDX-FileContributor: u039b <git@0x39b.fr>
  3#
  4# SPDX-License-Identifier: GPL-3.0-or-later
  5
  6import logging
  7import time
  8from typing import Any, Dict
  9
 10import requests
 11from requests.auth import HTTPBasicAuth
 12
 13from mongoose.forward.base import BaseForwarder, BaseFormatter
 14from mongoose.models.configuration import WebhookForwarderConfiguration
 15
 16logger = logging.getLogger(__name__)
 17
 18
[docs] 19class WebhookFormatter(BaseFormatter): 20 """Formats network data for webhook consumption. 21 22 This class handles the conversion of network models (DPI, Alert, Flow) into 23 JSON-serializable dictionaries. It ensures that complex types like datetime 24 objects are correctly converted to strings and that any Pydantic-specific 25 fields are handled according to the Pydantic version in use. 26 27 Security considerations: 28 - The formatter performs a JSON round-trip to ensure data consistency 29 and safety before it is sent over the network. 30 - It uses a custom default for serialization to prevent failures on 31 complex types, ensuring robust operation even with unexpected data. 32 """ 33
[docs] 34 @staticmethod 35 def format(data: Any) -> Dict[str, Any]: 36 """Format the given data into a dictionary for JSON serialization. 37 38 Handles Pydantic v1/v2 instances and generic objects. 39 40 Args: 41 data: The model instance or data object to format. 42 43 Returns: 44 A dictionary representation of the data, guaranteed to be JSON serializable. 45 Returns an error dictionary if formatting fails. 46 """ 47 try: 48 payload = BaseFormatter.to_dict(data) 49 50 # Ensure it's JSON serializable by doing a round-trip with a custom default 51 import json 52 53 return json.loads(json.dumps(payload, default=str)) 54 except Exception as e: 55 logger.error(f"Failed to format data for webhook: {e}") 56 return {"error": "formatting_failed", "details": str(e)}
57 58
[docs] 59class WebhookForwarder(BaseForwarder): 60 """Forwards network events to a remote webhook asynchronously. 61 62 Subscribes to specified topics and sends data to a configured URL using HTTP POST. 63 It manages its own background worker thread for non-blocking operation and 64 supports various authentication methods and retry logic. 65 66 Notes: 67 - **Authentication**: Supports Basic Auth, Bearer tokens, and custom headers. 68 Credentials should be provided via `WebhookConfiguration` using `SecretStr` 69 to prevent accidental leakage in logs. 70 - **SSL/TLS**: SSL certificate verification is enabled by default (`verify_ssl=True`). 71 It is highly recommended to keep this enabled in production. 72 - **Sensitive data**: While Mongoose normalizes data, users should ensure the 73 webhook endpoint is secured (HTTPS) as network event data may contain 74 sensitive infrastructure information (IPs, ports, protocol details). 75 - **Isolation**: Each forwarder uses a dedicated `requests.Session` for 76 connection pooling and to keep authentication headers isolated. 77 - **Forwarding modes**: 78 - `immediate`: Sends data as soon as it is received. 79 - `bulk`: Accumulates data up to `bulk_size` or until a timeout occurs. 80 - `periodic`: Sends data at a fixed `periodic_interval` at a maximum `periodic_rate`. 81 """ 82
[docs] 83 def __init__(self, config: WebhookForwarderConfiguration): 84 """Initialize the WebhookForwarder. 85 86 Args: 87 config: A WebhookConfiguration instance defining the destination and security settings. 88 """ 89 super().__init__(topics=config.topics) 90 self.config = config 91 self.formatter = WebhookFormatter() 92 self._session = self._setup_session() 93 self._buffer = []
94 95 def _setup_session(self) -> requests.Session: 96 """Set up a requests Session with authentication and security settings. 97 98 Configures the session with custom headers, User-Agent, and applies 99 SSL verification and authentication based on the configuration. 100 101 Returns: 102 A configured requests.Session object. 103 """ 104 session = requests.Session() 105 session.verify = self.config.verify_ssl 106 session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"}) 107 session.headers.update(self.config.headers) 108 self._apply_authentication(session) 109 return session 110 111 def _apply_authentication(self, session: requests.Session): 112 """Apply configured authentication to the session. 113 114 Based on `config.auth_type`, this method sets up HTTP Basic Auth, 115 Bearer Token in the Authorization header, or a custom API key header. 116 117 Args: 118 session: The requests.Session to configure. 119 """ 120 if not self.config.auth_token: 121 return 122 123 token = self.config.auth_token.get_secret_value() 124 125 if self.config.auth_type == "basic": 126 username, password = token.split(":", 1) 127 session.auth = HTTPBasicAuth(username, password) 128 elif self.config.auth_type == "bearer": 129 session.headers["Authorization"] = f"Bearer {token}" 130 elif self.config.auth_type == "header": 131 session.headers[self.config.auth_header_name] = token 132
[docs] 133 def start(self): 134 """Start the forwarder worker thread. 135 136 Launches a background daemon thread that subscribes to the configured 137 topics and processes incoming events. 138 """ 139 super().start() 140 logger.info(f"Forwarding to {self.config.url} [mode={self.config.mode}]")
141
[docs] 142 def match_filters(self, data): 143 """Check if data matches any configured filters. 144 Filters are applied using OR logic: if any filter matches, the data passes. 145 If no filters are configured, all data is allowed through. 146 147 Args: 148 data: The data object to check against configured filters. 149 150 Returns: 151 True if data passes filtering (matches at least one filter or no filters configured), 152 False otherwise. 153 """ 154 matches = len(self.config.filters) == 0 155 for f in self.config.filters: 156 if f.matches(data): 157 matches = True 158 return matches
159 160 def _run(self): 161 """Main worker loop to process messages and forward them. 162 163 Overrides BaseForwarder._run to support different forwarding modes. 164 """ 165 last_periodic_send = time.time() 166 167 while not self.processing_queue.processing_stopped() and self.config.enable: 168 try: 169 if self.config.mode == "immediate": 170 data = self.queue.get(timeout=1.0) 171 if data is not None: 172 if self.match_filters(data): 173 self.forward(data) 174 self.queue.task_done() 175 elif self.config.mode == "bulk": 176 try: 177 data = self.queue.get(timeout=1.0) 178 if data is not None: 179 if self.match_filters(data): 180 self._buffer.append(data) 181 if len(self._buffer) >= self.config.bulk_size: 182 self._flush_buffer() 183 self.queue.task_done() 184 except (Exception,): # timeout 185 if self._buffer: 186 self._flush_buffer() 187 elif self.config.mode == "periodic": 188 now = time.time() 189 # Collect data from queue 190 try: 191 while len(self._buffer) < self.config.periodic_rate * 2: # Limit buffer growth 192 data = self.queue.get_nowait() 193 if data is not None: 194 if self.match_filters(data): 195 self._buffer.append(data) 196 self.queue.task_done() 197 except (Exception,): # empty queue 198 pass 199 200 if now - last_periodic_send >= self.config.periodic_interval: 201 if self._buffer: 202 # Send up to periodic_rate items 203 to_send = self._buffer[: self.config.periodic_rate] 204 self._buffer = self._buffer[self.config.periodic_rate :] 205 self._forward_batch(to_send) 206 last_periodic_send = now 207 208 time.sleep(0.1) # Avoid tight loop 209 210 except Exception as e: 211 import queue as q 212 213 if isinstance(e, q.Empty): 214 continue 215 logger.error(f"Error in {self.__class__.__name__} worker: {e}") 216 217 # Final flush 218 if self._buffer: 219 self._flush_buffer() 220 221 def _flush_buffer(self): 222 """Send all data in the buffer.""" 223 if not self._buffer: 224 return 225 self._forward_batch(self._buffer) 226 self._buffer = [] 227 228 def _forward_batch(self, batch: list): 229 """Format and send a batch of data. 230 231 Args: 232 batch: List of data objects to forward. 233 """ 234 payloads = [] 235 for data in batch: 236 payload = self.formatter.format(data) 237 if payload.get("error") != "formatting_failed": 238 payloads.append(payload) 239 240 if not payloads: 241 return 242 243 # If it's a single item and NOT periodic/bulk, we might want to send it as is, 244 # but for consistency and simplicity in bulk/periodic we send it as a list if there are multiple. 245 # However, many webhooks expect a single object or a list. 246 data_to_send = payloads if len(payloads) > 1 or self.config.mode != "immediate" else payloads[0] 247 248 for attempt in range(self.config.retry_count + 1): 249 try: 250 self._send_payload(data_to_send) 251 logger.debug(f"Successfully forwarded {len(payloads)} items to {self.config.url}") 252 return 253 except requests.exceptions.RequestException as e: 254 if not self._should_retry(e, attempt): 255 break 256 time.sleep(self.config.retry_delay) 257 self.disable() 258
[docs] 259 def disable(self): 260 """Disable the forwarder.""" 261 self.config.enable = False 262 self.processing_queue.unsubscribe(self.subscriber_id)
263
[docs] 264 def forward(self, data: Any): 265 """Send formatted data to the webhook URL with retries. 266 267 Orchestrates the formatting of data and the retry loop for delivery. 268 269 Args: 270 data: The data to forward (Pydantic model instance). 271 """ 272 273 self._forward_batch([data])
274 275 def _send_payload(self, payload: Dict[str, Any]): 276 """Execute the HTTP POST request. 277 278 Args: 279 payload: The dictionary to send as JSON. 280 281 Raises: 282 requests.exceptions.RequestException: If the request fails or returns an error status. 283 """ 284 response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout) 285 response.raise_for_status() 286 287 def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool: 288 """Determine if a request should be retried based on the exception and attempt count. 289 290 Args: 291 exception: The exception encountered during the request. 292 attempt: The current attempt number (starting at 0). 293 294 Returns: 295 True if the request should be retried, False otherwise. 296 """ 297 should_retry = True 298 if response := getattr(exception, "response", None): 299 if 400 <= response.status_code < 500: 300 should_retry = False 301 logger.error(f"Client error ({response.status_code}) when forwarding to {self.config.url}: {exception}") 302 303 if should_retry and attempt < self.config.retry_count: 304 logger.warning( 305 f"Attempt {attempt + 1} failed to forward to {self.config.url}: {exception}. " 306 f"Retrying in {self.config.retry_delay}s..." 307 ) 308 return True 309 310 if should_retry: 311 logger.error( 312 f"Failed to forward data to {self.config.url} after {self.config.retry_count + 1} attempts: {exception}" 313 ) 314 return False