1import json
2from importlib import resources
3from typing import Dict, Any, List, Optional, Set, Tuple
4
5from colander_data_converter.base.models import Entity
6
# Anchor handed to importlib.resources.files() by Stix2MappingLoader._load_mapping_data;
# the mapping JSON is looked up under "data/" relative to it.
resource_package = __name__
8
9
[docs]
class Stix2MappingLoader:
    """
    Loads and provides access to the STIX2 to Colander mapping data.

    The mapping is read once, at construction time, and stored in ``mapping_data``. Lookups for unknown
    entity types or subtypes return an empty container (or ``None``) rather than raising.
    """

    # Pattern returned when an observable subtype is unknown or declares no pattern of its own.
    _DEFAULT_OBSERVABLE_PATTERN = "[unknown:value = '{value}']"

    def __init__(self):
        """
        Initialize the mapping loader.

        Raises:
            ValueError: If the bundled mapping file is missing or is not valid JSON.
        """
        # Load the mapping data
        self.mapping_data = self._load_mapping_data()

    @staticmethod
    def _load_mapping_data() -> Dict[str, Any]:
        """
        Read ``data/stix2_colander_mapping.json`` relative to the package named by ``resource_package``.

        Returns:
            Dict[str, Any]: The decoded JSON document.

        Raises:
            ValueError: If the file cannot be found or cannot be decoded as JSON.
        """
        json_file = resources.files(resource_package).joinpath("data").joinpath("stix2_colander_mapping.json")
        try:
            # Decode explicitly as UTF-8 so the result does not depend on the platform locale.
            with json_file.open(encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            # Chain the cause so the original traceback stays attached to the ValueError.
            raise ValueError(f"Failed to load mapping data: {e}") from e

    def _subtype_mappings(self, type_key: str) -> Dict[str, Any]:
        """
        Return the ``"types"`` table stored under a top-level mapping key.

        Args:
            type_key (str): The exact top-level key to look up in ``mapping_data``.

        Returns:
            Dict[str, Any]: Subtype name -> subtype mapping, or an empty dict when the key is absent,
            is not a dict (e.g. ``"supported_colander_types"`` holds a list), or has no ``"types"`` table.
        """
        type_mapping = self.mapping_data.get(type_key)
        if not isinstance(type_mapping, dict):
            return {}
        subtypes = type_mapping.get("types")
        return subtypes if isinstance(subtypes, dict) else {}

    def get_entity_type_mapping(self, entity_type: str) -> Dict[str, Any]:
        """
        Get the mapping data for a specific Colander entity type.

        Args:
            entity_type (str): The entity type (e.g., "actor", "device"). Matched case-insensitively.

        Returns:
            Dict[str, Any]: The mapping data for the entity type, or an empty dict if it is unknown.
        """
        return self.mapping_data.get(entity_type.lower(), {})

    def get_entity_subtype_mapping(self, entity_type: str, entity_subtype: str) -> Dict[str, Any]:
        """
        Get the mapping data for a specific subtype of a Colander entity type.

        Args:
            entity_type (str): The entity type (e.g., "actor", "device"). Matched case-insensitively.
            entity_subtype (str): The Colander entity subtype (e.g. "ipv4"). Matched case-insensitively.

        Returns:
            Dict[str, Any]: The mapping data for the subtype, or an empty dict if either the type or the
            subtype is unknown.
        """
        return self._subtype_mappings(entity_type.lower()).get(entity_subtype.lower(), {})

    def get_stix2_type_for_entity(self, entity: Entity) -> str:
        """
        Get the STIX2 type associated with an entity.

        The lookup uses the ``short_name`` of the entity's super type and type.

        Args:
            entity (Entity): The entity to look up.

        Returns:
            str: The ``stix2_type`` of the matching subtype mapping, or an empty string if there is none.
        """
        _entity_mapping = self.get_entity_subtype_mapping(
            entity.get_super_type().short_name, entity.get_type().short_name
        )
        return _entity_mapping.get("stix2_type", "")

    def get_supported_colander_types(self) -> List[str]:
        """
        Get the Colander types listed under ``supported_colander_types`` in the mapping data.

        Returns:
            List[str]: The supported Colander types, or an empty list if the key is absent.
        """
        return self.mapping_data.get("supported_colander_types", [])

    def get_supported_stix2_types(self) -> List[str]:
        """
        Get every STIX2 type referenced by a subtype of a supported Colander type.

        Returns:
            List[str]: The distinct STIX2 types in first-seen order. Subtypes that declare no
            ``stix2_type`` are skipped.
        """
        # A dict is used as an insertion-ordered set so the result is stable across runs.
        _types: Dict[str, None] = {}
        for _supported_colander_type in self.get_supported_colander_types():
            for _mapping in self._subtype_mappings(_supported_colander_type).values():
                _stix2_type = _mapping.get("stix2_type") if isinstance(_mapping, dict) else None
                if _stix2_type:
                    _types[_stix2_type] = None
        return list(_types)

    def get_entity_type_for_stix2(self, stix2_type: str) -> Tuple[Optional[str], Optional[List[str]]]:
        """
        Get the Colander entity type for a STIX2 type (e.g. "indicator", "threat-actor").

        Args:
            stix2_type (str): The STIX2 type.

        Returns:
            Tuple[Optional[str], Optional[List[str]]]: The corresponding Colander type and the list of
            subtype candidates (in mapping order), or ``(None, None)`` if not found.
        """
        if not stix2_type:
            return None, None

        _colander_type_name: Optional[str] = None
        _subtype_candidates: List[str] = []
        for _supported_colander_type in self.get_supported_colander_types():
            # List the subtypes (e.g. ipv4, domain) of this Colander type that map to the STIX2 type
            _matches = [
                _subtype_name
                for _subtype_name, _mapping in self._subtype_mappings(_supported_colander_type).items()
                if isinstance(_mapping, dict) and _mapping.get("stix2_type") == stix2_type
            ]
            if _matches:
                # When several Colander types reference the same STIX2 type, the last supported one wins.
                _colander_type_name = _supported_colander_type
                _subtype_candidates = _matches

        if _colander_type_name is None:
            return None, None
        return _colander_type_name, _subtype_candidates

    def get_stix2_to_colander_field_mapping(self, entity_type: str) -> Dict[str, str]:
        """
        Get the field mapping from STIX2 to Colander for a specific entity type.

        Args:
            entity_type (str): The entity type.

        Returns:
            Dict[str, str]: The field mapping from STIX2 to Colander, or an empty dict if none is defined.
        """
        entity_mapping = self.get_entity_type_mapping(entity_type)
        return entity_mapping.get("stix2_to_colander", {})

    def get_colander_to_stix2_field_mapping(self, entity_type: str) -> Dict[str, str]:
        """
        Get the field mapping from Colander to STIX2 for a specific entity type.

        Args:
            entity_type (str): The entity type.

        Returns:
            Dict[str, str]: The field mapping from Colander to STIX2, or an empty dict if none is defined.
        """
        entity_mapping = self.get_entity_type_mapping(entity_type)
        return entity_mapping.get("colander_to_stix2", {})

    def get_field_relationship_mapping(self) -> Dict[str, str]:
        """
        Get the content of ``field_relationship_map`` from the mapping data.

        Returns:
            Dict[str, str]: The field relationship map, or an empty dict if the key is absent.
        """
        return self.mapping_data.get("field_relationship_map", {})

    def get_observable_mapping(self, observable_type: str) -> Dict[str, Any]:
        """
        Get the mapping data for a subtype of the "observable" entity type.

        Args:
            observable_type (str): The observable subtype (e.g. "ipv4").

        Returns:
            Dict[str, Any]: The subtype mapping, or an empty dict if it is unknown.
        """
        return self.get_entity_subtype_mapping("observable", observable_type)

    def get_observable_pattern(self, observable_type: str) -> str:
        """
        Get the pattern template declared for an observable subtype.

        Args:
            observable_type (str): The observable subtype (e.g. "ipv4").

        Returns:
            str: The ``pattern`` of the subtype mapping, or a generic ``unknown`` pattern when the subtype
            is unknown or declares no pattern.
        """
        mapping = self.get_observable_mapping(observable_type)
        return mapping.get("pattern", self._DEFAULT_OBSERVABLE_PATTERN)

    def get_threat_mapping(self, threat_type: str) -> Dict[str, Any]:
        """
        Get the mapping data for a subtype of the "threat" entity type.

        Args:
            threat_type (str): The threat subtype.

        Returns:
            Dict[str, Any]: The subtype mapping, or an empty dict if it is unknown.
        """
        return self.get_entity_subtype_mapping("threat", threat_type)

    def get_malware_types_for_threat(self, threat_type: str) -> List[str]:
        """
        Get the ``malware_types`` declared for a threat subtype.

        Args:
            threat_type (str): The threat subtype.

        Returns:
            List[str]: The declared malware types, or an empty list if there are none.
        """
        threat_mapping = self.get_threat_mapping(threat_type)
        return threat_mapping.get("malware_types", [])

    def get_actor_mapping(self, actor_type: str) -> Dict[str, Any]:
        """
        Get the mapping data for a subtype of the "actor" entity type.

        Args:
            actor_type (str): The actor subtype.

        Returns:
            Dict[str, Any]: The subtype mapping, or an empty dict if it is unknown.
        """
        return self.get_entity_subtype_mapping("actor", actor_type)

    def get_device_mapping(self, device_type: str) -> Dict[str, Any]:
        """
        Get the mapping data for a subtype of the "device" entity type.

        Args:
            device_type (str): The device subtype.

        Returns:
            Dict[str, Any]: The subtype mapping, or an empty dict if it is unknown.
        """
        return self.get_entity_subtype_mapping("device", device_type)
162