mongoose.collect.nfstream_collector

class mongoose.collect.nfstream_collector.NFStreamCollector(configuration)[source]

Bases: Thread

Collector that reads network flows using NFStreamer in a separate thread.

This class extends threading.Thread to run flow collection concurrently. It instantiates an NFStreamer using values from NFStreamConfiguration, converts each captured NFStreamer flow into the project’s NetworkDPI model, resolves a human-readable protocol name via PROTOCOL_NUMBERS, and publishes the flow to a ProcessingQueue.

The collector attempts to be robust: it logs exceptions, performs safe conversions for protocol and timestamps, and closes the streamer resource (if supported) on shutdown.

__init__(configuration)[source]

Initialize the collector with the provided configuration.

Parameters:

configuration (NFStreamConfiguration) – An NFStreamConfiguration instance containing interface, active_timeout.

collect()[source]

Perform flow collection using NFStreamer.

This method:
  • Creates an NFStreamer configured with interface, active_timeout.

  • Iterates flows emitted by NFStreamer, converts them to NetworkDPI objects (excluding the id field), resolves the protocol keyword, and publishes each flow to processing_queue under ProcessingTopic.NETWORK_DPI.

  • Stops if processing_queue.processing_stopped() returns True.

static resolve_protocol(flow)[source]

Resolve and set the protocol keyword on a NetworkDPI based on the numeric protocol value.

Parameters:

flow (NetworkDPI) – NetworkDPI instance with a protocol_number attribute.

Behavior:
  • Looks up protocol_number in PROTOCOL_NUMBERS.

  • If found, sets flow.protocol to the mapping’s “keyword”.

  • If not found or protocol_number is invalid, leaves flow.protocol unchanged (may be None).

run()[source]

Thread entrypoint: delegate to collect() so this object can be started via thread.start().

configuration

Configuration object providing interface and active_timeout used to configure NFStreamer.