1import logging
2import time
3from typing import Any, Dict
4
5import requests
6from requests.auth import HTTPBasicAuth
7
8from mongoose.forward.base import BaseForwarder, BaseFormatter
9from mongoose.models.configuration import WebhookForwarderConfiguration
10
11logger = logging.getLogger(__name__)
12
13
52
53
[docs]
54class WebhookForwarder(BaseForwarder):
55 """Forwards network events to a remote webhook asynchronously.
56
57 Subscribes to specified topics and sends data to a configured URL using HTTP POST.
58 It manages its own background worker thread for non-blocking operation and
59 supports various authentication methods and retry logic.
60
61 Notes:
62 - **Authentication**: Supports Basic Auth, Bearer tokens, and custom headers.
63 Credentials should be provided via `WebhookConfiguration` using `SecretStr`
64 to prevent accidental leakage in logs.
65 - **SSL/TLS**: SSL certificate verification is enabled by default (`verify_ssl=True`).
66 It is highly recommended to keep this enabled in production.
67 - **Sensitive data**: While Mongoose normalizes data, users should ensure the
68 webhook endpoint is secured (HTTPS) as network event data may contain
69 sensitive infrastructure information (IPs, ports, protocol details).
70 - **Isolation**: Each forwarder uses a dedicated `requests.Session` for
71 connection pooling and to keep authentication headers isolated.
72 - **Forwarding modes**:
73 - `immediate`: Sends data as soon as it is received.
74 - `bulk`: Accumulates data up to `bulk_size` or until a timeout occurs.
75 - `periodic`: Sends data at a fixed `periodic_interval` at a maximum `periodic_rate`.
76 """
77
[docs]
78 def __init__(self, config: WebhookForwarderConfiguration):
79 """Initialize the WebhookForwarder.
80
81 Args:
82 config: A WebhookConfiguration instance defining the destination and security settings.
83 """
84 super().__init__(topics=config.topics)
85 self.config = config
86 self.formatter = WebhookFormatter()
87 self._session = self._setup_session()
88 self._buffer = []
89
90 def _setup_session(self) -> requests.Session:
91 """Set up a requests Session with authentication and security settings.
92
93 Configures the session with custom headers, User-Agent, and applies
94 SSL verification and authentication based on the configuration.
95
96 Returns:
97 A configured requests.Session object.
98 """
99 session = requests.Session()
100 session.verify = self.config.verify_ssl
101 session.headers.update({"User-Agent": "Mongoose-Webhook-Forwarder/1.0", "Content-Type": "application/json"})
102 session.headers.update(self.config.headers)
103 self._apply_authentication(session)
104 return session
105
106 def _apply_authentication(self, session: requests.Session):
107 """Apply configured authentication to the session.
108
109 Based on `config.auth_type`, this method sets up HTTP Basic Auth,
110 Bearer Token in the Authorization header, or a custom API key header.
111
112 Args:
113 session: The requests.Session to configure.
114 """
115 if not self.config.auth_token:
116 return
117
118 token = self.config.auth_token.get_secret_value()
119
120 if self.config.auth_type == "basic":
121 username, password = token.split(":", 1)
122 session.auth = HTTPBasicAuth(username, password)
123 elif self.config.auth_type == "bearer":
124 session.headers["Authorization"] = f"Bearer {token}"
125 elif self.config.auth_type == "header":
126 session.headers[self.config.auth_header_name] = token
127
[docs]
128 def start(self):
129 """Start the forwarder worker thread.
130
131 Launches a background daemon thread that subscribes to the configured
132 topics and processes incoming events.
133 """
134 super().start()
135 logger.info(f"Forwarding to {self.config.url} [mode={self.config.mode}]")
136
137 def _run(self):
138 """Main worker loop to process messages and forward them.
139
140 Overrides BaseForwarder._run to support different forwarding modes.
141 """
142 last_periodic_send = time.time()
143
144 while not self.processing_queue.processing_stopped():
145 try:
146 if self.config.mode == "immediate":
147 data = self.queue.get(timeout=1.0)
148 if data is not None:
149 self.forward(data)
150 self.queue.task_done()
151 elif self.config.mode == "bulk":
152 try:
153 data = self.queue.get(timeout=1.0)
154 if data is not None:
155 self._buffer.append(data)
156 if len(self._buffer) >= self.config.bulk_size:
157 self._flush_buffer()
158 self.queue.task_done()
159 except (Exception,): # timeout
160 if self._buffer:
161 self._flush_buffer()
162 elif self.config.mode == "periodic":
163 now = time.time()
164 # Collect data from queue
165 try:
166 while len(self._buffer) < self.config.periodic_rate * 2: # Limit buffer growth
167 data = self.queue.get_nowait()
168 if data is not None:
169 self._buffer.append(data)
170 self.queue.task_done()
171 except (Exception,): # empty queue
172 pass
173
174 if now - last_periodic_send >= self.config.periodic_interval:
175 if self._buffer:
176 # Send up to periodic_rate items
177 to_send = self._buffer[: self.config.periodic_rate]
178 self._buffer = self._buffer[self.config.periodic_rate :]
179 self._forward_batch(to_send)
180 last_periodic_send = now
181
182 time.sleep(0.1) # Avoid tight loop
183
184 except Exception as e:
185 import queue as q
186
187 if isinstance(e, q.Empty):
188 continue
189 logger.error(f"Error in {self.__class__.__name__} worker: {e}")
190
191 # Final flush
192 if self._buffer:
193 self._flush_buffer()
194
195 def _flush_buffer(self):
196 """Send all data in the buffer."""
197 if not self._buffer:
198 return
199 self._forward_batch(self._buffer)
200 self._buffer = []
201
202 def _forward_batch(self, batch: list):
203 """Format and send a batch of data.
204
205 Args:
206 batch: List of data objects to forward.
207 """
208 payloads = []
209 for data in batch:
210 payload = self.formatter.format(data)
211 if payload.get("error") != "formatting_failed":
212 payloads.append(payload)
213
214 if not payloads:
215 return
216
217 # If it's a single item and NOT periodic/bulk, we might want to send it as is,
218 # but for consistency and simplicity in bulk/periodic we send it as a list if there are multiple.
219 # However, many webhooks expect a single object or a list.
220
221 data_to_send = payloads if len(payloads) > 1 or self.config.mode != "immediate" else payloads[0]
222
223 for attempt in range(self.config.retry_count + 1):
224 try:
225 self._send_payload(data_to_send)
226 logger.debug(f"Successfully forwarded {len(payloads)} items to {self.config.url}")
227 return
228 except requests.exceptions.RequestException as e:
229 if not self._should_retry(e, attempt):
230 break
231 time.sleep(self.config.retry_delay)
232
[docs]
233 def forward(self, data: Any):
234 """Send formatted data to the webhook URL with retries.
235
236 Orchestrates the formatting of data and the retry loop for delivery.
237
238 Args:
239 data: The data to forward (Pydantic model instance).
240 """
241
242 self._forward_batch([data])
243
244 def _send_payload(self, payload: Dict[str, Any]):
245 """Execute the HTTP POST request.
246
247 Args:
248 payload: The dictionary to send as JSON.
249
250 Raises:
251 requests.exceptions.RequestException: If the request fails or returns an error status.
252 """
253 response = self._session.post(str(self.config.url), json=payload, timeout=self.config.timeout)
254 response.raise_for_status()
255
256 def _should_retry(self, exception: requests.exceptions.RequestException, attempt: int) -> bool:
257 """Determine if a request should be retried based on the exception and attempt count.
258
259 Args:
260 exception: The exception encountered during the request.
261 attempt: The current attempt number (starting at 0).
262
263 Returns:
264 True if the request should be retried, False otherwise.
265 """
266 should_retry = True
267 if response := getattr(exception, "response", None):
268 if 400 <= response.status_code < 500:
269 should_retry = False
270 logger.error(f"Client error ({response.status_code}) when forwarding to {self.config.url}: {exception}")
271
272 if should_retry and attempt < self.config.retry_count:
273 logger.warning(
274 f"Attempt {attempt + 1} failed to forward to {self.config.url}: {exception}. "
275 f"Retrying in {self.config.retry_delay}s..."
276 )
277 return True
278
279 if should_retry:
280 logger.error(
281 f"Failed to forward data to {self.config.url} after {self.config.retry_count + 1} attempts: {exception}"
282 )
283 return False
284
285
286# Discord-specific forwarder implementation moved to `mongoose.forward.discord`.
287# See mongoose/forward/discord.py for the DiscordFormatter and DiscordForwarder