mongoose.collect.nfstream_collector
- class mongoose.collect.nfstream_collector.NFStreamCollector(configuration)[source]
Bases:
ThreadCollector that reads network flows using NFStreamer in a separate thread.
This class extends threading.Thread to run flow collection concurrently. It instantiates an NFStreamer using values from NFStreamConfiguration, converts each captured NFStreamer flow into the project’s NetworkDPI model, resolves a human-readable protocol name via PROTOCOL_NUMBERS, and publishes the flow to a ProcessingQueue.
The collector attempts to be robust: it logs exceptions, performs safe conversions for protocol and timestamps, and closes the streamer resource (if supported) on shutdown.
- __init__(configuration)[source]
Initialize the collector with the provided configuration.
- Parameters:
configuration (NFStreamConfiguration) – An NFStreamConfiguration instance containing interface, active_timeout.
- collect()[source]
Perform flow collection using NFStreamer.
- This method:
Creates an NFStreamer configured with interface, active_timeout.
Iterates flows emitted by NFStreamer, converts them to NetworkDPI objects (excluding the id field), resolves the protocol keyword, and publishes each flow to processing_queue under ProcessingTopic.NETWORK_DPI.
Stops if processing_queue.processing_stopped() returns True.
- static resolve_protocol(flow)[source]
Resolve and set the protocol keyword on a NetworkDPI based on the numeric protocol value.
- Parameters:
flow (NetworkDPI) – NetworkDPI instance with a protocol_number attribute.
- Behavior:
Looks up protocol_number in PROTOCOL_NUMBERS.
If found, sets flow.protocol to the mapping’s “keyword”.
If not found or protocol_number is invalid, leaves flow.protocol unchanged (may be None).
- run()[source]
Thread entrypoint: delegate to collect() so this object can be started via thread.start().
- configuration
Configuration object providing interface and active_timeout used to configure NFStreamer.