1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
2# SPDX-FileContributor: u039b <git@0x39b.fr>
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5
6import logging
7import threading
8from typing import Any, Dict, List, Optional
9
10from mongoose.core.processing import ProcessingQueue, ProcessingTopic
11
12logger = logging.getLogger(__name__)
13
14
39
40
[docs]
41class BaseForwarder:
42 """Base class for all forwarders.
43
44 Handles common tasks like subscribing to topics, managing a background
45 worker thread, and the main processing loop.
46 """
47
[docs]
48 def __init__(self, topics: List[str]):
49 """Initialize the BaseForwarder.
50
51 Args:
52 topics: List of topic strings to subscribe to.
53 """
54 self.topics_config = topics
55 self.processing_queue = ProcessingQueue()
56 self.thread: Optional[threading.Thread] = None
57 self.queue = None
58
59 @property
60 def subscriber_id(self):
61 return f"{self.__class__.__name__.lower()}_{id(self)}"
62
[docs]
63 def start(self):
64 """Start the forwarder worker thread.
65
66 Launches a background daemon thread that subscribes to the configured
67 topics and processes incoming events.
68 """
69 if self.thread and self.thread.is_alive():
70 return
71
72 topics = self._resolve_topics()
73 if not topics:
74 logger.error(f"No valid topics for {self.__class__.__name__} to subscribe to.")
75 return
76
77 # Unique subscriber ID to avoid collisions
78 self.queue = self.processing_queue.subscribe(topics, subscriber_id=self.subscriber_id)
79
80 self.thread = threading.Thread(target=self._run, daemon=True)
81 self.thread.start()
82 logger.info(f"{self.__class__.__name__} started")
83
84 def _run(self):
85 """Main worker loop to process messages and forward them.
86
87 Continuously polls the `ProcessingQueue` for new data from subscribed
88 topics. Terminates gracefully when the system-wide stop signal is received.
89 """
90 while not self.processing_queue.processing_stopped():
91 try:
92 data = self.queue.get(timeout=1.0)
93 if data is None:
94 self.queue.task_done()
95 continue
96
97 self.forward(data)
98 self.queue.task_done()
99 except Exception as e:
100 import queue as q
101
102 if isinstance(e, q.Empty):
103 continue
104 logger.error(f"Error in {self.__class__.__name__} worker: {e}")
105
106 def _resolve_topics(self) -> List[ProcessingTopic]:
107 """Convert topic strings from configuration to ProcessingTopic enums.
108
109 Returns:
110 A list of ProcessingTopic instances corresponding to configured topics.
111 """
112 topics = []
113 for t_str in self.topics_config:
114 try:
115 topics.append(ProcessingTopic(t_str))
116 except ValueError:
117 # Handle cases where t_str might be the enum value itself
118 found = False
119 for pt in ProcessingTopic:
120 if pt.value == t_str:
121 topics.append(pt)
122 found = True
123 break
124 if not found:
125 logger.warning(f"Unknown topic in {self.__class__.__name__} config: {t_str}")
126 return topics
127
[docs]
128 def forward(self, data: Any):
129 """Process and forward the data.
130
131 To be implemented by subclasses.
132
133 Args:
134 data: The data to forward.
135 """
136 raise NotImplementedError("Subclasses must implement forward()")