import logging
import queue
import time
from typing import Any, Dict, List, Union

import requests
from requests.auth import HTTPBasicAuth

from mongoose.forward.base import BaseForwarder, BaseFormatter
from mongoose.models.configuration import WebhookForwarderConfiguration
10
# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
12
13
52
53
[docs]
class WebhookForwarder(BaseForwarder):
    """Forwards network events to a remote webhook asynchronously.

    Subscribes to the configured topics and sends the formatted data as JSON to
    ``config.url`` using HTTP POST. Items are processed by the worker loop in
    :meth:`_run`, so publishers are not blocked by network I/O. The forwarder
    supports filtering, several authentication methods and retry logic.

    Notes:
        - **Authentication**: Supports HTTP Basic Auth (``auth_type="basic"``, token
          of the form ``"username:password"``), Bearer tokens (``"bearer"``) and a
          custom API-key header (``"header"``, sent as ``auth_header_name``).
          Credentials should be provided via
          ``WebhookForwarderConfiguration.auth_token`` as a ``SecretStr`` to prevent
          accidental leakage in logs.
        - **SSL/TLS**: SSL certificate verification is controlled by ``verify_ssl``
          and is enabled by default. It is highly recommended to keep it enabled
          in production.
        - **Sensitive data**: While Mongoose normalizes data, users should ensure the
          webhook endpoint is secured (HTTPS), as network event data may contain
          sensitive infrastructure information (IPs, ports, protocol details).
        - **Isolation**: Each forwarder uses a dedicated ``requests.Session`` for
          connection pooling and to keep authentication headers isolated.
        - **Forwarding modes** (``config.mode``):
            - ``immediate``: Sends each item as soon as it is received, as a single
              JSON object.
            - ``bulk``: Buffers items and sends them as a JSON list once
              ``bulk_size`` items have accumulated, or when no new item arrives
              within one second.
            - ``periodic``: Every ``periodic_interval`` seconds, sends at most
              ``periodic_rate`` buffered items as a JSON list. At most
              ``2 * periodic_rate`` items are buffered; the rest wait in the queue.
        - **Failures**: Client errors (HTTP 4xx) are not retried. Other request
          failures are retried ``retry_count`` times, ``retry_delay`` seconds
          apart. If delivery ultimately fails, the forwarder disables itself.
    """

    def __init__(self, config: WebhookForwarderConfiguration):
        """Initialize the WebhookForwarder.

        Args:
            config: A ``WebhookForwarderConfiguration`` defining the destination,
                forwarding mode, filters and security settings.

        Raises:
            ValueError: If Basic Auth is configured with a token that is not of
                the form ``"username:password"``.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = WebhookFormatter()
        self._session = self._setup_session()
        # Items waiting to be sent in ``bulk`` / ``periodic`` mode.
        self._buffer: list = []

    def _setup_session(self) -> requests.Session:
        """Set up a requests Session with authentication and security settings.

        Configures the session with a User-Agent, a JSON Content-Type and the
        user-supplied headers. It then applies SSL verification and
        authentication based on the configuration.

        Returns:
            A configured requests.Session object.
        """
        session = requests.Session()
        session.verify = self.config.verify_ssl
        session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"})
        # User headers are applied after the defaults so they can override them.
        session.headers.update(self.config.headers)
        self._apply_authentication(session)
        return session

    def _apply_authentication(self, session: requests.Session):
        """Apply configured authentication to the session.

        Based on ``config.auth_type``, this sets up HTTP Basic Auth, a Bearer
        token in the ``Authorization`` header, or a custom API-key header. Nothing
        is done when no ``auth_token`` is configured.

        Args:
            session: The requests.Session to configure.

        Raises:
            ValueError: If ``auth_type`` is ``"basic"`` and the token has no ``:``
                separating username and password.
        """
        if not self.config.auth_token:
            return

        token = self.config.auth_token.get_secret_value()
        auth_type = self.config.auth_type

        if auth_type == "basic":
            # Split on the first colon only: passwords may themselves contain ':'.
            username, separator, password = token.partition(":")
            if not separator:
                # Deliberately do not echo the token: it is a secret.
                raise ValueError("Basic auth token must be in the form 'username:password'")
            session.auth = HTTPBasicAuth(username, password)
        elif auth_type == "bearer":
            session.headers["Authorization"] = f"Bearer {token}"
        elif auth_type == "header":
            session.headers[self.config.auth_header_name] = token

    def start(self):
        """Start the forwarder.

        Delegates to ``BaseForwarder.start()``, which is expected to launch the
        worker thread running :meth:`_run`. Then logs the destination URL and the
        forwarding mode.
        """
        super().start()
        logger.info(f"Forwarding to {self.config.url} [mode={self.config.mode}]")

    def match_filters(self, data):
        """Check if data matches any configured filters.

        Filters are combined with OR logic: if any filter matches, the data
        passes. If no filters are configured, all data is allowed through.

        Args:
            data: The data object to check against configured filters.

        Returns:
            True if data passes filtering (matches at least one filter, or no
            filters are configured), False otherwise.
        """
        filters = self.config.filters
        return not filters or any(f.matches(data) for f in filters)

    def _run(self):
        """Main worker loop: pull items from the queue and forward them.

        Overrides ``BaseForwarder._run`` to support the ``immediate``, ``bulk``
        and ``periodic`` forwarding modes. The loop runs until the processing
        queue stops or the forwarder is disabled. Any data still buffered is
        flushed before returning.
        """
        last_periodic_send = time.time()

        while not self.processing_queue.processing_stopped() and self.config.enable:
            mode = self.config.mode
            try:
                if mode == "immediate":
                    self._process_immediate()
                elif mode == "bulk":
                    self._process_bulk()
                elif mode == "periodic":
                    last_periodic_send = self._process_periodic(last_periodic_send)
                else:
                    # Without this guard an unknown mode would busy-spin this loop.
                    logger.error(f"Unknown forwarding mode {mode!r} in {self.__class__.__name__}; stopping worker")
                    break
            except queue.Empty:
                # Nothing arrived within the timeout; re-check the loop condition.
                continue
            except Exception as e:
                # Keep the worker alive on unexpected errors, but record the traceback.
                logger.exception(f"Error in {self.__class__.__name__} worker: {e}")

        # Final flush
        if self._buffer:
            self._flush_buffer()

    def _process_immediate(self):
        """Run one iteration of the ``immediate`` mode: forward the next item.

        Raises:
            queue.Empty: If no item arrives within one second.
        """
        data = self.queue.get(timeout=1.0)
        try:
            if data is not None and self.match_filters(data):
                self.forward(data)
        finally:
            # Acknowledge every item taken from the queue, even if processing raised.
            self.queue.task_done()

    def _process_bulk(self):
        """Run one iteration of the ``bulk`` mode.

        Buffers the next matching item and flushes once ``bulk_size`` items have
        accumulated. If no item arrives within one second, whatever is buffered
        is flushed so data does not wait indefinitely for the batch to fill.
        """
        try:
            data = self.queue.get(timeout=1.0)
        except queue.Empty:
            self._flush_buffer()
            return
        try:
            if data is not None and self.match_filters(data):
                self._buffer.append(data)
                if len(self._buffer) >= self.config.bulk_size:
                    self._flush_buffer()
        finally:
            # Acknowledge every item taken from the queue, even if processing raised.
            self.queue.task_done()

    def _process_periodic(self, last_send: float) -> float:
        """Run one iteration of the ``periodic`` mode.

        Drains the queue without blocking into the buffer, which is capped at
        ``2 * periodic_rate`` items. Then, if ``periodic_interval`` seconds have
        elapsed since ``last_send``, sends up to ``periodic_rate`` buffered items.

        Args:
            last_send: ``time.time()`` timestamp of the previous send tick.

        Returns:
            The timestamp of the most recent send tick. This is ``last_send``
            unchanged if the interval has not elapsed yet.
        """
        now = time.time()
        rate = self.config.periodic_rate

        # Cap the buffer to limit memory growth; excess items stay in the queue.
        while len(self._buffer) < rate * 2:
            try:
                data = self.queue.get_nowait()
            except queue.Empty:
                break
            try:
                if data is not None and self.match_filters(data):
                    self._buffer.append(data)
            finally:
                self.queue.task_done()

        if now - last_send >= self.config.periodic_interval:
            if self._buffer:
                # Send up to periodic_rate items; the remainder waits for the next tick.
                to_send, self._buffer = self._buffer[:rate], self._buffer[rate:]
                self._forward_batch(to_send)
            last_send = now

        time.sleep(0.1)  # Avoid a tight loop: get_nowait() never blocks.
        return last_send

    def _flush_buffer(self):
        """Send all buffered data, then clear the buffer. No-op when empty."""
        if not self._buffer:
            return
        self._forward_batch(self._buffer)
        self._buffer = []

    def _forward_batch(self, batch: list):
        """Format and send a batch of data, retrying on transient failures.

        Items for which the formatter returns ``{"error": "formatting_failed"}``
        are skipped. In ``immediate`` mode a single item is sent as a bare JSON
        object; otherwise the payload is a JSON list. If delivery ultimately
        fails (non-retryable error or retries exhausted), the forwarder is
        disabled.

        Args:
            batch: List of data objects to forward.
        """
        payloads = []
        for data in batch:
            payload = self.formatter.format(data)
            if payload.get("error") != "formatting_failed":
                payloads.append(payload)

        if not payloads:
            return

        # Many webhooks expect either a single object or a list: immediate mode
        # sends one object, bulk/periodic always send a list for consistency.
        data_to_send = payloads if len(payloads) > 1 or self.config.mode != "immediate" else payloads[0]

        for attempt in range(self.config.retry_count + 1):
            try:
                self._send_payload(data_to_send)
                logger.debug(f"Successfully forwarded {len(payloads)} items to {self.config.url}")
                return
            except requests.exceptions.RequestException as e:
                if not self._should_retry(e, attempt):
                    break
                time.sleep(self.config.retry_delay)
        # Reached only when every attempt failed or the error was not retryable.
        self.disable()

    def disable(self):
        """Disable the forwarder.

        Clears ``config.enable``, which ends the :meth:`_run` loop, and
        unsubscribes from the processing queue.
        """
        self.config.enable = False
        self.processing_queue.unsubscribe(self.subscriber_id)

    def forward(self, data: Any):
        """Send formatted data to the webhook URL with retries.

        Formats the item and runs the retry loop for delivery as a batch of one.

        Args:
            data: The data to forward (Pydantic model instance).
        """
        self._forward_batch([data])

    def _send_payload(self, payload: Union[Dict[str, Any], List[Dict[str, Any]]]):
        """Execute the HTTP POST request.

        Args:
            payload: A single formatted item, or a list of items in ``bulk`` /
                ``periodic`` mode; sent as the JSON body.

        Raises:
            requests.exceptions.RequestException: If the request fails or returns an error status.
        """
        response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout)
        response.raise_for_status()

    def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool:
        """Determine if a request should be retried based on the exception and attempt count.

        Client errors (HTTP 4xx) are never retried. Any other failure is retried
        while ``attempt < retry_count``. Each outcome is logged.

        Args:
            exception: The exception encountered during the request.
            attempt: The current attempt number (starting at 0).

        Returns:
            True if the request should be retried, False otherwise.
        """
        # Compare against None explicitly: ``requests.Response.__bool__`` returns
        # ``response.ok``, which is False for every 4xx/5xx status, so a
        # truthiness test would never detect client errors.
        response = getattr(exception, "response", None)
        if response is not None and 400 <= response.status_code < 500:
            logger.error(f"Client error ({response.status_code}) when forwarding to {self.config.url}: {exception}")
            return False

        if attempt < self.config.retry_count:
            logger.warning(
                f"Attempt {attempt + 1} failed to forward to {self.config.url}: {exception}. "
                f"Retrying in {self.config.retry_delay}s..."
            )
            return True

        logger.error(
            f"Failed to forward data to {self.config.url} after {self.config.retry_count + 1} attempts: {exception}"
        )
        return False