1from typing import Optional, Union, List, Tuple
2
3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent
4
5from colander_data_converter.base.common import TlpPapLevel
6from colander_data_converter.base.models import EntityTypes, Case, ColanderFeed, EntityRelation
7from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
8from colander_data_converter.converters.stix2.utils import get_nested_value
9
10
[docs]
11class MISPMapper:
12 """
13 Base mapper class for MISP conversions.
14
15 Provides common functionality for mapping Colander data structures to MISP objects.
16 """
17
18 def __init__(self):
19 self.mapping = Mapping()
20
[docs]
21 @staticmethod
22 def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag:
23 """
24 Convert a Colander TLP (Traffic Light Protocol) level to a MISP tag.
25
26 Args:
27 tlp_level (TlpPapLevel): The TLP level to convert
28
29 Returns:
30 MISPTag: A MISP tag object with the TLP level name
31 """
32 t = MISPTag()
33 t.name = tlp_level.name
34 return t
35
36
[docs]
37class ColanderToMISPMapper(MISPMapper):
38 """
39 Mapper class for converting Colander objects to MISP format.
40
41 Handles the conversion of various Colander entity types (threats, actors, events,
42 artifacts, etc.) to their corresponding MISP object representations using
43 predefined mapping configurations.
44 """
45
[docs]
46 def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]:
47 """
48 Convert a Colander object to its corresponding MISP representation.
49
50 This method performs the core conversion logic by:
51 1. Looking up the appropriate mapping for the Colander object type
52 2. Creating the corresponding MISP object (Attribute or Object)
53 3. Mapping fields, literals, and attributes from Colander to MISP format
54
55 Args:
56 colander_object (EntityTypes): The Colander object to convert
57
58 Returns:
59 Optional[Union[AbstractMISP, TagStub]]: The converted MISP object, or None if no mapping exists
60 """
61 # Get the mapping configuration for this Colander object type
62 entity_type_mapping: EntityTypeMapping = self.mapping.get_mapping(
63 colander_object.get_super_type(), colander_object.type
64 )
65
66 if entity_type_mapping is None:
67 return None
68
69 # Determine the MISP model class and type to create
70 misp_model, misp_type = entity_type_mapping.get_misp_model_class()
71
72 # Create the appropriate MISP object based on the model type
73 if issubclass(misp_model, MISPAttribute):
74 misp_object: MISPAttribute = misp_model(strict=True)
75 misp_object.type = misp_type
76 elif issubclass(misp_model, MISPObject):
77 misp_object: MISPObject = misp_model(name=misp_type, strict=True)
78 elif issubclass(misp_model, MISPTag):
79 return TagStub(entity_type_mapping.colander_misp_mapping.get("literals", {}).get("name"))
80 else:
81 return None
82
83 # Set common MISP object properties
84 # ToDo: add tag for TLP
85 misp_object.uuid = str(colander_object.id)
86 misp_object.first_seen = colander_object.created_at
87 misp_object.last_seen = colander_object.updated_at
88
89 # Convert Colander object to dictionary for nested field access
90 colander_object_dict = colander_object.model_dump(mode="json")
91
92 # Map direct field mappings from Colander to MISP object properties
93 for source_field, target_field in entity_type_mapping.get_colander_misp_field_mapping():
94 value = getattr(colander_object, source_field, None)
95 if value is not None:
96 setattr(misp_object, target_field, value)
97
98 # Set constant/literal values on the MISP object
99 for target_field, value in entity_type_mapping.get_colander_misp_literals_mapping():
100 if target_field in ["category", "comment"]:
101 setattr(misp_object, target_field, value)
102 else:
103 misp_object.add_attribute(target_field, value=value)
104
105 # Map Colander fields to MISP object attributes
106 for source_field, target_field in entity_type_mapping.get_colander_misp_attributes_mapping():
107 if "." in source_field:
108 # Handle nested field access using dot notation
109 value = get_nested_value(colander_object_dict, source_field)
110 if value is not None:
111 misp_object.add_attribute(target_field, value=value)
112 else:
113 # Handle direct field access
114 value = getattr(colander_object, source_field, None)
115 if value is not None:
116 misp_object.add_attribute(target_field, value=value)
117
118 return misp_object
119
[docs]
120 @staticmethod
121 def get_element_from_event(
122 event: MISPEvent, uuid: str, types: List[str]
123 ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]:
124 if "object" in types:
125 for obj in event.objects:
126 if hasattr(obj, "uuid") and obj.uuid == uuid:
127 return obj, "Object"
128 if "attribute" in types:
129 for obj in event.attributes:
130 if hasattr(obj, "uuid") and obj.uuid == uuid:
131 return obj, "Attribute"
132 return None, None
133
[docs]
134 def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes):
135 super_type = colander_object.super_type
136 # Create relationships based on immutable relations
137 for _, relation in colander_object.get_immutable_relations().items():
138 reference_name = relation.name
139 relation_mapping = self.mapping.get_relation_mapping(super_type, reference_name)
140
141 if not relation_mapping:
142 continue
143
144 reverse = relation_mapping.get("reverse", False)
145 source_id = str(relation.obj_from.id) if not reverse else str(relation.obj_to.id)
146 target_id = str(relation.obj_to.id) if not reverse else str(relation.obj_from.id)
147 relation_name = relation_mapping.get("name", reference_name.replace("_", "-"))
148
149 # Tags only on MISPAttribute or MISPEvent
150 if relation_mapping.get("use_tag", False):
151 source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"])
152 if reverse:
153 tag = self.convert_colander_object(relation.obj_from)
154 else:
155 tag = self.convert_colander_object(relation.obj_to)
156 if source_object and isinstance(tag, TagStub):
157 event.add_attribute_tag(tag, source_id)
158 # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute
159 else:
160 source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
161 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
162 if source_object and target_object:
163 source_object.add_relationship(type_name, target_id, relation_name)
164
[docs]
165 def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]):
166 for relation in colander_relations:
167 source_id = str(relation.obj_from.id)
168 target_id = str(relation.obj_to.id)
169 source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
170 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
171 if source_object and target_object:
172 source_object.add_relationship(type_name, target_id, relation.name)
173
[docs]
174 def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]:
175 skipped = []
176 misp_event = MISPEvent()
177 misp_event.uuid = str(case.id)
178 misp_event.info = case.description
179 misp_event.date = case.created_at
180 for entity in feed.entities.values():
181 misp_object = self.convert_colander_object(entity)
182 if not misp_object:
183 skipped.append(entity)
184 continue
185 if isinstance(misp_object, MISPAttribute):
186 misp_event.add_attribute(**misp_object.to_dict())
187 elif isinstance(misp_object, MISPObject):
188 misp_event.add_object(misp_object)
189
190 # Immutable relations
191 for entity in feed.entities.values():
192 self.convert_immutable_relations(misp_event, entity)
193
194 # Regular relations
195 for entity in feed.entities.values():
196 self.convert_relations(misp_event, list(feed.get_outgoing_relations(entity).values()))
197
198 return misp_event, skipped