1from typing import Optional, Union, List, Tuple
2
3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent, MISPFeed
4
5from colander_data_converter.base.common import TlpPapLevel
6from colander_data_converter.base.models import (
7 EntityTypes,
8 Case,
9 ColanderFeed,
10 EntityRelation,
11 ColanderRepository,
12 Entity,
13)
14from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
15from colander_data_converter.converters.misp.utils import get_attribute_by_name
16from colander_data_converter.converters.stix2.utils import get_nested_value
17
18
[docs]
class MISPMapper:
    """
    Shared base class for the Colander <-> MISP mappers.

    Creating a mapper instantiates the ``Mapping`` configuration and clears the
    ``ColanderRepository``.
    """

    def __init__(self):
        self.mapping = Mapping()
        ColanderRepository().clear()

    @staticmethod
    def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag:
        """
        Build a MISP tag named after a Colander TLP (Traffic Light Protocol) level.

        Args:
            tlp_level: The TLP level to convert

        Returns:
            A new MISP tag whose name is the name of the given TLP level
        """
        tag = MISPTag()
        tag.name = tlp_level.name
        return tag
44
45
[docs]
class ColanderToMISPMapper(MISPMapper):
    """
    Mapper class for converting Colander objects to MISP format.

    Handles the conversion of various Colander entity types (threats, actors, events,
    artifacts, etc.) to their corresponding MISP object representations using
    predefined mapping configurations.
    """

    def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]:
        """
        Convert a Colander object to its corresponding MISP representation.

        This method performs the core conversion logic by:

        1. Looking up the appropriate mapping for the Colander object type
        2. Creating the corresponding MISP object (Attribute or Object)
        3. Mapping fields, literals, and attributes from Colander to MISP format

        Args:
            colander_object: The Colander object to convert

        Returns:
            The converted MISP attribute or object, a ``TagStub`` when the mapping targets a MISP tag,
            or None if no mapping exists, the mapped MISP model is not supported, or the tag mapping
            does not define a name pattern.
        """
        # Get the mapping configuration for this Colander object type
        entity_type_mapping: Optional[EntityTypeMapping] = self.mapping.get_mapping_to_misp(
            colander_object.get_super_type(), colander_object.type
        )
        if entity_type_mapping is None:
            return None

        # Determine the MISP model class and type to create
        misp_model, misp_type = entity_type_mapping.get_misp_model_class()

        # Create the appropriate MISP object based on the model type
        misp_object: Union[MISPAttribute, MISPObject]
        if issubclass(misp_model, MISPAttribute):
            misp_object = misp_model(strict=True)
            misp_object.type = misp_type
        elif issubclass(misp_model, MISPObject):
            misp_object = misp_model(name=misp_type, strict=True)
        elif issubclass(misp_model, MISPTag):
            literals = entity_type_mapping.colander_misp_mapping.get("literals") or {}
            tag_pattern = literals.get("name")
            # Without a name pattern there is nothing to format; report "not convertible"
            # instead of failing on ``None.format``.
            if not tag_pattern:
                return None
            return TagStub(tag_pattern.format(value=colander_object.name))
        else:
            return None

        # Set the "to_ids" attribute when the object is flagged as malicious
        # or is associated with a threat
        colander_attributes = getattr(colander_object, "attributes", None)
        if colander_attributes and colander_attributes.get("is_malicious", False):
            misp_object.to_ids = True
        if getattr(colander_object, "associated_threat", None) is not None:
            misp_object.to_ids = True

        # Set common MISP object properties
        # ToDo: add tag for TLP
        misp_object.uuid = str(colander_object.id)
        misp_object.first_seen = colander_object.created_at
        misp_object.last_seen = colander_object.updated_at

        # Map direct field mappings from Colander to MISP object properties
        for source_field, target_field in entity_type_mapping.get_colander_misp_field_mapping():
            value = getattr(colander_object, source_field, None)
            if value is not None:
                setattr(misp_object, target_field, value)

        # Set constant/literal values on the MISP object: "category" and "comment" are
        # plain properties, every other literal is added as an attribute.
        for target_field, value in entity_type_mapping.get_colander_misp_literals_mapping():
            if target_field in ("category", "comment"):
                setattr(misp_object, target_field, value)
            else:
                misp_object.add_attribute(target_field, value=value)

        # Dictionary form of the Colander object, used for dotted (nested) field access
        colander_object_dict = colander_object.model_dump(mode="json")

        # Map Colander fields to MISP object attributes
        for source_field, target_field in entity_type_mapping.get_colander_misp_attributes_mapping():
            if "." in source_field:
                # Nested field access using dot notation
                value = get_nested_value(colander_object_dict, source_field)
            else:
                # Direct field access
                value = getattr(colander_object, source_field, None)
            if value is not None:
                misp_object.add_attribute(target_field, value=value)

        return misp_object

    @staticmethod
    def get_element_from_event(
        event: MISPEvent, uuid: str, types: List[str]
    ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]:
        """
        Retrieve an element (object or attribute) from a MISP event by UUID and type.

        Objects are searched before attributes when both types are requested.

        Args:
            event: The MISP event to search within.
            uuid: The UUID of the element to find.
            types: List of types to search for ("object", "attribute").

        Returns:
            The found element and its type as a string ("Object" or "Attribute"), or (None, None) if not found.
        """
        if "object" in types:
            for misp_object in event.objects:
                if hasattr(misp_object, "uuid") and misp_object.uuid == uuid:
                    return misp_object, "Object"
        if "attribute" in types:
            for misp_attribute in event.attributes:
                if hasattr(misp_attribute, "uuid") and misp_attribute.uuid == uuid:
                    return misp_attribute, "Attribute"
        return None, None

    def _add_relationship(self, event: MISPEvent, source_id: str, target_id: str, relation_name: str) -> None:
        """
        Add a relationship from a MISP object of the event to another object or attribute.

        Nothing is added unless the source is found among the event objects and the
        target is found among the event objects or attributes.

        Args:
            event: The MISP event holding both elements.
            source_id: UUID of the source MISP object.
            target_id: UUID of the target MISP object or attribute.
            relation_name: The relationship type to record.
        """
        source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
        target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
        if source_object and target_object:
            source_object.add_relationship(type_name, target_id, relation_name)

    def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes):
        """
        Create relationships in a MISP event based on the Colander object's immutable relations.

        This method processes each immutable relation defined in the Colander object, determines the appropriate
        mapping and direction, and adds the corresponding relationship or tag to the MISP event.

        Args:
            event: The MISP event to which relationships or tags will be added.
            colander_object: The Colander object containing immutable relations.

        Note:
            - Relations without a mapping are ignored.
            - If the relation mapping specifies 'reverse', source and target are swapped.
            - If the relation mapping specifies 'use_tag', a tag is added to the relevant MISP attribute.
            - Otherwise, a relationship is created between MISP objects or attributes as defined by the mapping.
        """
        super_type = colander_object.super_type
        for relation in colander_object.get_immutable_relations().values():
            reference_name = relation.name
            relation_mapping = self.mapping.get_relation_mapping_to_misp(super_type, reference_name)
            if not relation_mapping:
                continue

            if relation_mapping.get("reverse", False):
                source, target = relation.obj_to, relation.obj_from
            else:
                source, target = relation.obj_from, relation.obj_to
            source_id = str(source.id)
            target_id = str(target.id)
            relation_name = relation_mapping.get("name", reference_name.replace("_", "-"))

            if relation_mapping.get("use_tag", False):
                # Tags only on MISPAttribute or MISPEvent
                source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"])
                if not source_object:
                    # No attribute to tag: skip the conversion of the target altogether
                    continue
                tag = self.convert_colander_object(target)
                if isinstance(tag, TagStub):
                    event.add_attribute_tag(tag, source_id)
            else:
                # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute
                self._add_relationship(event, source_id, target_id, relation_name)

    def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]):
        """
        Create relationships in a MISP event based on a list of Colander relations.

        This method finds the corresponding MISP objects or attributes for each relation and
        adds the relationship to the source object. Relations whose source object or target
        element is not part of the event are ignored.

        Args:
            event: The MISP event to which relationships will be added.
            colander_relations: List of Colander relations to convert.
        """
        for relation in colander_relations:
            self._add_relationship(event, str(relation.obj_from.id), str(relation.obj_to.id), relation.name)

    def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]:
        """
        Convert a Colander Case and its associated ColanderFeed into a MISPEvent.

        This method performs the following steps:

        1. Initializes a new MISPEvent using the case information. The event info is the case
           description, falling back to the case name when the description is empty.
        2. Iterates over the entities of the feed that belong to the case, converting each to a
           MISP object or attribute.

           - Entities that cannot be converted are added to the skipped list.
           - MISPAttributes are added as attributes to the event.
           - MISPObjects are added as objects to the event.
           - Any other conversion result (e.g. a tag stub) is neither added nor reported as skipped.
        3. Processes immutable relations for each entity of the feed, adding corresponding relationships or tags.
        4. Processes regular (non-immutable) relations for each entity of the feed, adding relationships.
        5. Returns the constructed MISPEvent and a list of skipped entities.

        Args:
            case: The Colander case to convert.
            feed: The feed containing entities and relations to convert.

        Returns:
            The resulting MISPEvent and a list of entities that were skipped during conversion.
        """
        skipped: List[EntityTypes] = []
        misp_event = MISPEvent()
        misp_event.uuid = str(case.id)
        # An event without info is of little use; fall back to the case name
        misp_event.info = case.description or case.name
        misp_event.date = case.created_at

        for entity in feed.entities.values():
            if entity.case != case:
                continue
            misp_object = self.convert_colander_object(entity)
            if not misp_object:
                skipped.append(entity)
                continue
            if isinstance(misp_object, MISPAttribute):
                misp_event.add_attribute(**misp_object.to_dict())
            elif isinstance(misp_object, MISPObject):
                misp_event.add_object(misp_object)

        # Relations are handled once every element is in the event, so that both ends
        # of a relation can be looked up by UUID.
        # Immutable relations
        for entity in feed.entities.values():
            self.convert_immutable_relations(misp_event, entity)

        # Regular relations
        for entity in feed.entities.values():
            self.convert_relations(misp_event, list(feed.get_outgoing_relations(entity).values()))

        return misp_event, skipped
276
277
[docs]
class MISPToColanderMapper(MISPMapper):
    """
    Mapper class for converting MISP events, objects and attributes to Colander entities.
    """

    # NOTE(review): ``convert_attribute`` calls ``self.convert_tags``, whose definition is
    # not part of the visible source - confirm that it is defined on this class.

    def convert_misp_event(self, event: MISPEvent) -> Tuple[Case, ColanderFeed]:
        """
        Convert a MISPEvent into a Colander case and feed.

        This method performs the following steps:

        1. Creates a new Case instance using the event information.
        2. Initializes a ColanderFeed and adds the case to it.
        3. Converts all MISP objects in the event to Colander entities and adds them to the feed.
        4. Converts all MISP attributes in the event to Colander entities and adds them to the feed.
        5. Converts all relations in the event to Colander relations and adds them to the feed.
        6. Returns the constructed Case and ColanderFeed.

        Args:
            event: The MISP event to convert.

        Returns:
            The resulting Case and Feed.
        """
        case = Case(id=event.uuid, name=event.info, description=f"Loaded from MISP event [{event.uuid}]")
        feed = ColanderFeed(cases={f"{case.id}": case})
        for entity in self.convert_objects(event):
            entity.case = case
            feed.entities[str(entity.id)] = entity
        for entity in self.convert_attributes(event):
            entity.case = case
            feed.entities[str(entity.id)] = entity
        for relation in self.convert_relations(event):
            relation.case = case
            feed.relations[str(relation.id)] = relation
        return case, feed

    def convert_relations(self, event: MISPEvent) -> List[EntityRelation]:
        """
        Convert the relationships attached to the event's objects and attributes into Colander relations.

        Both ends of a relationship are looked up by UUID in the ``ColanderRepository``.
        A relationship is skipped when its source or target does not resolve to an ``Entity``
        or when it has no relationship type.

        Args:
            event: The MISP event whose relationships are converted.

        Returns:
            The list of Colander relations that could be built.
        """
        relations: List[EntityRelation] = []
        for misp_element in event.objects + event.attributes:
            source_object = ColanderRepository() >> misp_element.uuid
            if not isinstance(source_object, Entity):
                continue
            for misp_relation in misp_element.relationships or []:
                relation_name = misp_relation.relationship_type
                target_object = ColanderRepository() >> misp_relation.related_object_uuid
                if not isinstance(target_object, Entity) or not relation_name:
                    continue
                relations.append(
                    EntityRelation(
                        id=misp_relation.uuid, name=relation_name, obj_from=source_object, obj_to=target_object
                    )
                )
        return relations

    def _prepare_colander_entity(
        self, misp_object: Union[MISPObject, MISPAttribute], entity_mapping: EntityTypeMapping, entity_name: str
    ) -> Optional[EntityTypes]:
        """
        Prepare and populate a Colander entity from a MISP object using the provided mapping and entity name.

        Args:
            misp_object: The MISP object or attribute to convert.
            entity_mapping: The mapping configuration for the entity type.
            entity_name: The name to assign to the Colander entity.

        Returns:
            The populated Colander entity.
        """
        # Get the Colander model class and entity type from the mapping
        colander_model_class = entity_mapping.colander_super_type.model_class
        colander_entity_type = entity_mapping.colander_entity_type

        # Instantiate the Colander entity with id, type, and name
        colander_entity = colander_model_class(id=misp_object.uuid, type=colander_entity_type, name=entity_name)

        # Map MISP object properties to Colander entity attributes based on the mapping
        for colander_attribute_name, misp_property_name in entity_mapping.colander_misp_mapping.items():
            # These keys are not plain property mappings
            if colander_attribute_name in ("literals", "name", "misp_attributes"):
                continue
            misp_value = getattr(misp_object, misp_property_name, None)
            # Leave the entity field untouched when the MISP side has no value,
            # rather than overwriting it with None
            if misp_value is not None:
                setattr(colander_entity, colander_attribute_name, misp_value)

        return colander_entity

    def convert_object(self, misp_object: MISPObject) -> Optional[EntityTypes]:
        """
        Convert a MISPObject to its corresponding Colander entity.

        This method uses the mapping configuration to extract the entity name and attributes
        from the MISPObject, then creates and populates a Colander entity instance.

        Args:
            misp_object: The MISP object to convert.

        Returns:
            The resulting Colander entity, or None if mapping or name is missing.
        """
        # Get the mapping for this MISP object
        entity_mapping = self.mapping.get_misp_object_mapping(misp_object)
        if not entity_mapping or not entity_mapping.colander_super_type:
            return None

        misp_property_for_name = entity_mapping.colander_misp_mapping.get("name", "")
        misp_attributes = entity_mapping.colander_misp_mapping.get("misp_attributes", {})
        misp_attribute_for_name = misp_attributes.get("name", "")

        # Try to extract the entity name from the MISP object property or attribute
        entity_name = None
        if misp_property_for_name:
            entity_name = getattr(misp_object, misp_property_for_name, None)
        elif misp_attribute_for_name:
            if (misp_attribute := get_attribute_by_name(misp_object, misp_attribute_for_name)) is not None:
                entity_name = misp_attribute.value

        if not entity_name:
            return None

        # Prepare the Colander entity using the mapping and extracted entity name
        colander_entity = self._prepare_colander_entity(misp_object, entity_mapping, entity_name)

        # Map MISP attributes to Colander entity fields
        for colander_attribute_name, misp_property_name in misp_attributes.items():
            # Skip literals, name, and nested fields
            if colander_attribute_name in ("literals", "name") or "." in colander_attribute_name:
                continue
            if (misp_attribute := get_attribute_by_name(misp_object, misp_property_name)) is None:
                continue
            if hasattr(colander_entity, colander_attribute_name) and misp_attribute.value:
                setattr(colander_entity, colander_attribute_name, misp_attribute.value)

        # If the Colander entity has an 'attributes' dict, add any extra MISP attributes not mapped above
        if hasattr(colander_entity, "attributes"):
            if not colander_entity.attributes:
                colander_entity.attributes = {}
            mapped_relations = tuple(misp_attributes.values())
            for attribute in misp_object.attributes:
                object_relation = attribute.object_relation
                # An attribute without object relation cannot be turned into a key
                if not object_relation or object_relation in mapped_relations:
                    continue
                colander_entity.attributes[object_relation.replace("-", "_")] = str(attribute.value)

        return colander_entity

    def convert_objects(self, misp_object: MISPEvent) -> List[EntityTypes]:
        """
        Convert every MISP object of an event to a Colander entity.

        Args:
            misp_object: The MISP event whose objects are converted (the parameter name is
                kept for backward compatibility).

        Returns:
            The entities that could be converted; objects without a usable mapping are left out.
        """
        entities: List[EntityTypes] = []
        for event_object in misp_object.objects:
            colander_entity = self.convert_object(event_object)
            if colander_entity:
                entities.append(colander_entity)
        return entities

    def convert_attribute(
        self, misp_attribute: MISPAttribute, event_tags: Optional[List[MISPTag]] = None
    ) -> Optional[EntityTypes]:
        """
        Convert a MISPAttribute to its corresponding Colander entity.

        The tags of the event and of the attribute are handed over to ``convert_tags``, and the
        entity is flagged as malicious when the attribute has ``to_ids`` set.

        Args:
            misp_attribute: The MISP attribute to convert.
            event_tags: Tags of the enclosing event, if any. The list is not modified.

        Returns:
            The resulting Colander entity, or None if mapping or name is missing.
        """
        entity_mapping = self.mapping.get_misp_attribute_mapping(misp_attribute)
        if not entity_mapping or not entity_mapping.colander_super_type:
            return None

        misp_property_for_name = entity_mapping.colander_misp_mapping.get("name")
        if not misp_property_for_name:
            return None
        entity_name = getattr(misp_attribute, misp_property_for_name, None)
        if not entity_name:
            return None

        colander_entity = self._prepare_colander_entity(misp_attribute, entity_mapping, entity_name)

        # Build a fresh list: extending ``event_tags`` in place would grow the caller's
        # list (the event's own tags) with the tags of every converted attribute.
        tags = [*(event_tags or []), *(misp_attribute.tags or [])]
        self.convert_tags(colander_entity, tags)

        if misp_attribute.to_ids:
            colander_entity.add_attributes({"is_malicious": True})
        return colander_entity

    def convert_attributes(self, misp_event: MISPEvent) -> List[EntityTypes]:
        """
        Convert every MISP attribute of an event to a Colander entity.

        Args:
            misp_event: The MISP event whose attributes are converted.

        Returns:
            The entities that could be converted; attributes without a usable mapping are left out.
        """
        entities: List[EntityTypes] = []
        for misp_attribute in misp_event.attributes:
            colander_entity = self.convert_attribute(misp_attribute, misp_event.tags)
            if colander_entity:
                entities.append(colander_entity)
        return entities
463
464
[docs]
class MISPConverter:
    """
    Two-way converter between MISP data and Colander data.
    Relies on the mappers defined in this module.
    """

    @staticmethod
    def misp_to_colander(misp_feed: MISPFeed) -> Optional[List[ColanderFeed]]:
        """
        Convert a MISP feed to a list of Colander feeds. Each MISP event is converted to a separate Colander feed.

        When the feed has a "response" key, its value is taken as the list of events;
        otherwise the feed itself is treated as a single event.

        Args:
            misp_feed: The MISP feed containing events to convert.

        Returns:
            A list of Colander feeds with resolved references; the list is empty when the feed is empty.
        """
        converted: List[ColanderFeed] = []
        mapper = MISPToColanderMapper()
        if not misp_feed:
            return converted

        if "response" in misp_feed:
            raw_events = misp_feed.get("response", None)
        else:
            raw_events = [misp_feed]

        for raw_event in raw_events or []:
            misp_event = MISPEvent()
            misp_event.from_dict(**raw_event)
            feed = mapper.convert_misp_event(misp_event)[1]
            feed.resolve_references()
            converted.append(feed)
        return converted

    @staticmethod
    def colander_to_misp(colander_feed: ColanderFeed) -> Optional[List[MISPEvent]]:
        """
        Convert a Colander feed to a list of MISP events. Each Colander case is converted to a MISP event.

        The references of the feed are resolved before the conversion.

        Args:
            colander_feed: The Colander feed containing cases to convert.

        Returns:
            A list of MISP events, one per case; the list is empty when the feed has no cases.
        """
        mapper = ColanderToMISPMapper()
        colander_feed.resolve_references()
        return [mapper.convert_case(case, colander_feed)[0] for case in colander_feed.cases.values()]
511 for _, case in colander_feed.cases.items():
512 misp_event, _ = mapper.convert_case(case, colander_feed)
513 events.append(misp_event)
514 return events