1import json
2from importlib import resources
3from typing import Optional, Dict, Any, Type, List, Tuple, Union
4
5from pydantic import BaseModel, ConfigDict
6from pymisp import MISPObject, AbstractMISP, MISPAttribute, MISPTag
7
8from colander_data_converter.base.models import CommonEntitySuperTypes, CommonEntitySuperType
9from colander_data_converter.base.types.base import CommonEntityType
10from colander_data_converter.converters.misp.utils import get_attribute_by_name
11
12type ColanderMISPMapping = Dict[str, Any]
13type RelationMapping = Dict[str, Any]
14
15
18
19
[docs]
20class Discriminator(BaseModel):
21 type: str
22 property: str
23 target: str
24 value: Optional[str] = None
25
26
[docs]
27class MISPColanderMapping(BaseModel):
28 also: Optional[List[str]] = None
29 discriminator: Optional[Discriminator] = None
30
31
[docs]
32class EntityTypeMapping(BaseModel):
33 colander_type: str
34 misp_object: str
35 misp_type: Optional[str] = None
36 misp_definition: Optional[str] = None
37 misp_colander_mapping: MISPColanderMapping
38 colander_misp_mapping: ColanderMISPMapping
39 colander_super_type: Optional[CommonEntitySuperType] = None
40
41 @property
42 def colander_entity_type(self) -> CommonEntityType:
43 return self.colander_super_type.type_by_short_name(self.colander_type)
44
[docs]
45 def get_misp_model_class(self) -> Tuple[Type[AbstractMISP], str]:
46 if self.misp_object == "misp-attribute":
47 return MISPAttribute, self.misp_type
48 elif self.misp_object == "misp-tag":
49 return MISPTag, self.misp_type
50 return MISPObject, self.misp_object
51
[docs]
52 def get_colander_misp_field_mapping(self) -> List[Optional[Tuple[str, str]]]:
53 return [(src, dst) for src, dst in self.colander_misp_mapping.items() if isinstance(dst, str)]
54
[docs]
55 def get_colander_misp_literals_mapping(self) -> List[Optional[Tuple[str, str]]]:
56 return [(src, dst) for src, dst in self.colander_misp_mapping.get("literals", {}).items()]
57
[docs]
58 def get_colander_misp_attributes_mapping(self) -> List[Optional[Tuple[str, str]]]:
59 return [
60 (src, dst)
61 for src, dst in self.colander_misp_mapping.get("misp_attributes", {}).items()
62 if isinstance(dst, str)
63 ]
64
65
[docs]
66class EntitySuperTypeMapping(BaseModel):
67 model_config = ConfigDict(
68 str_strip_whitespace=True,
69 arbitrary_types_allowed=True,
70 )
71 colander_super_type: str
72 model_class: Any
73 types_mapping: Dict[str, EntityTypeMapping] = {}
74
[docs]
75 def get_supported_colander_types(self) -> List[Optional[str]]:
76 return list(self.types_mapping.keys())
77
78
[docs]
79class Mapping(object):
80 TYPES = [
81 (CommonEntitySuperTypes.ACTOR.value, "actor"),
82 (CommonEntitySuperTypes.ARTIFACT.value, "artifact"),
83 (CommonEntitySuperTypes.DEVICE.value, "device"),
84 (CommonEntitySuperTypes.EVENT.value, "event"),
85 (CommonEntitySuperTypes.DATA_FRAGMENT.value, "data_fragment"),
86 (CommonEntitySuperTypes.DETECTION_RULE.value, "detection_rule"),
87 (CommonEntitySuperTypes.OBSERVABLE.value, "observable"),
88 (CommonEntitySuperTypes.THREAT.value, "threat"),
89 ]
90
91 def __init__(self):
92 self.misp_objects_mapping: Dict[str, List[EntityTypeMapping]] = {}
93 self.misp_attributes_mapping: Dict[str, List[EntityTypeMapping]] = {}
94 self.misp_tags_mapping: Dict[str, EntityTypeMapping] = {}
95 self.colander_super_types_mapping: Dict[str, EntitySuperTypeMapping] = {}
96 self.colander_relation_mapping = self._load_relation_mapping_definition()
97 for type_class, prefix in self.TYPES:
98 self.colander_super_types_mapping[type_class.short_name] = self._load_mapping_definition(type_class, prefix)
99 self._build_misp_mapping()
100
101 @staticmethod
102 def _load_relation_mapping_definition() -> RelationMapping:
103 resource_package = __name__
104 json_file = resources.files(resource_package).joinpath("data").joinpath("relation_misp_mapping.json")
105 with json_file.open() as f:
106 return json.load(f)
107
108 @staticmethod
109 def _load_mapping_definition(type_class: CommonEntitySuperType, filename_prefix: str) -> EntitySuperTypeMapping:
110 resource_package = __name__
111 json_file = resources.files(resource_package).joinpath("data").joinpath(f"{filename_prefix}_misp_mapping.json")
112 super_type_mapping = EntitySuperTypeMapping(colander_super_type=type_class.short_name, model_class=type_class)
113 with json_file.open() as f:
114 raw = json.load(f)
115 for definition in raw:
116 type_mapping = EntityTypeMapping.model_validate(definition)
117 type_mapping.colander_super_type = type_class
118 super_type_mapping.types_mapping[type_mapping.colander_type] = type_mapping
119 return super_type_mapping
120
121 def _build_misp_mapping(self):
122 for _, super_type_mapping in self.colander_super_types_mapping.items():
123 for _, type_mapping in super_type_mapping.types_mapping.items():
124 if type_mapping.misp_object == "misp-attribute":
125 if type_mapping.misp_type not in self.misp_attributes_mapping:
126 self.misp_attributes_mapping[type_mapping.misp_type] = []
127 for s in type_mapping.misp_colander_mapping.also or []:
128 self.misp_attributes_mapping[s] = [type_mapping]
129 self.misp_attributes_mapping[type_mapping.misp_type].append(type_mapping)
130 if type_mapping.misp_object == "misp-tag":
131 tag_name = type_mapping.colander_misp_mapping["literals"]["name"].replace("{value}", "")
132 self.misp_tags_mapping[tag_name] = type_mapping
133 else:
134 if type_mapping.misp_object not in self.misp_objects_mapping:
135 self.misp_objects_mapping[type_mapping.misp_object] = []
136 self.misp_objects_mapping[type_mapping.misp_object].append(type_mapping)
137
[docs]
138 def get_relation_mapping_to_misp(
139 self, super_type: CommonEntitySuperType, reference_name: str
140 ) -> Optional[RelationMapping]:
141 mapping = self.colander_relation_mapping.get(super_type.short_name, {})
142 return mapping.get(reference_name, None)
143
[docs]
144 def get_mapping_to_misp(
145 self, entity_super_type: CommonEntitySuperType, entity_type: CommonEntityType
146 ) -> Optional[EntityTypeMapping]:
147 est_mapping = self.colander_super_types_mapping.get(entity_super_type.short_name, None)
148 if est_mapping:
149 return est_mapping.types_mapping.get(entity_type.short_name, None)
150 return None
151
[docs]
152 def get_misp_object_or_attribute_value(self) -> Optional[str]:
153 return None
154
[docs]
155 def match_discriminator(self, misp_object: Union[MISPObject, MISPAttribute], discriminator: Discriminator) -> bool:
156 matched = False
157 if not discriminator or not misp_object:
158 return matched
159
160 target_type = discriminator.target
161 if target_type == "self" and not isinstance(misp_object, MISPAttribute):
162 raise Exception("Discriminator target of type 'self' is only supported for MISPAttribute")
163 if target_type == "attribute-value" and not isinstance(misp_object, MISPObject):
164 raise Exception("Discriminator target of type 'attribute-value' is only supported for MISPObject")
165
166 value = None
167 if target_type == "self":
168 value = getattr(misp_object, discriminator.property)
169 elif target_type == "attribute-value":
170 attribute = get_attribute_by_name(misp_object, discriminator.property)
171 if attribute:
172 value = attribute.value
173
174 if discriminator.type == "match":
175 matched = value == discriminator.value
176
177 return matched
178
[docs]
179 def get_misp_object_mapping(self, misp_object: MISPObject) -> Optional[EntityTypeMapping]:
180 if not misp_object:
181 return None
182 candidates = self.misp_objects_mapping.get(misp_object.name, [])
183 if len(candidates) == 1:
184 return candidates[0]
185 for candidate in candidates:
186 types_class = candidate.colander_super_type.types_class
187 if (discriminator := candidate.misp_colander_mapping.discriminator) is None:
188 continue
189 if discriminator.type == "suggest":
190 value = None
191 if discriminator.target == "self":
192 value = getattr(misp_object, discriminator.property)
193 elif discriminator.target == "attribute-value":
194 if (attribute := get_attribute_by_name(misp_object, discriminator.property)) is None:
195 return None
196 value = attribute.value
197 if not value:
198 return None
199 if (suggested_type := types_class.suggest(value)) is None:
200 return None
201 mapping = self.get_mapping_to_misp(candidate.colander_super_type, suggested_type)
202 return mapping
203 elif self.match_discriminator(misp_object, discriminator):
204 return candidate
205 return None
206
[docs]
207 def get_misp_attribute_mapping(self, misp_attribute: MISPAttribute) -> Optional[EntityTypeMapping]:
208 if not misp_attribute:
209 return None
210 candidates = self.misp_attributes_mapping.get(misp_attribute.type, [])
211 if len(candidates) == 1:
212 return candidates[0]
213 for candidate in candidates:
214 types_class = candidate.colander_super_type.types_class
215 if (discriminator := candidate.misp_colander_mapping.discriminator) is None:
216 continue
217 if discriminator.type == "suggest":
218 if (suggested_type := types_class.suggest(getattr(misp_attribute, discriminator.property))) is None:
219 return None
220 mapping = self.get_mapping_to_misp(candidate.colander_super_type, suggested_type)
221 return mapping
222 elif self.match_discriminator(misp_attribute, discriminator):
223 return candidate
224 return None
225
[docs]
226 def get_misp_tag_mapping(self, misp_tag: MISPTag) -> Optional[EntityTypeMapping]:
227 if not misp_tag:
228 return None
229 for tag_name, mapping in self.misp_tags_mapping.items():
230 if tag_name in misp_tag.name:
231 return mapping
232 return None