Source code for mongoose.forward.webhook

import json
import logging
import queue
import time
from typing import Any, Dict, List, Union

import requests
from requests.auth import HTTPBasicAuth

from mongoose.forward.base import BaseForwarder, BaseFormatter
from mongoose.models.configuration import WebhookForwarderConfiguration
 10
 11logger = logging.getLogger(__name__)
 12
 13
class WebhookFormatter(BaseFormatter):
    """Formats network data for webhook consumption.

    Converts network models (DPI, Alert, Flow) into JSON-serializable
    dictionaries. The model-to-dict conversion itself is delegated to
    ``BaseFormatter.to_dict``. Values that the ``json`` module cannot encode
    natively (for example ``datetime`` objects) are converted with ``str()``.

    Notes:
        - The formatter performs a JSON round-trip so the returned structure
          contains only plain JSON types before it is sent over the network.
        - Serialization falls back to ``str()`` for unknown types, so
          unexpected values are stringified rather than aborting the send.
    """

    @staticmethod
    def format(data: Any) -> Dict[str, Any]:
        """Format the given data into a dictionary for JSON serialization.

        Args:
            data: The model instance or data object to format. Conversion to a
                dict is delegated to ``BaseFormatter.to_dict``.

        Returns:
            A dictionary built from plain JSON types. If formatting fails for
            any reason, the error is logged with its traceback and
            ``{"error": "formatting_failed", "details": <message>}`` is
            returned instead of raising.
        """
        try:
            payload = BaseFormatter.to_dict(data)
            # Round-trip through JSON: anything json cannot encode natively is
            # stringified via ``default=str``, so the result can be serialized
            # again later without errors.
            return json.loads(json.dumps(payload, default=str))
        except Exception as exc:  # best effort: one bad record must not kill the worker
            # logger.exception keeps the traceback, which is needed to debug
            # conversion failures inside to_dict.
            logger.exception("Failed to format data for webhook: %s", exc)
            return {"error": "formatting_failed", "details": str(exc)}
52 53
class WebhookForwarder(BaseForwarder):
    """Forwards network events to a remote webhook asynchronously.

    Subscribes to the configured topics and sends data to ``config.url`` with
    HTTP POST. ``_run`` (the worker loop, overriding ``BaseForwarder._run``)
    implements the forwarding modes below; delivery and retries are handled by
    ``_forward_batch``.

    Notes:
        - **Authentication**: Supports Basic Auth (``auth_token`` of the form
          ``username:password``), Bearer tokens, and a custom header.
          Credentials should be provided via `WebhookForwarderConfiguration`
          using `SecretStr` to prevent accidental leakage in logs.
        - **SSL/TLS**: Certificate verification follows ``config.verify_ssl``.
          It is highly recommended to keep it enabled in production.
        - **Sensitive data**: Network event data may contain sensitive
          infrastructure information (IPs, ports, protocol details); make sure
          the webhook endpoint is secured (HTTPS).
        - **Isolation**: Each forwarder uses a dedicated `requests.Session` for
          connection pooling and to keep authentication headers isolated.
        - **Retries**: Connection errors, timeouts, 5xx responses and the
          transient client errors 408/429 are retried up to ``retry_count``
          times, waiting ``retry_delay`` seconds between attempts. Other 4xx
          responses are not retried. A batch that still fails is dropped.
        - **Forwarding modes**:
            - `immediate`: Each item is POSTed as a single JSON object as soon
              as it is dequeued.
            - `bulk`: Items accumulate until ``bulk_size`` is reached, or until
              the queue has been idle for one poll interval (~1 s); each flush
              is POSTed as a JSON list.
            - `periodic`: At most ``periodic_rate`` items are POSTed (as a JSON
              list) per ``periodic_interval`` seconds.
    """

    # 4xx statuses that signal a transient condition and are worth retrying.
    _RETRYABLE_CLIENT_STATUSES = frozenset({408, 429})
    # Seconds to block on the queue before re-checking the stop flag.
    _QUEUE_POLL_TIMEOUT = 1.0
    # Pause between periodic-mode iterations so the loop does not spin.
    _PERIODIC_TICK = 0.1

    def __init__(self, config: WebhookForwarderConfiguration):
        """Initialize the WebhookForwarder.

        Args:
            config: A WebhookForwarderConfiguration instance defining the
                destination, forwarding mode and security settings.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = WebhookFormatter()
        self._session = self._setup_session()
        # Items waiting to be sent in bulk/periodic mode (worker thread only).
        self._buffer: List[Any] = []
        # Monotonic timestamp of the last periodic send; reset when _run starts.
        self._last_periodic_send: float = 0.0

    def _setup_session(self) -> requests.Session:
        """Set up a requests Session with authentication and security settings.

        Applies SSL verification, default headers (User-Agent, JSON content
        type), user-supplied headers and finally authentication.

        Returns:
            A configured requests.Session object.
        """
        session = requests.Session()
        session.verify = self.config.verify_ssl
        session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"})
        # User headers come after the defaults so they can override them;
        # tolerate a missing (None) headers mapping.
        session.headers.update(self.config.headers or {})
        # Authentication is applied last so it takes precedence over config.headers.
        self._apply_authentication(session)
        return session

    def _apply_authentication(self, session: requests.Session) -> None:
        """Apply configured authentication to the session.

        Based on ``config.auth_type``: ``"basic"`` sets HTTP Basic Auth from a
        ``username:password`` token, ``"bearer"`` sets an ``Authorization:
        Bearer`` header, and ``"header"`` stores the token under
        ``config.auth_header_name``. Does nothing when no token is configured.

        Args:
            session: The requests.Session to configure.

        Raises:
            ValueError: If ``auth_type`` is ``"basic"`` and the token does not
                contain a ``:`` separator.
        """
        if not self.config.auth_token:
            return

        token = self.config.auth_token.get_secret_value()
        auth_type = self.config.auth_type

        if auth_type == "basic":
            # Split on the first ':' only; passwords may contain colons.
            username, separator, password = token.partition(":")
            if not separator:
                # Never include the token in the message: it is a secret.
                raise ValueError("Basic auth token must have the form 'username:password'")
            session.auth = HTTPBasicAuth(username, password)
        elif auth_type == "bearer":
            session.headers["Authorization"] = f"Bearer {token}"
        elif auth_type == "header":
            session.headers[self.config.auth_header_name] = token
        else:
            # A token is configured but would silently never be sent.
            logger.warning("Unknown auth_type %r; no authentication applied", auth_type)

    def start(self):
        """Start the forwarder.

        Delegates to ``BaseForwarder.start`` (presumably launching the
        background worker that runs ``_run`` — see BaseForwarder) and logs the
        destination and forwarding mode.
        """
        super().start()
        logger.info("Forwarding to %s [mode=%s]", self.config.url, self.config.mode)

    def _run(self):
        """Main worker loop: dequeue items and forward them according to ``config.mode``.

        Overrides ``BaseForwarder._run``. Runs until
        ``self.processing_queue.processing_stopped()`` returns True, then
        flushes anything still buffered (in periodic mode this final flush is
        not rate-limited). Unexpected errors are logged and the loop continues;
        an unsupported mode is logged and ends the loop instead of spinning.
        """
        self._last_periodic_send = time.monotonic()

        while not self.processing_queue.processing_stopped():
            mode = self.config.mode
            try:
                if mode == "immediate":
                    self._process_immediate()
                elif mode == "bulk":
                    self._process_bulk()
                elif mode == "periodic":
                    self._process_periodic()
                else:
                    # Without this the loop would spin at 100% CPU doing nothing.
                    logger.error(
                        "Unsupported forwarding mode %r in %s; stopping worker",
                        mode,
                        self.__class__.__name__,
                    )
                    break
            except queue.Empty:
                # Poll timeout with nothing to do; re-check the stop flag.
                continue
            except Exception as exc:
                logger.exception("Error in %s worker: %s", self.__class__.__name__, exc)

        # Final flush so buffered items are not lost on shutdown.
        self._flush_buffer()

    def _process_immediate(self) -> None:
        """Forward one queued item as soon as it arrives.

        Raises:
            queue.Empty: If nothing arrives within the poll timeout.
        """
        data = self.queue.get(timeout=self._QUEUE_POLL_TIMEOUT)
        try:
            if data is not None:
                self.forward(data)
        finally:
            # Acknowledge even when forwarding raised, so queue.join() callers
            # are never blocked by a failed item.
            self.queue.task_done()

    def _process_bulk(self) -> None:
        """Buffer one queued item; flush when full or when the queue is idle."""
        try:
            data = self.queue.get(timeout=self._QUEUE_POLL_TIMEOUT)
        except queue.Empty:
            # Queue idle for a whole poll interval: send what has accumulated.
            self._flush_buffer()
            return

        try:
            if data is not None:
                self._buffer.append(data)
                if len(self._buffer) >= self.config.bulk_size:
                    self._flush_buffer()
        finally:
            self.queue.task_done()

    def _process_periodic(self) -> None:
        """Collect queued items and send up to ``periodic_rate`` of them per interval."""
        now = time.monotonic()
        try:
            # Cap buffer growth; excess items simply wait in the queue.
            self._drain_queue_into_buffer(self.config.periodic_rate * 2)

            if self._buffer and now - self._last_periodic_send >= self.config.periodic_interval:
                to_send = self._buffer[: self.config.periodic_rate]
                self._buffer = self._buffer[self.config.periodic_rate :]
                # Record the send before forwarding so a failing batch still
                # counts against the rate limit.
                self._last_periodic_send = now
                self._forward_batch(to_send)
        finally:
            # Sleep even after an error so a persistent failure cannot busy-loop.
            time.sleep(self._PERIODIC_TICK)

    def _drain_queue_into_buffer(self, limit: int) -> None:
        """Move queued items into the buffer without blocking until ``limit`` items are buffered."""
        while len(self._buffer) < limit:
            try:
                data = self.queue.get_nowait()
            except queue.Empty:
                return
            if data is not None:
                self._buffer.append(data)
            self.queue.task_done()

    def _flush_buffer(self) -> None:
        """Send every buffered item as one batch and reset the buffer."""
        if not self._buffer:
            return
        # Detach the buffer before sending so that a batch that raises is
        # dropped instead of being re-sent (and failing again) on every flush.
        batch, self._buffer = self._buffer, []
        self._forward_batch(batch)

    def _forward_batch(self, batch: List[Any]) -> None:
        """Format and send a batch of data, retrying transient failures.

        Items whose formatting failed are skipped. Delivery failures are
        logged by ``_should_retry``; nothing is raised for them.

        Args:
            batch: List of data objects to forward.
        """
        payloads = []
        for data in batch:
            payload = self.formatter.format(data)
            if payload.get("error") != "formatting_failed":
                payloads.append(payload)

        if not payloads:
            return

        # Immediate mode sends a lone event as a bare JSON object; bulk and
        # periodic modes always send a JSON list (even of one item) so that
        # receivers see a consistent shape for each mode.
        if len(payloads) == 1 and self.config.mode == "immediate":
            data_to_send = payloads[0]
        else:
            data_to_send = payloads

        for attempt in range(self.config.retry_count + 1):
            try:
                self._send_payload(data_to_send)
            except requests.exceptions.RequestException as exc:
                if not self._should_retry(exc, attempt):
                    return
                time.sleep(self.config.retry_delay)
            else:
                logger.debug("Successfully forwarded %d items to %s", len(payloads), self.config.url)
                return

    def forward(self, data: Any):
        """Format and send a single item to the webhook URL, with retries.

        Args:
            data: The data to forward (Pydantic model instance).
        """
        self._forward_batch([data])

    def _send_payload(self, payload: Union[Dict[str, Any], List[Dict[str, Any]]]) -> None:
        """Execute the HTTP POST request.

        Args:
            payload: A JSON object, or a list of them, to send as the body.

        Raises:
            requests.exceptions.RequestException: If the request fails or returns an error status.
        """
        response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout)
        response.raise_for_status()

    def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool:
        """Decide whether a failed request should be retried, logging the outcome.

        Args:
            exception: The exception raised by the request.
            attempt: The current attempt number (starting at 0).

        Returns:
            True if another attempt should be made. False for non-transient
            4xx responses, or when all ``retry_count`` retries are used up.
        """
        # Compare with None explicitly: requests.Response.__bool__ returns
        # ``response.ok``, so every error response (status >= 400) is falsy and
        # a truthiness test would never detect a client error.
        response = getattr(exception, "response", None)
        status = response.status_code if response is not None else None

        if status is not None and 400 <= status < 500 and status not in self._RETRYABLE_CLIENT_STATUSES:
            logger.error("Client error (%s) when forwarding to %s: %s", status, self.config.url, exception)
            return False

        if attempt < self.config.retry_count:
            logger.warning(
                "Attempt %d failed to forward to %s: %s. Retrying in %ss...",
                attempt + 1,
                self.config.url,
                exception,
                self.config.retry_delay,
            )
            return True

        logger.error(
            "Failed to forward data to %s after %d attempts: %s",
            self.config.url,
            self.config.retry_count + 1,
            exception,
        )
        return False
# The Discord-specific forwarder implementation has moved to `mongoose.forward.discord`.
# See mongoose/forward/discord.py for the DiscordFormatter and DiscordForwarder classes.