Source code for mongoose.forward.file

  1import json
  2import logging
  3import os
  4from typing import Any
  5
  6from mongoose.core.processing import ProcessingTopic
  7from mongoose.forward.base import BaseForwarder, BaseFormatter
  8from mongoose.models.configuration import FileForwarderConfiguration
  9
 10logger = logging.getLogger(__name__)
 11
 12
class FileFormatter(BaseFormatter):
    """Formats network data for file storage.

    Converts a network model into a single-line JSON string. The model is
    first turned into a plain dictionary by ``BaseFormatter.to_dict`` and then
    serialized with ``json.dumps``. Any value left in that dictionary that the
    ``json`` module cannot encode natively (for example ``datetime`` objects)
    is rendered with ``str()``.

    NOTE(review): any Pydantic-version-specific handling presumably lives in
    ``BaseFormatter.to_dict``, which is not visible from this module.
    """

    @staticmethod
    def format(data: Any) -> str:
        """Format the given data into a JSON string.

        Args:
            data: The model instance or data object to format.

        Returns:
            The JSON representation of ``data`` as a single string, without a
            trailing newline; callers that want line-delimited output must
            append the line terminator themselves. Returns an empty string if
            formatting fails (the failure is logged, never raised).
        """
        try:
            payload = BaseFormatter.to_dict(data)

            # default=str is a deliberate catch-all: anything json cannot
            # encode natively is written as its str() form instead of raising.
            return json.dumps(payload, default=str)
        except Exception as e:
            # Best-effort boundary: one unserializable event must not take
            # down the caller, so report it and signal failure with "".
            logger.error("Failed to format data for file forwarder: %s", e)
            return ""
41 42
class FileForwarder(BaseForwarder):
    """Forwards network events to files, one file per topic.

    Each forwarded event is formatted as one JSON document and appended as a
    single line to ``<output_dir>/<prefix><topic value>.json``. The output
    directory is created on construction if it does not exist yet.

    NOTE(review): topic subscription and any asynchronous dispatch (queueing,
    background worker thread) are presumably provided by ``BaseForwarder``;
    this class itself only implements the synchronous per-event write.

    Security considerations:
        - **Permissions**: Ensure the output directory has appropriate
          permissions to prevent unauthorized access to the dumped network
          data.
        - **Disk space**: Monitor disk space as files will grow indefinitely
          since this forwarder appends data without rotation or cleanup.
        - **Sensitive data**: Network event data may contain sensitive
          infrastructure information (IPs, ports, protocol details). Ensure
          the storage medium is secured.
    """

    def __init__(self, config: FileForwarderConfiguration):
        """Initialize the FileForwarder.

        Args:
            config: A FileForwarderConfiguration instance. Its ``topics``,
                ``output_dir`` and ``prefix`` attributes are used by this
                class.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = FileFormatter()
        self._ensure_output_dir()

    def _ensure_output_dir(self):
        """Ensure the output directory exists.

        Creates the directory (including missing parents) when nothing exists
        at the configured path. If the path exists but is not a directory, an
        error is logged and nothing is created; subsequent writes will then
        fail and be reported by ``forward``.
        """
        output_dir = self.config.output_dir
        if os.path.isdir(output_dir):
            return

        if os.path.exists(output_dir):
            # Something other than a directory occupies the path. Do not
            # raise (construction stays best-effort), but make the
            # misconfiguration visible up front instead of only per event.
            logger.error(
                "Output path exists but is not a directory: %s", output_dir
            )
            return

        # exist_ok=True tolerates another process creating the directory
        # between the checks above and this call.
        os.makedirs(output_dir, exist_ok=True)
        logger.info("Created output directory: %s", output_dir)

    def forward(self, data: Any):
        """Write formatted data to the appropriate topic file.

        Events whose topic cannot be determined, or whose formatting fails
        (empty string from the formatter), are dropped silently here. Write
        failures are logged and never raised.

        Args:
            data: The data to write (Pydantic model instance).
        """
        # Determine the topic of the data
        topic = self._get_topic_for_data(data)
        if not topic:
            return

        formatted_data = self.formatter.format(data)
        if not formatted_data:
            return

        filename = f"{self.config.prefix}{topic.value}.json"
        filepath = os.path.join(self.config.output_dir, filename)

        try:
            # Pin the encoding so the file content does not depend on the
            # platform/locale default.
            with open(filepath, "a", encoding="utf-8") as f:
                # One JSON document per line.
                f.write(formatted_data + "\n")
        except Exception as e:
            logger.error("Failed to write data to %s: %s", filepath, e)

    def _get_topic_for_data(self, data: Any) -> ProcessingTopic | None:
        """Determine the topic for the given data instance.

        For each supported model type the raw topic is returned when it is
        among the topics reported by ``self._resolve_topics()``; otherwise
        the corresponding enriched topic is returned.

        Args:
            data: The object being forwarded.

        Returns:
            The matching ProcessingTopic, or None when ``data`` is not a
            NetworkAlert, NetworkFlow or NetworkDPI instance.
        """
        # Imported at call time rather than at module level, presumably to
        # avoid a circular import -- TODO confirm.
        from mongoose.models import NetworkAlert, NetworkFlow, NetworkDPI

        # (model type, raw topic, enriched topic); order matters because the
        # first matching isinstance check wins.
        topic_table = (
            (
                NetworkAlert,
                ProcessingTopic.NETWORK_ALERT,
                ProcessingTopic.ENRICHED_NETWORK_ALERT,
            ),
            (
                NetworkFlow,
                ProcessingTopic.NETWORK_FLOW,
                ProcessingTopic.ENRICHED_NETWORK_FLOW,
            ),
            (
                NetworkDPI,
                ProcessingTopic.NETWORK_DPI,
                ProcessingTopic.ENRICHED_NETWORK_DPI,
            ),
        )

        for model_type, raw_topic, enriched_topic in topic_table:
            if isinstance(data, model_type):
                # Check raw topics first for testing purposes if configured
                if raw_topic in self._resolve_topics():
                    return raw_topic
                return enriched_topic

        return None