1import json
2import logging
3import os
4from typing import Any
5
6from mongoose.core.processing import ProcessingTopic
7from mongoose.forward.base import BaseForwarder, BaseFormatter
8from mongoose.models.configuration import FileForwarderConfiguration
9
10logger = logging.getLogger(__name__)
11
12
41
42
[docs]
class FileForwarder(BaseForwarder):
    """Forwards network events to files asynchronously.

    Subscribes to specified topics and appends data to a file for each topic.
    Files are stored in the configured output directory. It manages its own
    background worker thread for non-blocking operation.

    Each topic is written to ``<output_dir>/<prefix><topic.value>.json``, one
    formatted record per line. Files are opened in append mode, so existing
    contents are preserved.

    Security considerations:
        - **Permissions**: Ensure the output directory has appropriate permissions
          to prevent unauthorized access to the dumped network data.
        - **Disk space**: Monitor disk space as files will grow indefinitely
          since this forwarder appends data without rotation or cleanup.
        - **Sensitive data**: Network event data may contain sensitive infrastructure
          information (IPs, ports, protocol details). Ensure the storage medium
          is secured.
    """

    def __init__(self, config: FileForwarderConfiguration):
        """Initialize the FileForwarder.

        Creates ``config.output_dir`` if it does not already exist.

        Args:
            config: A FileForwarderConfiguration instance.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = FileFormatter()
        self._ensure_output_dir()

    def _ensure_output_dir(self) -> None:
        """Create the output directory if needed, logging only when it is created."""
        try:
            # EAFP: attempting creation directly avoids the race between an
            # exists() check and makedirs() when another process creates the
            # directory in between. If the path already exists (as a
            # directory or otherwise) it is left untouched, as before.
            os.makedirs(self.config.output_dir)
        except FileExistsError:
            return
        logger.info("Created output directory: %s", self.config.output_dir)

    def forward(self, data: Any) -> None:
        """Append the formatted record for *data* to its topic file.

        Data whose topic cannot be determined, or which the formatter turns
        into an empty/falsy value, is skipped without writing anything.
        Failures while writing are logged and not re-raised.

        Args:
            data: The data to write (Pydantic model instance).
        """
        topic = self._get_topic_for_data(data)
        if topic is None:
            return

        formatted_data = self.formatter.format(data)
        if not formatted_data:
            return

        filename = f"{self.config.prefix}{topic.value}.json"
        filepath = os.path.join(self.config.output_dir, filename)

        try:
            # Explicit UTF-8: the platform default encoding (e.g. cp1252 on
            # Windows) can fail on, or mis-encode, non-ASCII payload text.
            with open(filepath, "a", encoding="utf-8") as f:
                f.write(formatted_data + "\n")
        except Exception as e:
            # Deliberately broad and best-effort: a failed write is logged
            # rather than propagated to the caller.
            logger.error("Failed to write data to %s: %s", filepath, e)

    def _get_topic_for_data(self, data: Any) -> ProcessingTopic | None:
        """Determine the topic for the given data instance.

        For each supported model type, the raw topic is chosen when it is
        among this forwarder's resolved topics; otherwise the corresponding
        enriched topic is used.

        Args:
            data: The event instance to classify.

        Returns:
            The topic whose file *data* should be written to, or ``None`` if
            *data* is not a supported model type.
        """
        # Imported lazily as in the original; presumably to avoid a circular
        # import with mongoose.models. TODO confirm.
        from mongoose.models import NetworkAlert, NetworkDPI, NetworkFlow

        # Ordered (model, raw topic, enriched topic) routes. The order matches
        # the original isinstance checks in case the model classes share a base.
        routes = (
            (NetworkAlert, ProcessingTopic.NETWORK_ALERT, ProcessingTopic.ENRICHED_NETWORK_ALERT),
            (NetworkFlow, ProcessingTopic.NETWORK_FLOW, ProcessingTopic.ENRICHED_NETWORK_FLOW),
            (NetworkDPI, ProcessingTopic.NETWORK_DPI, ProcessingTopic.ENRICHED_NETWORK_DPI),
        )

        for model_cls, raw_topic, enriched_topic in routes:
            if isinstance(data, model_cls):
                # Raw topics are preferred when subscribed (for testing).
                # NOTE(review): if both the raw and the enriched topic are
                # subscribed, enriched events are also written to the raw
                # file, because the model type alone cannot tell them apart.
                if raw_topic in self._resolve_topics():
                    return raw_topic
                return enriched_topic

        return None