Source code for mongoose.forward.file

  1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
  2# SPDX-FileContributor: u039b <git@0x39b.fr>
  3#
  4# SPDX-License-Identifier: GPL-3.0-or-later
  5
  6import json
  7import logging
  8import os
  9from typing import Any
 10
 11from mongoose.core.processing import ProcessingTopic
 12from mongoose.forward.base import BaseForwarder, BaseFormatter
 13from mongoose.models.configuration import FileForwarderConfiguration
 14
 15logger = logging.getLogger(__name__)
 16
 17
class FileFormatter(BaseFormatter):
    """Serialize network events into single-line JSON text for file output.

    Turning the event into a plain dictionary is delegated to
    ``BaseFormatter.to_dict``; this class only encodes that dictionary as
    JSON. Values the ``json`` module cannot encode natively (datetime
    objects, for example) are rendered with ``str``.
    """

    @staticmethod
    def format(data: Any) -> str:
        """Render ``data`` as a JSON document.

        Args:
            data: The model instance or data object to serialize.

        Returns:
            The JSON text for ``data`` without a trailing newline (the
            caller adds the line terminator), or an empty string when
            conversion or serialization raises.
        """
        try:
            # default=str is the fallback encoder for non-JSON-native values.
            serialized = json.dumps(BaseFormatter.to_dict(data), default=str)
        except Exception as exc:
            # Best effort: report the failure and signal it with "" so the
            # caller can skip the event instead of crashing.
            logger.error(f"Failed to format data for file forwarder: {exc}")
            return ""
        return serialized
46 47
class FileForwarder(BaseForwarder):
    """Append network events to per-topic JSON-lines files.

    Every forwarded event is formatted by :class:`FileFormatter` and appended
    as one line to ``<output_dir>/<prefix><topic value>.json``. The topics
    from the configuration are handed to :class:`BaseForwarder`.

    NOTE(review): subscription and any background/asynchronous dispatch are
    presumably implemented by ``BaseForwarder``; this class only supplies the
    file-writing side. Confirm against the base class.

    Security considerations:
        - **Permissions**: Ensure the output directory has appropriate
          permissions to prevent unauthorized access to the dumped network
          data.
        - **Disk space**: Monitor disk space as files will grow indefinitely
          since this forwarder appends data without rotation or cleanup.
        - **Sensitive data**: Network event data may contain sensitive
          infrastructure information (IPs, ports, protocol details). Ensure
          the storage medium is secured.
    """

    def __init__(self, config: FileForwarderConfiguration) -> None:
        """Initialize the forwarder and make sure its output directory exists.

        Args:
            config: A FileForwarderConfiguration instance. The attributes
                read by this class are ``topics``, ``output_dir`` and
                ``prefix``.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = FileFormatter()
        self._ensure_output_dir()

    def _ensure_output_dir(self) -> None:
        """Create the configured output directory when it is missing."""
        if os.path.exists(self.config.output_dir):
            return
        # exist_ok=True tolerates the directory appearing between the check
        # above and this call.
        os.makedirs(self.config.output_dir, exist_ok=True)
        logger.info(f"Created output directory: {self.config.output_dir}")

    def forward(self, data: Any) -> None:
        """Append one formatted event to the file of its topic.

        Events with no matching topic, or whose formatting yields an empty
        string, are skipped silently. Write failures are logged and
        swallowed so that a single bad write does not propagate to the
        caller.

        Args:
            data: The event to write (Pydantic model instance).
        """
        topic = self._get_topic_for_data(data)
        if not topic:
            return

        formatted_data = self.formatter.format(data)
        if not formatted_data:
            return

        filename = f"{self.config.prefix}{topic.value}.json"
        filepath = os.path.join(self.config.output_dir, filename)

        try:
            # Pin the encoding: without it, open() falls back to the host's
            # locale encoding and the output would differ between machines.
            with open(filepath, "a", encoding="utf-8") as f:
                f.write(formatted_data + "\n")
        except Exception as e:
            logger.error(f"Failed to write data to {filepath}: {e}")

    def _get_topic_for_data(self, data: Any) -> ProcessingTopic | None:
        """Determine the topic for the given data instance.

        For a supported model type, the raw topic is returned when it is
        among the topics reported by ``self._resolve_topics()``; otherwise
        the matching enriched topic is returned.

        Args:
            data: The event whose topic should be determined.

        Returns:
            The topic the event belongs to, or ``None`` when ``data`` is not
            a NetworkAlert, NetworkFlow or NetworkDPI instance.
        """
        # Imported at call time, as in the original implementation.
        from mongoose.models import NetworkAlert, NetworkFlow, NetworkDPI

        # (model type, raw topic, enriched topic), checked in this order.
        routing = (
            (
                NetworkAlert,
                ProcessingTopic.NETWORK_ALERT,
                ProcessingTopic.ENRICHED_NETWORK_ALERT,
            ),
            (
                NetworkFlow,
                ProcessingTopic.NETWORK_FLOW,
                ProcessingTopic.ENRICHED_NETWORK_FLOW,
            ),
            (
                NetworkDPI,
                ProcessingTopic.NETWORK_DPI,
                ProcessingTopic.ENRICHED_NETWORK_DPI,
            ),
        )

        for model_type, raw_topic, enriched_topic in routing:
            if isinstance(data, model_type):
                # Check raw topics first for testing purposes if configured.
                if raw_topic in self._resolve_topics():
                    return raw_topic
                return enriched_topic

        return None