Source code for mongoose.forward.webhook

  1import logging
  2import time
  3from typing import Any, Dict
  4
  5import requests
  6from requests.auth import HTTPBasicAuth
  7
  8from mongoose.forward.base import BaseForwarder, BaseFormatter
  9from mongoose.models.configuration import WebhookForwarderConfiguration
 10
 11logger = logging.getLogger(__name__)
 12
 13
class WebhookFormatter(BaseFormatter):
    """Formats network data for webhook consumption.

    This class handles the conversion of network models (DPI, Alert, Flow) into
    JSON-serializable dictionaries. Values the ``json`` module cannot encode
    natively (for example ``datetime`` objects) are converted with ``str``.

    Security considerations:
        - The formatter performs a JSON round-trip so the returned structure only
          contains plain JSON types before it is sent over the network.
        - Non-finite floats (NaN, Infinity, -Infinity) are replaced by ``None``
          because they are not valid JSON and strict encoders reject them.
        - Formatting never raises: failures are reported in-band as an error
          dictionary so one bad record cannot crash the caller.
    """

    @staticmethod
    def format(data: Any) -> Dict[str, Any]:
        """Format the given data into a dictionary for JSON serialization.

        Conversion to a dictionary is delegated to ``BaseFormatter.to_dict``.

        Args:
            data: The model instance or data object to format.

        Returns:
            A JSON-round-tripped representation of the data that a strict JSON
            encoder can serialize. If formatting fails, the dictionary
            ``{"error": "formatting_failed", "details": <message>}`` is returned
            instead.
        """
        import json

        try:
            payload = BaseFormatter.to_dict(data)

            # Round-trip through JSON so the result holds only plain JSON types.
            # `default=str` stringifies anything json cannot encode natively.
            # json.dumps emits NaN/Infinity/-Infinity tokens by default; they are
            # not valid JSON (requests encodes `json=` bodies with allow_nan=False),
            # so `parse_constant` maps them to None on the way back.
            return json.loads(
                json.dumps(payload, default=str),
                parse_constant=lambda _constant: None,
            )
        except Exception as e:
            logger.error("Failed to format data for webhook: %s", e)
            return {"error": "formatting_failed", "details": str(e)}
class WebhookForwarder(BaseForwarder):
    """Forwards network events to a remote webhook asynchronously.

    Subscribes to specified topics and sends data to a configured URL using HTTP POST.
    It manages its own background worker thread for non-blocking operation and
    supports various authentication methods and retry logic.

    Notes:
        - **Authentication**: Supports Basic Auth, Bearer tokens, and custom headers.
          Credentials should be provided via `WebhookForwarderConfiguration` using
          `SecretStr` to prevent accidental leakage in logs.
        - **SSL/TLS**: SSL certificate verification is enabled by default (`verify_ssl=True`).
          It is highly recommended to keep this enabled in production.
        - **Sensitive data**: While Mongoose normalizes data, users should ensure the
          webhook endpoint is secured (HTTPS) as network event data may contain
          sensitive infrastructure information (IPs, ports, protocol details).
        - **Isolation**: Each forwarder uses a dedicated `requests.Session` for
          connection pooling and to keep authentication headers isolated.
        - **Failure handling**: Connection-level failures, 5xx responses, and
          408/429 responses are retried up to `retry_count` times, sleeping
          `retry_delay` seconds between attempts. Other 4xx responses are not
          retried. When a batch cannot be delivered, the forwarder disables
          itself (sets `config.enable = False`) and unsubscribes from the
          processing queue.
        - **Forwarding modes**:
            - `immediate`: Sends data as soon as it is received.
            - `bulk`: Accumulates data up to `bulk_size`, or flushes what has been
              accumulated when no new item arrives for 1 second.
            - `periodic`: Sends data at a fixed `periodic_interval` at a maximum `periodic_rate`.
    """

    # 4xx statuses that signal a transient condition and are worth retrying.
    _RETRYABLE_CLIENT_STATUSES = frozenset({408, 429})

    def __init__(self, config: WebhookForwarderConfiguration):
        """Initialize the WebhookForwarder.

        Args:
            config: A WebhookForwarderConfiguration instance defining the destination
                and security settings.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = WebhookFormatter()
        self._session = self._setup_session()
        # Items waiting to be sent in "bulk" and "periodic" modes.
        self._buffer: list = []

    def _setup_session(self) -> requests.Session:
        """Set up a requests Session with authentication and security settings.

        Applies the SSL verification flag, then default ``User-Agent`` and
        ``Content-Type`` headers. User-configured headers come next and may
        override those defaults. Authentication is applied last.

        Returns:
            A configured requests.Session object.
        """
        session = requests.Session()
        session.verify = self.config.verify_ssl
        session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"})
        # `or {}` tolerates an unset headers mapping instead of raising TypeError.
        session.headers.update(self.config.headers or {})
        self._apply_authentication(session)
        return session

    def _apply_authentication(self, session: requests.Session):
        """Apply configured authentication to the session.

        Based on `config.auth_type`, this method sets up HTTP Basic Auth,
        a Bearer token in the Authorization header, or a custom API key header.
        Nothing is applied when no `auth_token` is configured.

        Args:
            session: The requests.Session to configure.

        Raises:
            ValueError: If `auth_type` is "basic" and the token is not of the form
                "username:password".
        """
        if not self.config.auth_token:
            return

        token = self.config.auth_token.get_secret_value()
        auth_type = self.config.auth_type

        if auth_type == "basic":
            username, separator, password = token.partition(":")
            if not separator:
                # Deliberately do not include the token itself in the message.
                raise ValueError("Basic auth token must have the form 'username:password'")
            session.auth = HTTPBasicAuth(username, password)
        elif auth_type == "bearer":
            session.headers["Authorization"] = f"Bearer {token}"
        elif auth_type == "header":
            session.headers[self.config.auth_header_name] = token
        else:
            logger.warning("Unknown auth_type %r; no authentication applied", auth_type)

    def start(self):
        """Start the forwarder worker thread.

        Delegates to ``BaseForwarder.start`` and then logs the destination URL and
        forwarding mode.
        """
        super().start()
        logger.info(f"Forwarding to {self.config.url} [mode={self.config.mode}]")

    def match_filters(self, data):
        """Check if data matches any configured filters.

        Filters are applied using OR logic: if any filter matches, the data passes.
        If no filters are configured, all data is allowed through.

        Args:
            data: The data object to check against configured filters.

        Returns:
            True if data passes filtering (matches at least one filter or no filters
            configured), False otherwise.
        """
        filters = self.config.filters
        if not filters:
            return True
        return any(f.matches(data) for f in filters)

    def _run(self):
        """Main worker loop to process messages and forward them.

        Overrides BaseForwarder._run to support different forwarding modes. Runs
        until the processing queue reports it has stopped or the forwarder is
        disabled, then flushes any buffered items once.
        """
        from queue import Empty

        last_periodic_send = time.time()

        while not self.processing_queue.processing_stopped() and self.config.enable:
            mode = self.config.mode
            try:
                if mode == "immediate":
                    self._step_immediate()
                elif mode == "bulk":
                    self._step_bulk()
                elif mode == "periodic":
                    last_periodic_send = self._step_periodic(last_periodic_send)
                else:
                    # Without this guard an unknown mode would busy-spin forever.
                    logger.error("Unknown forwarding mode %r; stopping %s worker", mode, self.__class__.__name__)
                    break
            except Empty:
                # No item arrived within the `get` timeout; re-check the loop condition.
                continue
            except Exception as e:
                logger.exception("Error in %s worker: %s", self.__class__.__name__, e)

        # Final flush
        if self._buffer:
            self._flush_buffer()

    def _step_immediate(self):
        """Wait up to 1s for one item and forward it if it passes the filters.

        Raises:
            queue.Empty: If no item arrives within the timeout.
        """
        data = self.queue.get(timeout=1.0)
        try:
            if data is not None and self.match_filters(data):
                self.forward(data)
        finally:
            # Acknowledge every item taken from the queue, even if processing failed.
            self.queue.task_done()

    def _step_bulk(self):
        """Buffer one item; flush at `bulk_size` items or after 1s without input."""
        from queue import Empty

        try:
            data = self.queue.get(timeout=1.0)
        except Empty:
            # Idle for 1s: send whatever has accumulated so far.
            self._flush_buffer()
            return

        try:
            if data is not None and self.match_filters(data):
                self._buffer.append(data)
                if len(self._buffer) >= self.config.bulk_size:
                    self._flush_buffer()
        finally:
            self.queue.task_done()

    def _step_periodic(self, last_send: float) -> float:
        """Drain queued items, send up to `periodic_rate` per interval, then sleep.

        Args:
            last_send: Timestamp (``time.time()``) of the last interval boundary.

        Returns:
            The updated interval-boundary timestamp.
        """
        from queue import Empty

        now = time.time()

        # Collect data from the queue, capping buffer growth at twice the rate.
        while len(self._buffer) < self.config.periodic_rate * 2:
            try:
                data = self.queue.get_nowait()
            except Empty:
                break
            try:
                if data is not None and self.match_filters(data):
                    self._buffer.append(data)
            finally:
                self.queue.task_done()

        if now - last_send >= self.config.periodic_interval:
            if self._buffer:
                # Send up to periodic_rate items; keep the rest for later intervals.
                rate = self.config.periodic_rate
                to_send = self._buffer[:rate]
                self._buffer = self._buffer[rate:]
                self._forward_batch(to_send)
            last_send = now

        time.sleep(0.1)  # Avoid tight loop
        return last_send

    def _flush_buffer(self):
        """Send all data in the buffer, then clear it."""
        if not self._buffer:
            return
        self._forward_batch(self._buffer)
        self._buffer = []

    def _forward_batch(self, batch: list):
        """Format and send a batch of data.

        Items the formatter could not convert are skipped. In ``immediate`` mode a
        single payload is sent as a JSON object; otherwise the payloads are sent as
        a JSON array. If delivery ultimately fails (retries exhausted or a
        non-retryable error), the forwarder disables itself.

        Args:
            batch: List of data objects to forward.
        """
        payloads = []
        for data in batch:
            payload = self.formatter.format(data)
            # The formatter reports failures in-band instead of raising; drop those.
            if payload.get("error") != "formatting_failed":
                payloads.append(payload)

        if not payloads:
            return

        # Many webhooks expect either a single object or a list: only a lone
        # payload in immediate mode is sent unwrapped.
        data_to_send = payloads[0] if len(payloads) == 1 and self.config.mode == "immediate" else payloads

        for attempt in range(self.config.retry_count + 1):
            try:
                self._send_payload(data_to_send)
            except requests.exceptions.RequestException as e:
                if not self._should_retry(e, attempt):
                    break
                time.sleep(self.config.retry_delay)
            else:
                logger.debug(f"Successfully forwarded {len(payloads)} items to {self.config.url}")
                return

        # Delivery failed: disabling stops the worker loop and removes this
        # forwarder's subscription.
        self.disable()

    def disable(self):
        """Disable the forwarder.

        Sets ``config.enable`` to False (mutating the configuration object), which
        stops the ``_run`` loop, and unsubscribes from the processing queue.
        """
        self.config.enable = False
        self.processing_queue.unsubscribe(self.subscriber_id)

    def forward(self, data: Any):
        """Send formatted data to the webhook URL with retries.

        Orchestrates the formatting of data and the retry loop for delivery by
        forwarding it as a one-item batch.

        Args:
            data: The data to forward (Pydantic model instance).
        """
        self._forward_batch([data])

    def _send_payload(self, payload: Any):
        """Execute the HTTP POST request.

        Args:
            payload: The JSON-serializable object (a dict, or a list of dicts) to send.

        Raises:
            requests.exceptions.RequestException: If the request fails or returns an error status.
        """
        response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout)
        response.raise_for_status()

    def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool:
        """Determine if a request should be retried based on the exception and attempt count.

        4xx responses are treated as permanent failures, except 408 (Request
        Timeout) and 429 (Too Many Requests). Errors without a response (e.g.
        connection failures) and 5xx responses are retried while attempts remain.

        Args:
            exception: The exception encountered during the request.
            attempt: The current attempt number (starting at 0).

        Returns:
            True if the request should be retried, False otherwise.
        """
        should_retry = True
        # Compare with None explicitly: `requests.Response.__bool__` returns
        # `response.ok`, so any error response (status >= 400) is falsy and a
        # truthiness test would skip the client-error check entirely.
        response = getattr(exception, "response", None)
        if response is not None:
            status = response.status_code
            if 400 <= status < 500 and status not in self._RETRYABLE_CLIENT_STATUSES:
                should_retry = False
                logger.error(f"Client error ({status}) when forwarding to {self.config.url}: {exception}")

        if should_retry and attempt < self.config.retry_count:
            logger.warning(
                f"Attempt {attempt + 1} failed to forward to {self.config.url}: {exception}. "
                f"Retrying in {self.config.retry_delay}s..."
            )
            return True

        if should_retry:
            logger.error(
                f"Failed to forward data to {self.config.url} after {self.config.retry_count + 1} attempts: {exception}"
            )
        return False