Source code for mongoose.forward.base

  1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
  2# SPDX-FileContributor: u039b <git@0x39b.fr>
  3#
  4# SPDX-License-Identifier: GPL-3.0-or-later
  5
  6import logging
  7import threading
  8from typing import Any, Dict, List, Optional
  9
 10from mongoose.core.processing import ProcessingQueue, ProcessingTopic
 11
 12logger = logging.getLogger(__name__)
 13
 14
[docs] 15class BaseFormatter: 16 """Base class for formatters. 17 18 Provides common logic for converting Pydantic models to dictionaries. 19 """ 20
[docs] 21 @staticmethod 22 def to_dict(data: Any) -> Dict[str, Any]: 23 """Convert the given data into a dictionary. 24 25 Handles Pydantic v1/v2 instances and generic objects. 26 27 Args: 28 data: The model instance or data object to convert. 29 30 Returns: 31 A dictionary representation of the data. 32 """ 33 if hasattr(data, "model_dump"): 34 return data.model_dump() 35 elif hasattr(data, "dict"): 36 return data.dict() 37 else: 38 return {"data": str(data)}
39 40
[docs] 41class BaseForwarder: 42 """Base class for all forwarders. 43 44 Handles common tasks like subscribing to topics, managing a background 45 worker thread, and the main processing loop. 46 """ 47
[docs] 48 def __init__(self, topics: List[str]): 49 """Initialize the BaseForwarder. 50 51 Args: 52 topics: List of topic strings to subscribe to. 53 """ 54 self.topics_config = topics 55 self.processing_queue = ProcessingQueue() 56 self.thread: Optional[threading.Thread] = None 57 self.queue = None
58 59 @property 60 def subscriber_id(self): 61 return f"{self.__class__.__name__.lower()}_{id(self)}" 62
[docs] 63 def start(self): 64 """Start the forwarder worker thread. 65 66 Launches a background daemon thread that subscribes to the configured 67 topics and processes incoming events. 68 """ 69 if self.thread and self.thread.is_alive(): 70 return 71 72 topics = self._resolve_topics() 73 if not topics: 74 logger.error(f"No valid topics for {self.__class__.__name__} to subscribe to.") 75 return 76 77 # Unique subscriber ID to avoid collisions 78 self.queue = self.processing_queue.subscribe(topics, subscriber_id=self.subscriber_id) 79 80 self.thread = threading.Thread(target=self._run, daemon=True) 81 self.thread.start() 82 logger.info(f"{self.__class__.__name__} started")
83 84 def _run(self): 85 """Main worker loop to process messages and forward them. 86 87 Continuously polls the `ProcessingQueue` for new data from subscribed 88 topics. Terminates gracefully when the system-wide stop signal is received. 89 """ 90 while not self.processing_queue.processing_stopped(): 91 try: 92 data = self.queue.get(timeout=1.0) 93 if data is None: 94 self.queue.task_done() 95 continue 96 97 self.forward(data) 98 self.queue.task_done() 99 except Exception as e: 100 import queue as q 101 102 if isinstance(e, q.Empty): 103 continue 104 logger.error(f"Error in {self.__class__.__name__} worker: {e}") 105 106 def _resolve_topics(self) -> List[ProcessingTopic]: 107 """Convert topic strings from configuration to ProcessingTopic enums. 108 109 Returns: 110 A list of ProcessingTopic instances corresponding to configured topics. 111 """ 112 topics = [] 113 for t_str in self.topics_config: 114 try: 115 topics.append(ProcessingTopic(t_str)) 116 except ValueError: 117 # Handle cases where t_str might be the enum value itself 118 found = False 119 for pt in ProcessingTopic: 120 if pt.value == t_str: 121 topics.append(pt) 122 found = True 123 break 124 if not found: 125 logger.warning(f"Unknown topic in {self.__class__.__name__} config: {t_str}") 126 return topics 127
[docs] 128 def forward(self, data: Any): 129 """Process and forward the data. 130 131 To be implemented by subclasses. 132 133 Args: 134 data: The data to forward. 135 """ 136 raise NotImplementedError("Subclasses must implement forward()")