Source code for mongoose.forward.base

  1import logging
  2import threading
  3from typing import Any, Dict, List, Optional
  4
  5from mongoose.core.processing import ProcessingQueue, ProcessingTopic
  6
  7logger = logging.getLogger(__name__)
  8
  9
[docs] 10class BaseFormatter: 11 """Base class for formatters. 12 13 Provides common logic for converting Pydantic models to dictionaries. 14 """ 15
[docs] 16 @staticmethod 17 def to_dict(data: Any) -> Dict[str, Any]: 18 """Convert the given data into a dictionary. 19 20 Handles Pydantic v1/v2 instances and generic objects. 21 22 Args: 23 data: The model instance or data object to convert. 24 25 Returns: 26 A dictionary representation of the data. 27 """ 28 if hasattr(data, "model_dump"): 29 return data.model_dump() 30 elif hasattr(data, "dict"): 31 return data.dict() 32 else: 33 return {"data": str(data)}
34 35
[docs] 36class BaseForwarder: 37 """Base class for all forwarders. 38 39 Handles common tasks like subscribing to topics, managing a background 40 worker thread, and the main processing loop. 41 """ 42
[docs] 43 def __init__(self, topics: List[str]): 44 """Initialize the BaseForwarder. 45 46 Args: 47 topics: List of topic strings to subscribe to. 48 """ 49 self.topics_config = topics 50 self.processing_queue = ProcessingQueue() 51 self.thread: Optional[threading.Thread] = None 52 self.queue = None
53 54 @property 55 def subscriber_id(self): 56 return f"{self.__class__.__name__.lower()}_{id(self)}" 57
[docs] 58 def start(self): 59 """Start the forwarder worker thread. 60 61 Launches a background daemon thread that subscribes to the configured 62 topics and processes incoming events. 63 """ 64 if self.thread and self.thread.is_alive(): 65 return 66 67 topics = self._resolve_topics() 68 if not topics: 69 logger.error(f"No valid topics for {self.__class__.__name__} to subscribe to.") 70 return 71 72 # Unique subscriber ID to avoid collisions 73 self.queue = self.processing_queue.subscribe(topics, subscriber_id=self.subscriber_id) 74 75 self.thread = threading.Thread(target=self._run, daemon=True) 76 self.thread.start() 77 logger.info(f"{self.__class__.__name__} started")
78 79 def _run(self): 80 """Main worker loop to process messages and forward them. 81 82 Continuously polls the `ProcessingQueue` for new data from subscribed 83 topics. Terminates gracefully when the system-wide stop signal is received. 84 """ 85 while not self.processing_queue.processing_stopped(): 86 try: 87 data = self.queue.get(timeout=1.0) 88 if data is None: 89 self.queue.task_done() 90 continue 91 92 self.forward(data) 93 self.queue.task_done() 94 except Exception as e: 95 import queue as q 96 97 if isinstance(e, q.Empty): 98 continue 99 logger.error(f"Error in {self.__class__.__name__} worker: {e}") 100 101 def _resolve_topics(self) -> List[ProcessingTopic]: 102 """Convert topic strings from configuration to ProcessingTopic enums. 103 104 Returns: 105 A list of ProcessingTopic instances corresponding to configured topics. 106 """ 107 topics = [] 108 for t_str in self.topics_config: 109 try: 110 topics.append(ProcessingTopic(t_str)) 111 except ValueError: 112 # Handle cases where t_str might be the enum value itself 113 found = False 114 for pt in ProcessingTopic: 115 if pt.value == t_str: 116 topics.append(pt) 117 found = True 118 break 119 if not found: 120 logger.warning(f"Unknown topic in {self.__class__.__name__} config: {t_str}") 121 return topics 122
[docs] 123 def forward(self, data: Any): 124 """Process and forward the data. 125 126 To be implemented by subclasses. 127 128 Args: 129 data: The data to forward. 130 """ 131 raise NotImplementedError("Subclasses must implement forward()")