Source code for colander_data_converter.converters.misp.converter

  1from typing import Optional, Union, List, Tuple
  2
  3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent, MISPFeed
  4
  5from colander_data_converter.base.common import TlpPapLevel
  6from colander_data_converter.base.models import (
  7    EntityTypes,
  8    Case,
  9    ColanderFeed,
 10    EntityRelation,
 11    ColanderRepository,
 12    Entity,
 13)
 14from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
 15from colander_data_converter.converters.misp.utils import get_attribute_by_name
 16from colander_data_converter.converters.stix2.utils import get_nested_value
 17
 18
class MISPMapper:
    """
    Shared foundation for the MISP mappers.

    Holds the mapping configuration used by the concrete mappers and offers a helper
    that expresses a Colander TLP level as a MISP tag.
    """

    def __init__(self):
        # Instantiate the mapping configuration first, then clear the Colander repository.
        self.mapping = Mapping()
        ColanderRepository().clear()

    @staticmethod
    def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag:
        """
        Build a MISP tag named after a Colander TLP (Traffic Light Protocol) level.

        Args:
            tlp_level: The TLP level to convert

        Returns:
            A MISP tag whose name is the name of the given TLP level
        """
        tag = MISPTag()
        tag.name = tlp_level.name
        return tag
44 45
class ColanderToMISPMapper(MISPMapper):
    """
    Mapper class for converting Colander objects to MISP format.

    Handles the conversion of various Colander entity types (threats, actors, events,
    artifacts, etc.) to their corresponding MISP object representations using
    predefined mapping configurations. It also converts a whole Colander case, together
    with the relations between its entities, into a single MISP event.
    """
[docs] 55 def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]: 56 """ 57 Convert a Colander object to its corresponding MISP representation. 58 59 This method performs the core conversion logic by: 60 61 1. Looking up the appropriate mapping for the Colander object type 62 2. Creating the corresponding MISP object (Attribute or Object) 63 3. Mapping fields, literals, and attributes from Colander to MISP format 64 65 Args: 66 colander_object: The Colander object to convert 67 68 Returns: 69 The converted MISP object, or None if no mapping exists 70 """ 71 # Get the mapping configuration for this Colander object type 72 entity_type_mapping: EntityTypeMapping = self.mapping.get_mapping_to_misp( 73 colander_object.get_super_type(), colander_object.type 74 ) 75 76 if entity_type_mapping is None: 77 return None 78 79 # Determine the MISP model class and type to create 80 misp_model, misp_type = entity_type_mapping.get_misp_model_class() 81 82 # Create the appropriate MISP object based on the model type 83 if issubclass(misp_model, MISPAttribute): 84 misp_object: MISPAttribute = misp_model(strict=True) 85 misp_object.type = misp_type 86 elif issubclass(misp_model, MISPObject): 87 misp_object: MISPObject = misp_model(name=misp_type, strict=True) 88 elif issubclass(misp_model, MISPTag): 89 tag_pattern = entity_type_mapping.colander_misp_mapping.get("literals", {}).get("name") 90 return TagStub(tag_pattern.format(value=colander_object.name)) 91 else: 92 return None 93 94 # Set common MISP object properties 95 # ToDo: add tag for TLP 96 misp_object.uuid = str(colander_object.id) 97 misp_object.first_seen = colander_object.created_at 98 misp_object.last_seen = colander_object.updated_at 99 100 # Convert Colander object to dictionary for nested field access 101 colander_object_dict = colander_object.model_dump(mode="json") 102 103 # Map direct field mappings from Colander to MISP object properties 104 for source_field, 
target_field in entity_type_mapping.get_colander_misp_field_mapping(): 105 value = getattr(colander_object, source_field, None) 106 if value is not None: 107 setattr(misp_object, target_field, value) 108 109 # Set constant/literal values on the MISP object 110 for target_field, value in entity_type_mapping.get_colander_misp_literals_mapping(): 111 if target_field in ["category", "comment"]: 112 setattr(misp_object, target_field, value) 113 else: 114 misp_object.add_attribute(target_field, value=value) 115 116 # Map Colander fields to MISP object attributes 117 for source_field, target_field in entity_type_mapping.get_colander_misp_attributes_mapping(): 118 if "." in source_field: 119 # Handle nested field access using dot notation 120 value = get_nested_value(colander_object_dict, source_field) 121 if value is not None: 122 misp_object.add_attribute(target_field, value=value) 123 else: 124 # Handle direct field access 125 value = getattr(colander_object, source_field, None) 126 if value is not None: 127 misp_object.add_attribute(target_field, value=value) 128 129 return misp_object
130
[docs] 131 @staticmethod 132 def get_element_from_event( 133 event: MISPEvent, uuid: str, types: List[str] 134 ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]: 135 """ 136 Retrieve an element (object or attribute) from a MISP event by UUID and type. 137 138 Args: 139 event: The MISP event to search within. 140 uuid: The UUID of the element to find. 141 types: List of types to search for ("object", "attribute"). 142 143 Returns: 144 The found element and its type as a string ("Object" or "Attribute"), or (None, None) if not found. 145 """ 146 if "object" in types: 147 for obj in event.objects: 148 if hasattr(obj, "uuid") and obj.uuid == uuid: 149 return obj, "Object" 150 if "attribute" in types: 151 for obj in event.attributes: 152 if hasattr(obj, "uuid") and obj.uuid == uuid: 153 return obj, "Attribute" 154 return None, None
155
[docs] 156 def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes): 157 """ 158 Create relationships in a MISP event based on the Colander object's immutable relations. 159 160 This method processes each immutable relation defined in the Colander object, determines the appropriate 161 mapping and direction, and adds the corresponding relationship or tag to the MISP event. 162 163 Args: 164 event: The MISP event to which relationships or tags will be added. 165 colander_object: The Colander object containing immutable relations. 166 167 Note: 168 - If the relation mapping specifies 'use_tag', a tag is added to the relevant MISP attribute. 169 - Otherwise, a relationship is created between MISP objects or attributes as defined by the mapping. 170 """ 171 super_type = colander_object.super_type 172 # Create relationships based on immutable relations 173 for _, relation in colander_object.get_immutable_relations().items(): 174 reference_name = relation.name 175 relation_mapping = self.mapping.get_relation_mapping_to_misp(super_type, reference_name) 176 177 if not relation_mapping: 178 continue 179 180 reverse = relation_mapping.get("reverse", False) 181 source_id = str(relation.obj_from.id) if not reverse else str(relation.obj_to.id) 182 target_id = str(relation.obj_to.id) if not reverse else str(relation.obj_from.id) 183 relation_name = relation_mapping.get("name", reference_name.replace("_", "-")) 184 185 # Tags only on MISPAttribute or MISPEvent 186 if relation_mapping.get("use_tag", False): 187 source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"]) 188 if reverse: 189 tag = self.convert_colander_object(relation.obj_from) 190 else: 191 tag = self.convert_colander_object(relation.obj_to) 192 if source_object and isinstance(tag, TagStub): 193 event.add_attribute_tag(tag, source_id) 194 # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute 195 else: 196 source_object, _ = 
self.get_element_from_event(event, source_id, types=["object"]) 197 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"]) 198 if source_object and target_object: 199 source_object.add_relationship(type_name, target_id, relation_name)
200
[docs] 201 def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]): 202 """ 203 Create relationships in a MISP event based on a list of Colander relations. 204 205 This method finds the corresponding MISP objects or attributes for each relation and 206 adds the relationship to the source object. 207 208 Args: 209 event: The MISP event to which relationships will be added. 210 colander_relations: List of Colander relations to convert. 211 """ 212 for relation in colander_relations: 213 source_id = str(relation.obj_from.id) 214 target_id = str(relation.obj_to.id) 215 source_object, _ = self.get_element_from_event(event, source_id, types=["object"]) 216 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"]) 217 if source_object and target_object: 218 source_object.add_relationship(type_name, target_id, relation.name)
219
[docs] 220 def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]: 221 """ 222 Convert a Colander Case and its associated ColanderFeed into a MISPEvent. 223 224 This method performs the following steps: 225 226 1. Initializes a new MISPEvent using the case information. 227 2. Iterates over all entities in the feed, converting each to a MISP object or attribute. 228 229 - Entities that cannot be converted are added to the skipped list. 230 - MISPAttributes are added as attributes to the event. 231 - MISPObjects are added as objects to the event. 232 3. Processes immutable relations for each entity, adding corresponding relationships or tags to the event. 233 4. Processes regular (non-immutable) relations for each entity, adding relationships to the event. 234 5. Returns the constructed MISPEvent and a list of skipped entities. 235 236 Args: 237 case: The Colander case to convert. 238 feed: The feed containing entities and relations to convert. 239 240 Returns: 241 The resulting MISPEvent and a list of entities that were skipped during conversion. 242 """ 243 skipped = [] 244 misp_event = MISPEvent() 245 misp_event.uuid = str(case.id) 246 misp_event.info = case.description 247 misp_event.date = case.created_at 248 for entity in feed.entities.values(): 249 if entity.case != case: 250 continue 251 misp_object = self.convert_colander_object(entity) 252 if not misp_object: 253 skipped.append(entity) 254 continue 255 if isinstance(misp_object, MISPAttribute): 256 misp_event.add_attribute(**misp_object.to_dict()) 257 elif isinstance(misp_object, MISPObject): 258 misp_event.add_object(misp_object) 259 260 # Immutable relations 261 for entity in feed.entities.values(): 262 self.convert_immutable_relations(misp_event, entity) 263 264 # Regular relations 265 for entity in feed.entities.values(): 266 self.convert_relations(misp_event, list(feed.get_outgoing_relations(entity).values())) 267 268 return misp_event, skipped
269 270
class MISPToColanderMapper(MISPMapper):
    """
    Mapper class for converting MISP data to Colander format.

    Converts MISP events, objects, attributes, tags and relationships into a Colander
    case, feed, entities and entity relations.
    """
[docs] 272 def convert_misp_event(self, event: MISPEvent) -> Tuple[Case, ColanderFeed]: 273 """ 274 Convert a MISPEvent into a Colander case and feed. 275 276 This method performs the following steps: 277 278 1. Creates a new Case instance using the event information. 279 2. Initializes a ColanderFeed and adds the case to it. 280 3. Converts all MISP objects in the event to Colander entities and adds them to the feed. 281 4. Converts all MISP attributes in the event to Colander entities and adds them to the feed. 282 5. Converts all relations in the event to Colander relations and adds them to the feed. 283 6. Returns the constructed Case and ColanderFeed. 284 285 Args: 286 event: The MISP event to convert. 287 288 Returns: 289 The resulting Case and Feed. 290 """ 291 case = Case(id=event.uuid, name=event.info, description=f"Loaded from MISP event [{event.uuid}]") 292 feed = ColanderFeed(cases={f"{case.id}": case}) 293 for entity in self.convert_objects(event): 294 entity.case = case 295 feed.entities[str(entity.id)] = entity 296 for entity in self.convert_attributes(event): 297 entity.case = case 298 feed.entities[str(entity.id)] = entity 299 for relation in self.convert_relations(event): 300 relation.case = case 301 feed.relations[str(relation.id)] = relation 302 return case, feed
303
[docs] 304 def convert_relations(self, event: MISPEvent) -> List[EntityRelation]: 305 relations = [] 306 for misp_object in event.objects + event.attributes: 307 source_object = ColanderRepository() >> misp_object.uuid 308 if not isinstance(source_object, Entity): 309 continue 310 for misp_relation in misp_object.relationships or []: 311 relation_name = misp_relation.relationship_type 312 target_object = ColanderRepository() >> misp_relation.related_object_uuid 313 if not isinstance(target_object, Entity): 314 continue 315 if relation_name: 316 relations.append( 317 EntityRelation( 318 id=misp_relation.uuid, name=relation_name, obj_from=source_object, obj_to=target_object 319 ) 320 ) 321 return relations
322 323 def _prepare_colander_entity( 324 self, misp_object: Union[MISPObject, MISPAttribute], entity_mapping: EntityTypeMapping, entity_name: str 325 ) -> Optional[EntityTypes]: 326 """ 327 Prepare and populate a Colander entity from a MISP object using the provided mapping and entity name. 328 329 Args: 330 misp_object: The MISP object or attribute to convert. 331 entity_mapping: The mapping configuration for the entity type. 332 entity_name: The name to assign to the Colander entity. 333 334 Returns: 335 The populated Colander entity, or None if creation fails. 336 """ 337 # Get the Colander model class and entity type from the mapping 338 colander_model_class = entity_mapping.colander_super_type.model_class 339 colander_entity_type = entity_mapping.colander_entity_type 340 341 # Instantiate the Colander entity with id, type, and name 342 colander_entity = colander_model_class(id=misp_object.uuid, type=colander_entity_type, name=entity_name) 343 344 # Map MISP object properties to Colander entity attributes based on the mapping 345 for colander_attribute_name, misp_property_name in entity_mapping.colander_misp_mapping.items(): 346 # Skip mapping for literals, name, and misp_attributes keys 347 if colander_attribute_name in ["literals", "name", "misp_attributes"]: 348 continue 349 misp_value = getattr(misp_object, misp_property_name, None) 350 setattr(colander_entity, colander_attribute_name, misp_value) 351 352 return colander_entity 353
[docs] 354 def convert_object(self, misp_object: MISPObject) -> Optional[EntityTypes]: 355 """ 356 Convert a MISPObject to its corresponding Colander entity. 357 358 This method uses the mapping configuration to extract the entity name and attributes 359 from the MISPObject, then creates and populates a Colander entity instance. 360 361 Args: 362 misp_object: The MISP object to convert. 363 364 Returns: 365 The resulting Colander entity, or None if mapping or name is missing. 366 """ 367 # Get the mapping for this MISP object 368 entity_mapping = self.mapping.get_misp_object_mapping(misp_object) 369 if not entity_mapping or not entity_mapping.colander_super_type: 370 return None 371 372 entity_name = None 373 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name", "") 374 misp_attributes = entity_mapping.colander_misp_mapping.get("misp_attributes", {}) 375 misp_attribute_for_name = misp_attributes.get("name", "") 376 377 # Try to extract the entity name from the MISP object property or attribute 378 if misp_property_for_name: 379 entity_name = getattr(misp_object, misp_property_for_name, None) 380 elif misp_attribute_for_name: 381 if (misp_attribute := get_attribute_by_name(misp_object, misp_attribute_for_name)) is not None: 382 entity_name = misp_attribute.value 383 384 if not entity_name: 385 return None 386 387 # Prepare the Colander entity using the mapping and extracted entity name 388 colander_entity = self._prepare_colander_entity(misp_object, entity_mapping, entity_name) 389 390 # Map MISP attributes to Colander entity fields 391 for colander_attribute_name, misp_property_name in misp_attributes.items(): 392 # Skip literals, name, and nested fields 393 if colander_attribute_name in ["literals", "name"] or "." 
in colander_attribute_name: 394 continue 395 if (misp_attribute := get_attribute_by_name(misp_object, misp_property_name)) is None: 396 continue 397 if hasattr(colander_entity, colander_attribute_name) and misp_attribute.value: 398 setattr(colander_entity, colander_attribute_name, misp_attribute.value) 399 400 # If the Colander entity has an 'attributes' dict, add any extra MISP attributes not mapped above 401 if hasattr(colander_entity, "attributes"): 402 if not colander_entity.attributes: 403 colander_entity.attributes = {} 404 for attribute in misp_object.attributes: 405 if attribute.object_relation not in misp_attributes.values(): 406 colander_entity.attributes[attribute.object_relation.replace("-", "_")] = str(attribute.value) 407 408 return colander_entity
409
[docs] 410 def convert_objects(self, misp_object: MISPEvent) -> List[EntityTypes]: 411 entities: List[EntityTypes] = [] 412 for misp_object in misp_object.objects: 413 colander_entity = self.convert_object(misp_object) 414 if colander_entity: 415 entities.append(colander_entity) 416 return entities
417
[docs] 418 def convert_tags(self, colander_entity: EntityTypes, tags: Optional[List[MISPTag]]): 419 if not tags: 420 return 421 for tag in tags: 422 if tag.name.startswith("tlp"): 423 for level in TlpPapLevel: 424 if level.name.lower() in tag.name.lower(): 425 colander_entity.tlp = level 426 elif tag.name.startswith("misp-galaxy:threat-actor"): 427 actor_name = tag.name 428 actor_name = actor_name.replace("misp-galaxy:threat-actor=", "").replace('"', "") 429 colander_entity.add_tags([actor_name]) 430 else: 431 colander_entity.add_tags([tag.name])
432
[docs] 433 def convert_attribute( 434 self, misp_attribute: MISPAttribute, event_tags: Optional[List[MISPTag]] = None 435 ) -> Optional[EntityTypes]: 436 entity_mapping = self.mapping.get_misp_attribute_mapping(misp_attribute) 437 if not entity_mapping or not entity_mapping.colander_super_type: 438 return None 439 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name") 440 entity_name = getattr(misp_attribute, misp_property_for_name) 441 colander_entity = self._prepare_colander_entity(misp_attribute, entity_mapping, entity_name) 442 tags = event_tags or [] 443 tags.extend(misp_attribute.tags) 444 self.convert_tags(colander_entity, tags) 445 if misp_attribute.to_ids: 446 colander_entity.add_attributes({"is_malicious": True}) 447 return colander_entity
448
[docs] 449 def convert_attributes(self, misp_event: MISPEvent) -> List[EntityTypes]: 450 entities: List[EntityTypes] = [] 451 for misp_attribute in misp_event.attributes: 452 colander_entity = self.convert_attribute(misp_attribute, misp_event.tags) 453 if colander_entity: 454 entities.append(colander_entity) 455 return entities
456 457
class MISPConverter:
    """
    Entry points for converting MISP data to Colander data and back.
    The conversions are delegated to the MISP mappers, which rely on the mapping file.
    """

    @staticmethod
    def misp_to_colander(misp_feed: MISPFeed) -> Optional[List[ColanderFeed]]:
        """
        Convert a MISP feed to a list of Colander feeds. Each MISP event is converted to a separate Colander feed.

        When the feed has a "response" key, its value is taken as the list of events;
        otherwise the feed itself is treated as a single event.

        Args:
            misp_feed: The MISP feed containing events to convert.

        Returns:
            A list of Colander feeds, empty if no events are found.
        """
        mapper = MISPToColanderMapper()
        converted: List[ColanderFeed] = []
        if not misp_feed:
            return converted

        raw_events = misp_feed.get("response", None)
        if "response" not in misp_feed:
            raw_events = [misp_feed]

        for raw_event in raw_events or []:
            misp_event = MISPEvent()
            misp_event.from_dict(**raw_event)
            feed = mapper.convert_misp_event(misp_event)[1]
            feed.resolve_references()
            converted.append(feed)
        return converted

    @staticmethod
    def colander_to_misp(colander_feed: ColanderFeed) -> Optional[List[MISPEvent]]:
        """
        Convert a Colander feed to a list of MISP events. Each Colander case is converted to a MISP event.

        The references of the feed are resolved before the conversion.

        Args:
            colander_feed: The Colander feed containing cases to convert.

        Returns:
            A list of MISP events, empty if no cases are found.
        """
        mapper = ColanderToMISPMapper()
        colander_feed.resolve_references()
        return [mapper.convert_case(case, colander_feed)[0] for case in colander_feed.cases.values()]