Source code for mongoose.forward.base

  1import logging
  2import threading
  3from typing import Any, Dict, List, Optional
  4
  5from mongoose.core.processing import ProcessingQueue, ProcessingTopic
  6
  7logger = logging.getLogger(__name__)
  8
  9
[docs] 10class BaseFormatter: 11 """Base class for formatters. 12 13 Provides common logic for converting Pydantic models to dictionaries. 14 """ 15
[docs] 16 @staticmethod 17 def to_dict(data: Any) -> Dict[str, Any]: 18 """Convert the given data into a dictionary. 19 20 Handles Pydantic v1/v2 instances and generic objects. 21 22 Args: 23 data: The model instance or data object to convert. 24 25 Returns: 26 A dictionary representation of the data. 27 """ 28 if hasattr(data, "model_dump"): 29 return data.model_dump() 30 elif hasattr(data, "dict"): 31 return data.dict() 32 else: 33 return {"data": str(data)}
34 35
[docs] 36class BaseForwarder: 37 """Base class for all forwarders. 38 39 Handles common tasks like subscribing to topics, managing a background 40 worker thread, and the main processing loop. 41 """ 42
[docs] 43 def __init__(self, topics: List[str]): 44 """Initialize the BaseForwarder. 45 46 Args: 47 topics: List of topic strings to subscribe to. 48 """ 49 self.topics_config = topics 50 self.processing_queue = ProcessingQueue() 51 self.thread: Optional[threading.Thread] = None 52 self.queue = None
53
[docs] 54 def start(self): 55 """Start the forwarder worker thread. 56 57 Launches a background daemon thread that subscribes to the configured 58 topics and processes incoming events. 59 """ 60 if self.thread and self.thread.is_alive(): 61 return 62 63 topics = self._resolve_topics() 64 if not topics: 65 logger.error(f"No valid topics for {self.__class__.__name__} to subscribe to.") 66 return 67 68 # Unique subscriber ID to avoid collisions 69 subscriber_id = f"{self.__class__.__name__.lower()}_{id(self)}" 70 self.queue = self.processing_queue.subscribe(topics, subscriber_id=subscriber_id) 71 72 self.thread = threading.Thread(target=self._run, daemon=True) 73 self.thread.start() 74 logger.info(f"{self.__class__.__name__} started")
75 76 def _run(self): 77 """Main worker loop to process messages and forward them. 78 79 Continuously polls the `ProcessingQueue` for new data from subscribed 80 topics. Terminates gracefully when the system-wide stop signal is received. 81 """ 82 while not self.processing_queue.processing_stopped(): 83 try: 84 data = self.queue.get(timeout=1.0) 85 if data is None: 86 self.queue.task_done() 87 continue 88 89 self.forward(data) 90 self.queue.task_done() 91 except Exception as e: 92 import queue as q 93 94 if isinstance(e, q.Empty): 95 continue 96 logger.error(f"Error in {self.__class__.__name__} worker: {e}") 97 98 def _resolve_topics(self) -> List[ProcessingTopic]: 99 """Convert topic strings from configuration to ProcessingTopic enums. 100 101 Returns: 102 A list of ProcessingTopic instances corresponding to configured topics. 103 """ 104 topics = [] 105 for t_str in self.topics_config: 106 try: 107 topics.append(ProcessingTopic(t_str)) 108 except ValueError: 109 # Handle cases where t_str might be the enum value itself 110 found = False 111 for pt in ProcessingTopic: 112 if pt.value == t_str: 113 topics.append(pt) 114 found = True 115 break 116 if not found: 117 logger.warning(f"Unknown topic in {self.__class__.__name__} config: {t_str}") 118 return topics 119
[docs] 120 def forward(self, data: Any): 121 """Process and forward the data. 122 123 To be implemented by subclasses. 124 125 Args: 126 data: The data to forward. 127 """ 128 raise NotImplementedError("Subclasses must implement forward()")