mongoose.core.processing
- class mongoose.core.processing.ProcessingQueue[source]
Bases: object
A thread-safe pub-sub queue system for managing topic-based message distribution.
This class implements a publish-subscribe pattern where:
- Subscribers register their interest in one or more topics using subscribe().
- Publishers send data to specific topics using publish().
Internally, each subscriber receives a dedicated queue.Queue. When data is published to a topic, it is duplicated and pushed into the queues of all subscribers registered for that topic. This ensures that multiple subscribers (e.g., a database storer and a webhook forwarder) can process the same stream of events independently.
Note
Topics are created dynamically when the first subscriber registers. Therefore, subscribers MUST be registered before any data is published to a topic. If publish() is called for a topic with no active subscribers, it will raise a TopicNotFoundException.
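The fan-out behaviour and the subscribe-before-publish rule described above can be sketched with the standard library alone. MiniPubSub and TopicNotFound below are hypothetical stand-ins written for this illustration, not the real ProcessingQueue or TopicNotFoundException. The sketch also assumes that every subscriber queue receives a reference to the same object; whether the real class copies the data is not stated here.

```python
import queue


class TopicNotFound(Exception):
    """Hypothetical stand-in for TopicNotFoundException."""


class MiniPubSub:
    """Illustrative fan-out queue; not the real ProcessingQueue."""

    def __init__(self):
        self._queues = {}  # topic -> list of subscriber queues

    def subscribe(self, topic, queue_size=100):
        # The first subscription to a topic creates the topic entry.
        q = queue.Queue(maxsize=queue_size)
        self._queues.setdefault(topic, []).append(q)
        return q

    def publish(self, topic, data):
        # A topic with no subscribers is treated as non-existent.
        if topic not in self._queues:
            raise TopicNotFound(topic)
        for q in self._queues[topic]:
            q.put_nowait(data)  # assumption: same object goes to every queue


bus = MiniPubSub()
storer = bus.subscribe("network-flow")     # e.g. a database storer
forwarder = bus.subscribe("network-flow")  # e.g. a webhook forwarder

bus.publish("network-flow", {"src": "10.0.0.1"})
storer_item = storer.get_nowait()
forwarder_item = forwarder.get_nowait()

# Publishing before anyone has subscribed fails.
try:
    bus.publish("network-alert", {"sig": "x"})
    raised = False
except TopicNotFound:
    raised = True
```

Both subscribers see the published event independently, and the second publish fails because nobody subscribed to that topic first.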
- join()[source]
Block until all queues have processed their tasks.
Waits for all subscriber queues across all topics to complete processing their current items. This is useful for ensuring clean shutdown.
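A shutdown sequence of this kind can be sketched with plain queue.Queue objects. The sketch assumes that join() is built on Queue.join(), which only returns once consumers have called task_done() for every item they took. That assumption is not confirmed by this page. The worker function and the None sentinel are illustrative choices.

```python
import queue
import threading

queues = [queue.Queue(), queue.Queue()]  # one queue per subscriber
processed = []
lock = threading.Lock()


def worker(q):
    while True:
        item = q.get()
        if item is None:      # sentinel: stop consuming
            q.task_done()
            break
        with lock:
            processed.append(item)
        q.task_done()         # without this, q.join() would block forever


threads = [threading.Thread(target=worker, args=(q,)) for q in queues]
for t in threads:
    t.start()

for q in queues:
    q.put("event-1")
    q.put("event-2")
    q.put(None)

# Equivalent of waiting on every subscriber queue across all topics.
for q in queues:
    q.join()
for t in threads:
    t.join()
```

If the assumption holds, a consumer that forgets task_done() would make join() hang, so it is worth checking consumer code before relying on join() at shutdown.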
- processing_stopped()[source]
Check if processing has been stopped.
- Returns:
True if stop processing has been signaled, False otherwise.
- Return type:
bool
- publish(topic, data)[source]
Publish data to all subscribers of a specific topic.
Attempts to add data to all queues subscribed to the given topic without blocking. Each subscriber of the topic will receive a copy of the data in its dedicated queue.
- Parameters:
topic (ProcessingTopic) – The topic to publish data to.
data (Any) – The data to publish to subscribers.
- Raises:
TopicNotFoundException – If the topic has no subscribers (no one called subscribe() for it).
Full – If any subscriber’s queue is full and cannot accept more data.
Note
This method requires the topic to have at least one subscriber. If no one has subscribed to the topic yet, it is considered non-existent.
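Because publishing does not block, a slow subscriber with a bounded queue can cause Full to be raised. The sketch below reproduces that condition with a bare queue.Queue and shows one possible caller-side policy, which is to count and drop the event. The drop policy and the variable names are assumptions made for illustration, not behaviour of publish() itself.

```python
import queue

subscriber_queue = queue.Queue(maxsize=2)  # small bound to force overflow
dropped = 0

for event in ["e1", "e2", "e3"]:
    try:
        # Non-blocking put, as described for publish().
        subscriber_queue.put_nowait(event)
    except queue.Full:
        # Illustrative policy: count the drop and continue.
        dropped += 1
```

Other policies, such as retrying after a short delay or raising to the caller, may suit a pipeline that cannot tolerate lost events.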
- stop_processing()[source]
Signal all processing operations to stop.
Sets the stop processing event flag, which can be checked by workers to gracefully terminate their operations.
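A worker that cooperates with this flag typically polls it between queue reads. The following is a minimal sketch using threading.Event and queue.Queue directly. The names stop_event, inbox, and worker are hypothetical, and the 0.05 second timeout is an arbitrary choice that keeps the worker responsive to the stop signal.

```python
import queue
import threading

stop_event = threading.Event()  # plays the role of stop_processing_event
inbox = queue.Queue()           # plays the role of a subscriber queue
handled = []


def worker():
    # Equivalent of looping while processing_stopped() is False.
    while not stop_event.is_set():
        try:
            item = inbox.get(timeout=0.05)  # wake up regularly to re-check the flag
        except queue.Empty:
            continue
        handled.append(item)
        inbox.task_done()


t = threading.Thread(target=worker)
t.start()

inbox.put("flow-1")
inbox.join()       # wait until the item has been handled
stop_event.set()   # equivalent of stop_processing()
t.join(timeout=2)
```

A blocking get() without a timeout would never notice the flag while the queue is empty, which is why the sketch uses a timeout.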
- subscribe(topic, subscriber_id, queue_size=100)[source]
Subscribe to one or more topics and receive a dedicated queue for receiving data.
This method registers a subscriber for the specified topics. If it’s the first time a topic is subscribed to, the topic is effectively “created” in the system.
Ordering requirement: Subscribers must register before publishers attempt to send data to a topic.
Creates a new queue for the subscriber if they haven’t already subscribed to the topic(s). If multiple topics are provided, they will share the same queue for this subscriber. If the subscriber has already subscribed to any of the topics, returns the existing queue.
- Parameters:
topic (ProcessingTopic | List[ProcessingTopic]) – The topic name(s) to subscribe to. Can be a single ProcessingTopic or a list.
subscriber_id (str) – Unique identifier for the subscriber.
queue_size (int) – Maximum number of items the queue can hold (default: 100).
- Returns:
The queue for receiving published data.
- Return type:
Queue
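The shared-queue and repeat-subscription rules can be illustrated with a small stand-in. MiniRegistry is hypothetical and simplifies the documented behaviour by keeping exactly one queue per subscriber_id. The real bookkeeping in ProcessingQueue may differ.

```python
import queue


class MiniRegistry:
    """Illustrative subscription bookkeeping; not the real ProcessingQueue."""

    def __init__(self):
        self.queues = {}          # topic -> list of subscriber queues
        self._by_subscriber = {}  # subscriber_id -> queue (assumed layout)

    def subscribe(self, topic, subscriber_id, queue_size=100):
        # Accept a single topic or a list of topics.
        topics = topic if isinstance(topic, list) else [topic]
        q = self._by_subscriber.get(subscriber_id)
        if q is None:
            q = queue.Queue(maxsize=queue_size)
            self._by_subscriber[subscriber_id] = q
        for t in topics:
            subs = self.queues.setdefault(t, [])
            if not any(existing is q for existing in subs):
                subs.append(q)  # register the queue once per topic
        return q


reg = MiniRegistry()
q1 = reg.subscribe(["network-flow", "network-dpi"], "db-storer")
q2 = reg.subscribe("network-flow", "db-storer")  # repeat subscription
same_queue = q1 is q2
```

Here one queue serves both topics, so a consumer that needs to tell the topics apart would have to find that information in the published data.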
- queues: Dict[ProcessingTopic, List[Queue]] = {}
Mapping of topics to lists of subscriber queues.
- stop_processing_event = <threading.Event at 0x7f0f90834380: unset>
Event flag to signal processing termination.
- class mongoose.core.processing.ProcessingTopic(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases: Enum
- ENRICHED_NETWORK_ALERT = 'enriched-network-alert'
- ENRICHED_NETWORK_DPI = 'enriched-network-dpi'
- ENRICHED_NETWORK_FLOW = 'enriched-network-flow'
- NETWORK_ALERT = 'network-alert'
- NETWORK_DPI = 'network-dpi'
- NETWORK_FLOW = 'network-flow'
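Since ProcessingTopic is a standard Enum, members can be looked up by their string value and iterated over. The sketch below redefines the enum locally with the values listed above so that it runs without the mongoose package. In real code the class would be imported from mongoose.core.processing instead.

```python
from enum import Enum


class ProcessingTopic(Enum):
    # Local mirror of the documented members, for illustration only.
    ENRICHED_NETWORK_ALERT = "enriched-network-alert"
    ENRICHED_NETWORK_DPI = "enriched-network-dpi"
    ENRICHED_NETWORK_FLOW = "enriched-network-flow"
    NETWORK_ALERT = "network-alert"
    NETWORK_DPI = "network-dpi"
    NETWORK_FLOW = "network-flow"


# Look up a member from its string value, e.g. when parsing configuration.
topic = ProcessingTopic("network-flow")

# Iterate over members, here selecting the enriched topics.
enriched = [t for t in ProcessingTopic if t.value.startswith("enriched-")]

# An unknown value raises ValueError.
try:
    ProcessingTopic("unknown-topic")
    lookup_failed = False
except ValueError:
    lookup_failed = True
```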