1import logging
2import threading
3from typing import Any, Dict, List, Optional
4
5from mongoose.core.processing import ProcessingQueue, ProcessingTopic
6
7logger = logging.getLogger(__name__)
8
9
34
35
[docs]
36class BaseForwarder:
37 """Base class for all forwarders.
38
39 Handles common tasks like subscribing to topics, managing a background
40 worker thread, and the main processing loop.
41 """
42
[docs]
43 def __init__(self, topics: List[str]):
44 """Initialize the BaseForwarder.
45
46 Args:
47 topics: List of topic strings to subscribe to.
48 """
49 self.topics_config = topics
50 self.processing_queue = ProcessingQueue()
51 self.thread: Optional[threading.Thread] = None
52 self.queue = None
53
[docs]
54 def start(self):
55 """Start the forwarder worker thread.
56
57 Launches a background daemon thread that subscribes to the configured
58 topics and processes incoming events.
59 """
60 if self.thread and self.thread.is_alive():
61 return
62
63 topics = self._resolve_topics()
64 if not topics:
65 logger.error(f"No valid topics for {self.__class__.__name__} to subscribe to.")
66 return
67
68 # Unique subscriber ID to avoid collisions
69 subscriber_id = f"{self.__class__.__name__.lower()}_{id(self)}"
70 self.queue = self.processing_queue.subscribe(topics, subscriber_id=subscriber_id)
71
72 self.thread = threading.Thread(target=self._run, daemon=True)
73 self.thread.start()
74 logger.info(f"{self.__class__.__name__} started")
75
76 def _run(self):
77 """Main worker loop to process messages and forward them.
78
79 Continuously polls the `ProcessingQueue` for new data from subscribed
80 topics. Terminates gracefully when the system-wide stop signal is received.
81 """
82 while not self.processing_queue.processing_stopped():
83 try:
84 data = self.queue.get(timeout=1.0)
85 if data is None:
86 self.queue.task_done()
87 continue
88
89 self.forward(data)
90 self.queue.task_done()
91 except Exception as e:
92 import queue as q
93
94 if isinstance(e, q.Empty):
95 continue
96 logger.error(f"Error in {self.__class__.__name__} worker: {e}")
97
98 def _resolve_topics(self) -> List[ProcessingTopic]:
99 """Convert topic strings from configuration to ProcessingTopic enums.
100
101 Returns:
102 A list of ProcessingTopic instances corresponding to configured topics.
103 """
104 topics = []
105 for t_str in self.topics_config:
106 try:
107 topics.append(ProcessingTopic(t_str))
108 except ValueError:
109 # Handle cases where t_str might be the enum value itself
110 found = False
111 for pt in ProcessingTopic:
112 if pt.value == t_str:
113 topics.append(pt)
114 found = True
115 break
116 if not found:
117 logger.warning(f"Unknown topic in {self.__class__.__name__} config: {t_str}")
118 return topics
119
[docs]
120 def forward(self, data: Any):
121 """Process and forward the data.
122
123 To be implemented by subclasses.
124
125 Args:
126 data: The data to forward.
127 """
128 raise NotImplementedError("Subclasses must implement forward()")