1from typing import Optional, Union, List, Tuple
2
3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent, MISPFeed
4
5from colander_data_converter.base.common import TlpPapLevel
6from colander_data_converter.base.models import (
7 EntityTypes,
8 Case,
9 ColanderFeed,
10 EntityRelation,
11 ColanderRepository,
12 Entity,
13)
14from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
15from colander_data_converter.converters.misp.utils import get_attribute_by_name
16from colander_data_converter.converters.stix2.utils import get_nested_value
17
18
[docs]
19class MISPMapper:
20 """
21 Base mapper class for MISP conversions.
22
23 Provides common functionality for mapping Colander data structures to MISP objects.
24 """
25
26 def __init__(self):
27 self.mapping = Mapping()
28 ColanderRepository().clear()
29
[docs]
30 @staticmethod
31 def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag:
32 """
33 Convert a Colander TLP (Traffic Light Protocol) level to a MISP tag.
34
35 Args:
36 tlp_level: The TLP level to convert
37
38 Returns:
39 A MISP tag object with the TLP level name
40 """
41 t = MISPTag()
42 t.name = tlp_level.name
43 return t
44
45
[docs]
46class ColanderToMISPMapper(MISPMapper):
47 """
48 Mapper class for converting Colander objects to MISP format.
49
50 Handles the conversion of various Colander entity types (threats, actors, events,
51 artifacts, etc.) to their corresponding MISP object representations using
52 predefined mapping configurations.
53 """
54
[docs]
55 def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]:
56 """
57 Convert a Colander object to its corresponding MISP representation.
58
59 This method performs the core conversion logic by:
60
61 1. Looking up the appropriate mapping for the Colander object type
62 2. Creating the corresponding MISP object (Attribute or Object)
63 3. Mapping fields, literals, and attributes from Colander to MISP format
64
65 Args:
66 colander_object: The Colander object to convert
67
68 Returns:
69 The converted MISP object, or None if no mapping exists
70 """
71 # Get the mapping configuration for this Colander object type
72 entity_type_mapping: EntityTypeMapping = self.mapping.get_mapping_to_misp(
73 colander_object.get_super_type(), colander_object.type
74 )
75
76 if entity_type_mapping is None:
77 return None
78
79 # Determine the MISP model class and type to create
80 misp_model, misp_type = entity_type_mapping.get_misp_model_class()
81
82 # Create the appropriate MISP object based on the model type
83 if issubclass(misp_model, MISPAttribute):
84 misp_object: MISPAttribute = misp_model(strict=True)
85 misp_object.type = misp_type
86 elif issubclass(misp_model, MISPObject):
87 misp_object: MISPObject = misp_model(name=misp_type, strict=True)
88 elif issubclass(misp_model, MISPTag):
89 tag_pattern = entity_type_mapping.colander_misp_mapping.get("literals", {}).get("name")
90 return TagStub(tag_pattern.format(value=colander_object.name))
91 else:
92 return None
93
94 # Set common MISP object properties
95 # ToDo: add tag for TLP
96 misp_object.uuid = str(colander_object.id)
97 misp_object.first_seen = colander_object.created_at
98 misp_object.last_seen = colander_object.updated_at
99
100 # Convert Colander object to dictionary for nested field access
101 colander_object_dict = colander_object.model_dump(mode="json")
102
103 # Map direct field mappings from Colander to MISP object properties
104 for source_field, target_field in entity_type_mapping.get_colander_misp_field_mapping():
105 value = getattr(colander_object, source_field, None)
106 if value is not None:
107 setattr(misp_object, target_field, value)
108
109 # Set constant/literal values on the MISP object
110 for target_field, value in entity_type_mapping.get_colander_misp_literals_mapping():
111 if target_field in ["category", "comment"]:
112 setattr(misp_object, target_field, value)
113 else:
114 misp_object.add_attribute(target_field, value=value)
115
116 # Map Colander fields to MISP object attributes
117 for source_field, target_field in entity_type_mapping.get_colander_misp_attributes_mapping():
118 if "." in source_field:
119 # Handle nested field access using dot notation
120 value = get_nested_value(colander_object_dict, source_field)
121 if value is not None:
122 misp_object.add_attribute(target_field, value=value)
123 else:
124 # Handle direct field access
125 value = getattr(colander_object, source_field, None)
126 if value is not None:
127 misp_object.add_attribute(target_field, value=value)
128
129 return misp_object
130
[docs]
131 @staticmethod
132 def get_element_from_event(
133 event: MISPEvent, uuid: str, types: List[str]
134 ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]:
135 """
136 Retrieve an element (object or attribute) from a MISP event by UUID and type.
137
138 Args:
139 event: The MISP event to search within.
140 uuid: The UUID of the element to find.
141 types: List of types to search for ("object", "attribute").
142
143 Returns:
144 The found element and its type as a string ("Object" or "Attribute"), or (None, None) if not found.
145 """
146 if "object" in types:
147 for obj in event.objects:
148 if hasattr(obj, "uuid") and obj.uuid == uuid:
149 return obj, "Object"
150 if "attribute" in types:
151 for obj in event.attributes:
152 if hasattr(obj, "uuid") and obj.uuid == uuid:
153 return obj, "Attribute"
154 return None, None
155
[docs]
156 def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes):
157 """
158 Create relationships in a MISP event based on the Colander object's immutable relations.
159
160 This method processes each immutable relation defined in the Colander object, determines the appropriate
161 mapping and direction, and adds the corresponding relationship or tag to the MISP event.
162
163 Args:
164 event: The MISP event to which relationships or tags will be added.
165 colander_object: The Colander object containing immutable relations.
166
167 Note:
168 - If the relation mapping specifies 'use_tag', a tag is added to the relevant MISP attribute.
169 - Otherwise, a relationship is created between MISP objects or attributes as defined by the mapping.
170 """
171 super_type = colander_object.super_type
172 # Create relationships based on immutable relations
173 for _, relation in colander_object.get_immutable_relations().items():
174 reference_name = relation.name
175 relation_mapping = self.mapping.get_relation_mapping_to_misp(super_type, reference_name)
176
177 if not relation_mapping:
178 continue
179
180 reverse = relation_mapping.get("reverse", False)
181 source_id = str(relation.obj_from.id) if not reverse else str(relation.obj_to.id)
182 target_id = str(relation.obj_to.id) if not reverse else str(relation.obj_from.id)
183 relation_name = relation_mapping.get("name", reference_name.replace("_", "-"))
184
185 # Tags only on MISPAttribute or MISPEvent
186 if relation_mapping.get("use_tag", False):
187 source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"])
188 if reverse:
189 tag = self.convert_colander_object(relation.obj_from)
190 else:
191 tag = self.convert_colander_object(relation.obj_to)
192 if source_object and isinstance(tag, TagStub):
193 event.add_attribute_tag(tag, source_id)
194 # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute
195 else:
196 source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
197 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
198 if source_object and target_object:
199 source_object.add_relationship(type_name, target_id, relation_name)
200
[docs]
201 def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]):
202 """
203 Create relationships in a MISP event based on a list of Colander relations.
204
205 This method finds the corresponding MISP objects or attributes for each relation and
206 adds the relationship to the source object.
207
208 Args:
209 event: The MISP event to which relationships will be added.
210 colander_relations: List of Colander relations to convert.
211 """
212 for relation in colander_relations:
213 source_id = str(relation.obj_from.id)
214 target_id = str(relation.obj_to.id)
215 source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
216 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
217 if source_object and target_object:
218 source_object.add_relationship(type_name, target_id, relation.name)
219
[docs]
220 def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]:
221 """
222 Convert a Colander Case and its associated ColanderFeed into a MISPEvent.
223
224 This method performs the following steps:
225
226 1. Initializes a new MISPEvent using the case information.
227 2. Iterates over all entities in the feed, converting each to a MISP object or attribute.
228
229 - Entities that cannot be converted are added to the skipped list.
230 - MISPAttributes are added as attributes to the event.
231 - MISPObjects are added as objects to the event.
232 3. Processes immutable relations for each entity, adding corresponding relationships or tags to the event.
233 4. Processes regular (non-immutable) relations for each entity, adding relationships to the event.
234 5. Returns the constructed MISPEvent and a list of skipped entities.
235
236 Args:
237 case: The Colander case to convert.
238 feed: The feed containing entities and relations to convert.
239
240 Returns:
241 The resulting MISPEvent and a list of entities that were skipped during conversion.
242 """
243 skipped = []
244 misp_event = MISPEvent()
245 misp_event.uuid = str(case.id)
246 misp_event.info = case.description
247 misp_event.date = case.created_at
248 for entity in feed.entities.values():
249 if entity.case != case:
250 continue
251 misp_object = self.convert_colander_object(entity)
252 if not misp_object:
253 skipped.append(entity)
254 continue
255 if isinstance(misp_object, MISPAttribute):
256 misp_event.add_attribute(**misp_object.to_dict())
257 elif isinstance(misp_object, MISPObject):
258 misp_event.add_object(misp_object)
259
260 # Immutable relations
261 for entity in feed.entities.values():
262 self.convert_immutable_relations(misp_event, entity)
263
264 # Regular relations
265 for entity in feed.entities.values():
266 self.convert_relations(misp_event, list(feed.get_outgoing_relations(entity).values()))
267
268 return misp_event, skipped
269
270
[docs]
271class MISPToColanderMapper(MISPMapper):
[docs]
272 def convert_misp_event(self, event: MISPEvent) -> Tuple[Case, ColanderFeed]:
273 """
274 Convert a MISPEvent into a Colander case and feed.
275
276 This method performs the following steps:
277
278 1. Creates a new Case instance using the event information.
279 2. Initializes a ColanderFeed and adds the case to it.
280 3. Converts all MISP objects in the event to Colander entities and adds them to the feed.
281 4. Converts all MISP attributes in the event to Colander entities and adds them to the feed.
282 5. Converts all relations in the event to Colander relations and adds them to the feed.
283 6. Returns the constructed Case and ColanderFeed.
284
285 Args:
286 event: The MISP event to convert.
287
288 Returns:
289 The resulting Case and Feed.
290 """
291 case = Case(id=event.uuid, name=event.info, description=f"Loaded from MISP event [{event.uuid}]")
292 feed = ColanderFeed(cases={f"{case.id}": case})
293 for entity in self.convert_objects(event):
294 entity.case = case
295 feed.entities[str(entity.id)] = entity
296 for entity in self.convert_attributes(event):
297 entity.case = case
298 feed.entities[str(entity.id)] = entity
299 for relation in self.convert_relations(event):
300 relation.case = case
301 feed.relations[str(relation.id)] = relation
302 return case, feed
303
[docs]
304 def convert_relations(self, event: MISPEvent) -> List[EntityRelation]:
305 relations = []
306 for misp_object in event.objects + event.attributes:
307 source_object = ColanderRepository() >> misp_object.uuid
308 if not isinstance(source_object, Entity):
309 continue
310 for misp_relation in misp_object.relationships or []:
311 relation_name = misp_relation.relationship_type
312 target_object = ColanderRepository() >> misp_relation.related_object_uuid
313 if not isinstance(target_object, Entity):
314 continue
315 if relation_name:
316 relations.append(
317 EntityRelation(
318 id=misp_relation.uuid, name=relation_name, obj_from=source_object, obj_to=target_object
319 )
320 )
321 return relations
322
323 def _prepare_colander_entity(
324 self, misp_object: Union[MISPObject, MISPAttribute], entity_mapping: EntityTypeMapping, entity_name: str
325 ) -> Optional[EntityTypes]:
326 """
327 Prepare and populate a Colander entity from a MISP object using the provided mapping and entity name.
328
329 Args:
330 misp_object: The MISP object or attribute to convert.
331 entity_mapping: The mapping configuration for the entity type.
332 entity_name: The name to assign to the Colander entity.
333
334 Returns:
335 The populated Colander entity, or None if creation fails.
336 """
337 # Get the Colander model class and entity type from the mapping
338 colander_model_class = entity_mapping.colander_super_type.model_class
339 colander_entity_type = entity_mapping.colander_entity_type
340
341 # Instantiate the Colander entity with id, type, and name
342 colander_entity = colander_model_class(id=misp_object.uuid, type=colander_entity_type, name=entity_name)
343
344 # Map MISP object properties to Colander entity attributes based on the mapping
345 for colander_attribute_name, misp_property_name in entity_mapping.colander_misp_mapping.items():
346 # Skip mapping for literals, name, and misp_attributes keys
347 if colander_attribute_name in ["literals", "name", "misp_attributes"]:
348 continue
349 misp_value = getattr(misp_object, misp_property_name, None)
350 setattr(colander_entity, colander_attribute_name, misp_value)
351
352 return colander_entity
353
[docs]
354 def convert_object(self, misp_object: MISPObject) -> Optional[EntityTypes]:
355 """
356 Convert a MISPObject to its corresponding Colander entity.
357
358 This method uses the mapping configuration to extract the entity name and attributes
359 from the MISPObject, then creates and populates a Colander entity instance.
360
361 Args:
362 misp_object: The MISP object to convert.
363
364 Returns:
365 The resulting Colander entity, or None if mapping or name is missing.
366 """
367 # Get the mapping for this MISP object
368 entity_mapping = self.mapping.get_misp_object_mapping(misp_object)
369 if not entity_mapping or not entity_mapping.colander_super_type:
370 return None
371
372 entity_name = None
373 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name", "")
374 misp_attributes = entity_mapping.colander_misp_mapping.get("misp_attributes", {})
375 misp_attribute_for_name = misp_attributes.get("name", "")
376
377 # Try to extract the entity name from the MISP object property or attribute
378 if misp_property_for_name:
379 entity_name = getattr(misp_object, misp_property_for_name, None)
380 elif misp_attribute_for_name:
381 if (misp_attribute := get_attribute_by_name(misp_object, misp_attribute_for_name)) is not None:
382 entity_name = misp_attribute.value
383
384 if not entity_name:
385 return None
386
387 # Prepare the Colander entity using the mapping and extracted entity name
388 colander_entity = self._prepare_colander_entity(misp_object, entity_mapping, entity_name)
389
390 # Map MISP attributes to Colander entity fields
391 for colander_attribute_name, misp_property_name in misp_attributes.items():
392 # Skip literals, name, and nested fields
393 if colander_attribute_name in ["literals", "name"] or "." in colander_attribute_name:
394 continue
395 if (misp_attribute := get_attribute_by_name(misp_object, misp_property_name)) is None:
396 continue
397 if hasattr(colander_entity, colander_attribute_name) and misp_attribute.value:
398 setattr(colander_entity, colander_attribute_name, misp_attribute.value)
399
400 # If the Colander entity has an 'attributes' dict, add any extra MISP attributes not mapped above
401 if hasattr(colander_entity, "attributes"):
402 if not colander_entity.attributes:
403 colander_entity.attributes = {}
404 for attribute in misp_object.attributes:
405 if attribute.object_relation not in misp_attributes.values():
406 colander_entity.attributes[attribute.object_relation.replace("-", "_")] = str(attribute.value)
407
408 return colander_entity
409
[docs]
410 def convert_objects(self, misp_object: MISPEvent) -> List[EntityTypes]:
411 entities: List[EntityTypes] = []
412 for misp_object in misp_object.objects:
413 colander_entity = self.convert_object(misp_object)
414 if colander_entity:
415 entities.append(colander_entity)
416 return entities
417
432
[docs]
433 def convert_attribute(
434 self, misp_attribute: MISPAttribute, event_tags: Optional[List[MISPTag]] = None
435 ) -> Optional[EntityTypes]:
436 entity_mapping = self.mapping.get_misp_attribute_mapping(misp_attribute)
437 if not entity_mapping or not entity_mapping.colander_super_type:
438 return None
439 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name")
440 entity_name = getattr(misp_attribute, misp_property_for_name)
441 colander_entity = self._prepare_colander_entity(misp_attribute, entity_mapping, entity_name)
442 tags = event_tags or []
443 tags.extend(misp_attribute.tags)
444 self.convert_tags(colander_entity, tags)
445 if misp_attribute.to_ids:
446 colander_entity.add_attributes({"is_malicious": True})
447 return colander_entity
448
[docs]
449 def convert_attributes(self, misp_event: MISPEvent) -> List[EntityTypes]:
450 entities: List[EntityTypes] = []
451 for misp_attribute in misp_event.attributes:
452 colander_entity = self.convert_attribute(misp_attribute, misp_event.tags)
453 if colander_entity:
454 entities.append(colander_entity)
455 return entities
456
457
[docs]
458class MISPConverter:
459 """
460 Converter for MISP data to Colander data and vice versa.
461 Uses the mapping file to convert between formats.
462 """
463
[docs]
464 @staticmethod
465 def misp_to_colander(misp_feed: MISPFeed) -> Optional[List[ColanderFeed]]:
466 """
467 Convert a MISP feed to a list of Colander feeds. Each MISP event is converted to a separate Colander feed.
468
469 Args:
470 misp_feed: The MISP feed containing events to convert.
471
472 Returns:
473 A list of Colander feeds, or None if no events are found.
474 """
475 feeds: List[ColanderFeed] = []
476 mapper = MISPToColanderMapper()
477 if not misp_feed:
478 return feeds
479 events = misp_feed.get("response", None)
480 if "response" not in misp_feed:
481 events = [misp_feed]
482 for event in events or []:
483 misp_event = MISPEvent()
484 misp_event.from_dict(**event)
485 _, feed = mapper.convert_misp_event(misp_event)
486 feed.resolve_references()
487 feeds.append(feed)
488 return feeds
489
[docs]
490 @staticmethod
491 def colander_to_misp(colander_feed: ColanderFeed) -> Optional[List[MISPEvent]]:
492 """
493 Convert a Colander feed to a list of MISP events. Each Colander case is converted to a MISP event.
494
495 Args:
496 colander_feed: The Colander feed containing cases to convert.
497
498 Returns:
499 A list of MISP events, or None if no cases are found.
500 """
501 mapper = ColanderToMISPMapper()
502 colander_feed.resolve_references()
503 events: List[MISPEvent] = []
504 for _, case in colander_feed.cases.items():
505 misp_event, _ = mapper.convert_case(case, colander_feed)
506 events.append(misp_event)
507 return events