Source code for colander_data_converter.converters.misp.converter

  1from typing import Optional, Union, List, Tuple
  2
  3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent, MISPFeed
  4
  5from colander_data_converter.base.common import TlpPapLevel
  6from colander_data_converter.base.models import (
  7    EntityTypes,
  8    Case,
  9    ColanderFeed,
 10    EntityRelation,
 11    ColanderRepository,
 12    Entity,
 13)
 14from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
 15from colander_data_converter.converters.misp.utils import get_attribute_by_name
 16from colander_data_converter.converters.stix2.utils import get_nested_value
 17
 18
[docs] 19class MISPMapper: 20 """ 21 Base mapper class for MISP conversions. 22 23 Provides common functionality for mapping Colander data structures to MISP objects. 24 """ 25 26 def __init__(self): 27 self.mapping = Mapping() 28 ColanderRepository().clear() 29
[docs] 30 @staticmethod 31 def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag: 32 """ 33 Convert a Colander TLP (Traffic Light Protocol) level to a MISP tag. 34 35 Args: 36 tlp_level: The TLP level to convert 37 38 Returns: 39 A MISP tag object with the TLP level name 40 """ 41 t = MISPTag() 42 t.name = tlp_level.name 43 return t
44 45
[docs] 46class ColanderToMISPMapper(MISPMapper): 47 """ 48 Mapper class for converting Colander objects to MISP format. 49 50 Handles the conversion of various Colander entity types (threats, actors, events, 51 artifacts, etc.) to their corresponding MISP object representations using 52 predefined mapping configurations. 53 """ 54
[docs] 55 def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]: 56 """ 57 Convert a Colander object to its corresponding MISP representation. 58 59 This method performs the core conversion logic by: 60 61 1. Looking up the appropriate mapping for the Colander object type 62 2. Creating the corresponding MISP object (Attribute or Object) 63 3. Mapping fields, literals, and attributes from Colander to MISP format 64 65 Args: 66 colander_object: The Colander object to convert 67 68 Returns: 69 The converted MISP object, or None if no mapping exists 70 """ 71 # Get the mapping configuration for this Colander object type 72 entity_type_mapping: EntityTypeMapping = self.mapping.get_mapping_to_misp( 73 colander_object.get_super_type(), colander_object.type 74 ) 75 76 if entity_type_mapping is None: 77 return None 78 79 # Determine the MISP model class and type to create 80 misp_model, misp_type = entity_type_mapping.get_misp_model_class() 81 82 # Create the appropriate MISP object based on the model type 83 if issubclass(misp_model, MISPAttribute): 84 misp_object: MISPAttribute = misp_model(strict=True) 85 misp_object.type = misp_type 86 elif issubclass(misp_model, MISPObject): 87 misp_object: MISPObject = misp_model(name=misp_type, strict=True) 88 elif issubclass(misp_model, MISPTag): 89 tag_pattern = entity_type_mapping.colander_misp_mapping.get("literals", {}).get("name") 90 return TagStub(tag_pattern.format(value=colander_object.name)) 91 else: 92 return None 93 94 # Set the "to_ids" attribute when the object is flagged as malicious 95 if hasattr(colander_object, "attributes"): 96 if colander_object.attributes and colander_object.attributes.get("is_malicious", False): 97 misp_object.to_ids = True 98 if hasattr(colander_object, "associated_threat") and colander_object.associated_threat is not None: 99 misp_object.to_ids = True 100 101 # Set common MISP object properties 102 # ToDo: add tag for TLP 103 misp_object.uuid = str(colander_object.id) 104 misp_object.first_seen = colander_object.created_at 105 misp_object.last_seen = colander_object.updated_at 106 107 # Convert Colander object to dictionary for nested field access 108 colander_object_dict = colander_object.model_dump(mode="json") 109 110 # Map direct field mappings from Colander to MISP object properties 111 for source_field, target_field in entity_type_mapping.get_colander_misp_field_mapping(): 112 value = getattr(colander_object, source_field, None) 113 if value is not None: 114 setattr(misp_object, target_field, value) 115 116 # Set constant/literal values on the MISP object 117 for target_field, value in entity_type_mapping.get_colander_misp_literals_mapping(): 118 if target_field in ["category", "comment"]: 119 setattr(misp_object, target_field, value) 120 else: 121 misp_object.add_attribute(target_field, value=value) 122 123 # Map Colander fields to MISP object attributes 124 for source_field, target_field in entity_type_mapping.get_colander_misp_attributes_mapping(): 125 if "." in source_field: 126 # Handle nested field access using dot notation 127 value = get_nested_value(colander_object_dict, source_field) 128 if value is not None: 129 misp_object.add_attribute(target_field, value=value) 130 else: 131 # Handle direct field access 132 value = getattr(colander_object, source_field, None) 133 if value is not None: 134 misp_object.add_attribute(target_field, value=value) 135 136 return misp_object
137
[docs] 138 @staticmethod 139 def get_element_from_event( 140 event: MISPEvent, uuid: str, types: List[str] 141 ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]: 142 """ 143 Retrieve an element (object or attribute) from a MISP event by UUID and type. 144 145 Args: 146 event: The MISP event to search within. 147 uuid: The UUID of the element to find. 148 types: List of types to search for ("object", "attribute"). 149 150 Returns: 151 The found element and its type as a string ("Object" or "Attribute"), or (None, None) if not found. 152 """ 153 if "object" in types: 154 for obj in event.objects: 155 if hasattr(obj, "uuid") and obj.uuid == uuid: 156 return obj, "Object" 157 if "attribute" in types: 158 for obj in event.attributes: 159 if hasattr(obj, "uuid") and obj.uuid == uuid: 160 return obj, "Attribute" 161 return None, None
162
[docs] 163 def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes): 164 """ 165 Create relationships in a MISP event based on the Colander object's immutable relations. 166 167 This method processes each immutable relation defined in the Colander object, determines the appropriate 168 mapping and direction, and adds the corresponding relationship or tag to the MISP event. 169 170 Args: 171 event: The MISP event to which relationships or tags will be added. 172 colander_object: The Colander object containing immutable relations. 173 174 Note: 175 - If the relation mapping specifies 'use_tag', a tag is added to the relevant MISP attribute. 176 - Otherwise, a relationship is created between MISP objects or attributes as defined by the mapping. 177 """ 178 super_type = colander_object.super_type 179 # Create relationships based on immutable relations 180 for _, relation in colander_object.get_immutable_relations().items(): 181 reference_name = relation.name 182 relation_mapping = self.mapping.get_relation_mapping_to_misp(super_type, reference_name) 183 184 if not relation_mapping: 185 continue 186 187 reverse = relation_mapping.get("reverse", False) 188 source_id = str(relation.obj_from.id) if not reverse else str(relation.obj_to.id) 189 target_id = str(relation.obj_to.id) if not reverse else str(relation.obj_from.id) 190 relation_name = relation_mapping.get("name", reference_name.replace("_", "-")) 191 192 # Tags only on MISPAttribute or MISPEvent 193 if relation_mapping.get("use_tag", False): 194 source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"]) 195 if reverse: 196 tag = self.convert_colander_object(relation.obj_from) 197 else: 198 tag = self.convert_colander_object(relation.obj_to) 199 if source_object and isinstance(tag, TagStub): 200 event.add_attribute_tag(tag, source_id) 201 # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute 202 else: 203 source_object, _ = self.get_element_from_event(event, source_id, types=["object"]) 204 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"]) 205 if source_object and target_object: 206 source_object.add_relationship(type_name, target_id, relation_name)
207
[docs] 208 def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]): 209 """ 210 Create relationships in a MISP event based on a list of Colander relations. 211 212 This method finds the corresponding MISP objects or attributes for each relation and 213 adds the relationship to the source object. 214 215 Args: 216 event: The MISP event to which relationships will be added. 217 colander_relations: List of Colander relations to convert. 218 """ 219 for relation in colander_relations: 220 source_id = str(relation.obj_from.id) 221 target_id = str(relation.obj_to.id) 222 source_object, _ = self.get_element_from_event(event, source_id, types=["object"]) 223 target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"]) 224 if source_object and target_object: 225 source_object.add_relationship(type_name, target_id, relation.name)
226
[docs] 227 def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]: 228 """ 229 Convert a Colander Case and its associated ColanderFeed into a MISPEvent. 230 231 This method performs the following steps: 232 233 1. Initializes a new MISPEvent using the case information. 234 2. Iterates over all entities in the feed, converting each to a MISP object or attribute. 235 236 - Entities that cannot be converted are added to the skipped list. 237 - MISPAttributes are added as attributes to the event. 238 - MISPObjects are added as objects to the event. 239 3. Processes immutable relations for each entity, adding corresponding relationships or tags to the event. 240 4. Processes regular (non-immutable) relations for each entity, adding relationships to the event. 241 5. Returns the constructed MISPEvent and a list of skipped entities. 242 243 Args: 244 case: The Colander case to convert. 245 feed: The feed containing entities and relations to convert. 246 247 Returns: 248 The resulting MISPEvent and a list of entities that were skipped during conversion. 249 """ 250 skipped = [] 251 misp_event = MISPEvent() 252 misp_event.uuid = str(case.id) 253 misp_event.info = case.description 254 misp_event.date = case.created_at 255 for entity in feed.entities.values(): 256 if entity.case != case: 257 continue 258 misp_object = self.convert_colander_object(entity) 259 if not misp_object: 260 skipped.append(entity) 261 continue 262 if isinstance(misp_object, MISPAttribute): 263 misp_event.add_attribute(**misp_object.to_dict()) 264 elif isinstance(misp_object, MISPObject): 265 misp_event.add_object(misp_object) 266 267 # Immutable relations 268 for entity in feed.entities.values(): 269 self.convert_immutable_relations(misp_event, entity) 270 271 # Regular relations 272 for entity in feed.entities.values(): 273 self.convert_relations(misp_event, list(feed.get_outgoing_relations(entity).values())) 274 275 return misp_event, skipped
276 277
[docs] 278class MISPToColanderMapper(MISPMapper):
[docs] 279 def convert_misp_event(self, event: MISPEvent) -> Tuple[Case, ColanderFeed]: 280 """ 281 Convert a MISPEvent into a Colander case and feed. 282 283 This method performs the following steps: 284 285 1. Creates a new Case instance using the event information. 286 2. Initializes a ColanderFeed and adds the case to it. 287 3. Converts all MISP objects in the event to Colander entities and adds them to the feed. 288 4. Converts all MISP attributes in the event to Colander entities and adds them to the feed. 289 5. Converts all relations in the event to Colander relations and adds them to the feed. 290 6. Returns the constructed Case and ColanderFeed. 291 292 Args: 293 event: The MISP event to convert. 294 295 Returns: 296 The resulting Case and Feed. 297 """ 298 case = Case(id=event.uuid, name=event.info, description=f"Loaded from MISP event [{event.uuid}]") 299 feed = ColanderFeed(cases={f"{case.id}": case}) 300 for entity in self.convert_objects(event): 301 entity.case = case 302 feed.entities[str(entity.id)] = entity 303 for entity in self.convert_attributes(event): 304 entity.case = case 305 feed.entities[str(entity.id)] = entity 306 for relation in self.convert_relations(event): 307 relation.case = case 308 feed.relations[str(relation.id)] = relation 309 return case, feed
310
[docs] 311 def convert_relations(self, event: MISPEvent) -> List[EntityRelation]: 312 relations = [] 313 for misp_object in event.objects + event.attributes: 314 source_object = ColanderRepository() >> misp_object.uuid 315 if not isinstance(source_object, Entity): 316 continue 317 for misp_relation in misp_object.relationships or []: 318 relation_name = misp_relation.relationship_type 319 target_object = ColanderRepository() >> misp_relation.related_object_uuid 320 if not isinstance(target_object, Entity): 321 continue 322 if relation_name: 323 relations.append( 324 EntityRelation( 325 id=misp_relation.uuid, name=relation_name, obj_from=source_object, obj_to=target_object 326 ) 327 ) 328 return relations
329 330 def _prepare_colander_entity( 331 self, misp_object: Union[MISPObject, MISPAttribute], entity_mapping: EntityTypeMapping, entity_name: str 332 ) -> Optional[EntityTypes]: 333 """ 334 Prepare and populate a Colander entity from a MISP object using the provided mapping and entity name. 335 336 Args: 337 misp_object: The MISP object or attribute to convert. 338 entity_mapping: The mapping configuration for the entity type. 339 entity_name: The name to assign to the Colander entity. 340 341 Returns: 342 The populated Colander entity, or None if creation fails. 343 """ 344 # Get the Colander model class and entity type from the mapping 345 colander_model_class = entity_mapping.colander_super_type.model_class 346 colander_entity_type = entity_mapping.colander_entity_type 347 348 # Instantiate the Colander entity with id, type, and name 349 colander_entity = colander_model_class(id=misp_object.uuid, type=colander_entity_type, name=entity_name) 350 351 # Map MISP object properties to Colander entity attributes based on the mapping 352 for colander_attribute_name, misp_property_name in entity_mapping.colander_misp_mapping.items(): 353 # Skip mapping for literals, name, and misp_attributes keys 354 if colander_attribute_name in ["literals", "name", "misp_attributes"]: 355 continue 356 misp_value = getattr(misp_object, misp_property_name, None) 357 setattr(colander_entity, colander_attribute_name, misp_value) 358 359 return colander_entity 360
[docs] 361 def convert_object(self, misp_object: MISPObject) -> Optional[EntityTypes]: 362 """ 363 Convert a MISPObject to its corresponding Colander entity. 364 365 This method uses the mapping configuration to extract the entity name and attributes 366 from the MISPObject, then creates and populates a Colander entity instance. 367 368 Args: 369 misp_object: The MISP object to convert. 370 371 Returns: 372 The resulting Colander entity, or None if mapping or name is missing. 373 """ 374 # Get the mapping for this MISP object 375 entity_mapping = self.mapping.get_misp_object_mapping(misp_object) 376 if not entity_mapping or not entity_mapping.colander_super_type: 377 return None 378 379 entity_name = None 380 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name", "") 381 misp_attributes = entity_mapping.colander_misp_mapping.get("misp_attributes", {}) 382 misp_attribute_for_name = misp_attributes.get("name", "") 383 384 # Try to extract the entity name from the MISP object property or attribute 385 if misp_property_for_name: 386 entity_name = getattr(misp_object, misp_property_for_name, None) 387 elif misp_attribute_for_name: 388 if (misp_attribute := get_attribute_by_name(misp_object, misp_attribute_for_name)) is not None: 389 entity_name = misp_attribute.value 390 391 if not entity_name: 392 return None 393 394 # Prepare the Colander entity using the mapping and extracted entity name 395 colander_entity = self._prepare_colander_entity(misp_object, entity_mapping, entity_name) 396 397 # Map MISP attributes to Colander entity fields 398 for colander_attribute_name, misp_property_name in misp_attributes.items(): 399 # Skip literals, name, and nested fields 400 if colander_attribute_name in ["literals", "name"] or "." in colander_attribute_name: 401 continue 402 if (misp_attribute := get_attribute_by_name(misp_object, misp_property_name)) is None: 403 continue 404 if hasattr(colander_entity, colander_attribute_name) and misp_attribute.value: 405 setattr(colander_entity, colander_attribute_name, misp_attribute.value) 406 407 # If the Colander entity has an 'attributes' dict, add any extra MISP attributes not mapped above 408 if hasattr(colander_entity, "attributes"): 409 if not colander_entity.attributes: 410 colander_entity.attributes = {} 411 for attribute in misp_object.attributes: 412 if attribute.object_relation not in misp_attributes.values(): 413 colander_entity.attributes[attribute.object_relation.replace("-", "_")] = str(attribute.value) 414 415 return colander_entity
416
[docs] 417 def convert_objects(self, misp_object: MISPEvent) -> List[EntityTypes]: 418 entities: List[EntityTypes] = [] 419 for misp_object in misp_object.objects: 420 colander_entity = self.convert_object(misp_object) 421 if colander_entity: 422 entities.append(colander_entity) 423 return entities
424
[docs] 425 def convert_tags(self, colander_entity: EntityTypes, tags: Optional[List[MISPTag]]): 426 if not tags: 427 return 428 for tag in tags: 429 if tag.name.startswith("tlp"): 430 for level in TlpPapLevel: 431 if level.name.lower() in tag.name.lower(): 432 colander_entity.tlp = level 433 elif tag.name.startswith("misp-galaxy:threat-actor"): 434 actor_name = tag.name 435 actor_name = actor_name.replace("misp-galaxy:threat-actor=", "").replace('"', "") 436 colander_entity.add_tags([actor_name]) 437 else: 438 colander_entity.add_tags([tag.name])
439
[docs] 440 def convert_attribute( 441 self, misp_attribute: MISPAttribute, event_tags: Optional[List[MISPTag]] = None 442 ) -> Optional[EntityTypes]: 443 entity_mapping = self.mapping.get_misp_attribute_mapping(misp_attribute) 444 if not entity_mapping or not entity_mapping.colander_super_type: 445 return None 446 misp_property_for_name = entity_mapping.colander_misp_mapping.get("name") 447 entity_name = getattr(misp_attribute, misp_property_for_name) 448 colander_entity = self._prepare_colander_entity(misp_attribute, entity_mapping, entity_name) 449 tags = event_tags or [] 450 tags.extend(misp_attribute.tags) 451 self.convert_tags(colander_entity, tags) 452 if misp_attribute.to_ids: 453 colander_entity.add_attributes({"is_malicious": True}) 454 return colander_entity
455
[docs] 456 def convert_attributes(self, misp_event: MISPEvent) -> List[EntityTypes]: 457 entities: List[EntityTypes] = [] 458 for misp_attribute in misp_event.attributes: 459 colander_entity = self.convert_attribute(misp_attribute, misp_event.tags) 460 if colander_entity: 461 entities.append(colander_entity) 462 return entities
463 464
[docs] 465class MISPConverter: 466 """ 467 Converter for MISP data to Colander data and vice versa. 468 Uses the mapping file to convert between formats. 469 """ 470
[docs] 471 @staticmethod 472 def misp_to_colander(misp_feed: MISPFeed) -> Optional[List[ColanderFeed]]: 473 """ 474 Convert a MISP feed to a list of Colander feeds. Each MISP event is converted to a separate Colander feed. 475 476 Args: 477 misp_feed: The MISP feed containing events to convert. 478 479 Returns: 480 A list of Colander feeds, or None if no events are found. 481 """ 482 feeds: List[ColanderFeed] = [] 483 mapper = MISPToColanderMapper() 484 if not misp_feed: 485 return feeds 486 events = misp_feed.get("response", None) 487 if "response" not in misp_feed: 488 events = [misp_feed] 489 for event in events or []: 490 misp_event = MISPEvent() 491 misp_event.from_dict(**event) 492 _, feed = mapper.convert_misp_event(misp_event) 493 feed.resolve_references() 494 feeds.append(feed) 495 return feeds
496
[docs] 497 @staticmethod 498 def colander_to_misp(colander_feed: ColanderFeed) -> Optional[List[MISPEvent]]: 499 """ 500 Convert a Colander feed to a list of MISP events. Each Colander case is converted to a MISP event. 501 502 Args: 503 colander_feed: The Colander feed containing cases to convert. 504 505 Returns: 506 A list of MISP events, or None if no cases are found. 507 """ 508 mapper = ColanderToMISPMapper() 509 colander_feed.resolve_references() 510 events: List[MISPEvent] = [] 511 for _, case in colander_feed.cases.items(): 512 misp_event, _ = mapper.convert_case(case, colander_feed) 513 events.append(misp_event) 514 return events