# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
# SPDX-FileContributor: u039b <git@0x39b.fr>
#
# SPDX-License-Identifier: GPL-3.0-or-later
5
6import json
7import logging
8import os
9from typing import Any
10
11from mongoose.core.processing import ProcessingTopic
12from mongoose.forward.base import BaseForwarder, BaseFormatter
13from mongoose.models.configuration import FileForwarderConfiguration
14
15logger = logging.getLogger(__name__)
16
17
46
47
[docs]
class FileForwarder(BaseForwarder):
    """Forward network events to per-topic files.

    Subscribes to the configured topics and appends every formatted item,
    one per line, to ``<output_dir>/<prefix><topic value>.json``. The output
    directory is created at construction time if it does not exist.

    The original documentation describes the forwarder as running its own
    background worker thread for non-blocking operation; none of that
    machinery is defined in this class, so it is presumably inherited from
    ``BaseForwarder``. The write performed by :meth:`forward` is synchronous.

    Security considerations:
        - **Permissions**: Ensure the output directory has appropriate
          permissions to prevent unauthorized access to the dumped network
          data.
        - **Disk space**: Monitor disk space as files will grow indefinitely
          since this forwarder appends data without rotation or cleanup.
        - **Sensitive data**: Network event data may contain sensitive
          infrastructure information (IPs, ports, protocol details). Ensure
          the storage medium is secured.
    """

    def __init__(self, config: FileForwarderConfiguration):
        """Initialize the FileForwarder.

        Args:
            config: A FileForwarderConfiguration instance. Its ``topics``,
                ``output_dir`` and ``prefix`` attributes are read by this
                class.
        """
        super().__init__(topics=config.topics)
        self.config = config
        self.formatter = FileFormatter()
        self._ensure_output_dir()

    def _ensure_output_dir(self) -> None:
        """Create the configured output directory if it is missing.

        Nothing happens when the path already exists (even if it is not a
        directory); in that case later writes fail and are logged by
        :meth:`forward`.
        """
        if not os.path.exists(self.config.output_dir):
            # exist_ok=True tolerates another process creating the directory
            # between the check above and this call.
            os.makedirs(self.config.output_dir, exist_ok=True)
            logger.info(f"Created output directory: {self.config.output_dir}")

    def forward(self, data: Any) -> None:
        """Append the formatted item to the file of its topic.

        Items whose topic cannot be determined, or whose formatted
        representation is empty, are silently skipped. Write failures are
        logged and never propagated to the caller.

        Args:
            data: The data to write (Pydantic model instance).
        """
        topic = self._get_topic_for_data(data)
        if topic is None:
            return

        formatted_data = self.formatter.format(data)
        if not formatted_data:
            return

        filename = f"{self.config.prefix}{topic.value}.json"
        filepath = os.path.join(self.config.output_dir, filename)

        try:
            # Pin the encoding: without it the file is written in the
            # locale's default encoding, which is not guaranteed to be UTF-8
            # and can fail or corrupt non-ASCII payloads.
            with open(filepath, "a", encoding="utf-8") as f:
                f.write(formatted_data + "\n")
        except Exception as e:
            # Deliberately best-effort: a failed write must not take down
            # the caller, so the error is only logged.
            logger.error(f"Failed to write data to {filepath}: {e}")

    def _get_topic_for_data(self, data: Any) -> ProcessingTopic | None:
        """Determine the topic for the given data instance.

        For each supported model type the raw topic is returned when it is
        among ``self._resolve_topics()``; otherwise the enriched topic is
        returned.

        Args:
            data: The item about to be forwarded.

        Returns:
            The matching ProcessingTopic, or None when ``data`` is not a
            NetworkAlert, NetworkFlow or NetworkDPI instance.
        """
        # Imported locally rather than at module level; presumably to avoid
        # a circular import -- confirm before moving it.
        from mongoose.models import NetworkAlert, NetworkFlow, NetworkDPI

        # (model type, raw topic, enriched topic), checked in this order.
        # Raw topics are checked first for testing purposes if configured.
        candidates = (
            (
                NetworkAlert,
                ProcessingTopic.NETWORK_ALERT,
                ProcessingTopic.ENRICHED_NETWORK_ALERT,
            ),
            (
                NetworkFlow,
                ProcessingTopic.NETWORK_FLOW,
                ProcessingTopic.ENRICHED_NETWORK_FLOW,
            ),
            (
                NetworkDPI,
                ProcessingTopic.NETWORK_DPI,
                ProcessingTopic.ENRICHED_NETWORK_DPI,
            ),
        )

        for model_type, raw_topic, enriched_topic in candidates:
            if isinstance(data, model_type):
                if raw_topic in self._resolve_topics():
                    return raw_topic
                return enriched_topic

        return None