Source code for colander_data_converter.converters.misp.converter

  1from typing import Optional, Union, List, Tuple
  2
  3from pymisp import AbstractMISP, MISPTag, MISPObject, MISPAttribute, MISPEvent
  4
  5from colander_data_converter.base.common import TlpPapLevel
  6from colander_data_converter.base.models import EntityTypes, Case, ColanderFeed, EntityRelation
  7from colander_data_converter.converters.misp.models import Mapping, EntityTypeMapping, TagStub
  8from colander_data_converter.converters.stix2.utils import get_nested_value
  9
 10
class MISPMapper:
    """
    Common base for the MISP mappers.

    Owns the :class:`Mapping` configuration and offers small helpers shared by
    the mappers that translate Colander data structures into MISP objects.
    """

    def __init__(self):
        # Mapping configuration looked up when converting entities and relations
        self.mapping = Mapping()

    @staticmethod
    def tlp_level_to_tag(tlp_level: TlpPapLevel) -> MISPTag:
        """
        Build a MISP tag named after a Colander TLP (Traffic Light Protocol) level.

        Args:
            tlp_level (TlpPapLevel): The TLP level to convert

        Returns:
            MISPTag: A new MISP tag whose ``name`` is ``tlp_level.name``
        """
        tag = MISPTag()
        tag.name = tlp_level.name
        return tag
35 36
class ColanderToMISPMapper(MISPMapper):
    """
    Mapper that turns Colander objects into their MISP counterparts.

    Entities are converted to MISP attributes, objects or tag stubs according
    to the mapping configuration held in ``self.mapping``; relations between
    entities are then attached to the elements of the resulting MISP event.
    """

    def convert_colander_object(self, colander_object: EntityTypes) -> Optional[Union[AbstractMISP, TagStub]]:
        """
        Convert a single Colander object to its MISP representation.

        Steps:
        1. Look up the mapping for the object's super type and type
        2. Instantiate the MISP model designated by that mapping
        3. Copy mapped fields, literals and attributes onto the MISP object

        Args:
            colander_object (EntityTypes): The Colander object to convert

        Returns:
            Optional[Union[AbstractMISP, TagStub]]: The converted MISP attribute or object,
            a ``TagStub`` when the mapping targets a MISP tag, or None when no mapping exists
            or the mapped model class is not supported
        """
        type_mapping: Optional[EntityTypeMapping] = self.mapping.get_mapping(
            colander_object.get_super_type(), colander_object.type
        )
        if type_mapping is None:
            return None

        model_cls, misp_type = type_mapping.get_misp_model_class()

        # Instantiate the MISP element; tags are returned right away as stubs
        if issubclass(model_cls, MISPAttribute):
            converted = model_cls(strict=True)
            converted.type = misp_type
        elif issubclass(model_cls, MISPObject):
            converted = model_cls(name=misp_type, strict=True)
        elif issubclass(model_cls, MISPTag):
            literals = type_mapping.colander_misp_mapping.get("literals", {})
            return TagStub(literals.get("name"))
        else:
            return None

        # Properties common to every converted element
        # ToDo: add tag for TLP
        converted.uuid = str(colander_object.id)
        converted.first_seen = colander_object.created_at
        converted.last_seen = colander_object.updated_at

        # Dictionary form of the object, used to resolve dotted (nested) field paths
        dumped = colander_object.model_dump(mode="json")

        # Colander fields copied straight onto properties of the MISP element
        for source_field, target_field in type_mapping.get_colander_misp_field_mapping():
            field_value = getattr(colander_object, source_field, None)
            if field_value is None:
                continue
            setattr(converted, target_field, field_value)

        # Literal values: "category" and "comment" are set as properties,
        # anything else is added as an attribute
        for target_field, literal in type_mapping.get_colander_misp_literals_mapping():
            if target_field in ("category", "comment"):
                setattr(converted, target_field, literal)
            else:
                converted.add_attribute(target_field, value=literal)

        # Colander fields added as attributes of the MISP element
        for source_field, target_field in type_mapping.get_colander_misp_attributes_mapping():
            if "." in source_field:
                # Dotted path: resolve against the dumped dictionary
                attribute_value = get_nested_value(dumped, source_field)
            else:
                attribute_value = getattr(colander_object, source_field, None)
            if attribute_value is not None:
                converted.add_attribute(target_field, value=attribute_value)

        return converted

    @staticmethod
    def get_element_from_event(
        event: MISPEvent, uuid: str, types: List[str]
    ) -> Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]:
        """
        Find the element of a MISP event carrying the given UUID.

        Objects are searched before attributes; each collection is only
        searched when its key is listed in ``types``.

        Args:
            event (MISPEvent): The event whose objects/attributes are searched
            uuid (str): The UUID to look for
            types (List[str]): Collections to search: ``"object"`` and/or ``"attribute"``

        Returns:
            Tuple[Optional[Union[MISPObject, MISPAttribute]], Optional[str]]: The matching
            element with ``"Object"`` or ``"Attribute"``, or ``(None, None)`` when nothing matches
        """
        if "object" in types:
            found = next((o for o in event.objects if hasattr(o, "uuid") and o.uuid == uuid), None)
            if found is not None:
                return found, "Object"
        if "attribute" in types:
            found = next((a for a in event.attributes if hasattr(a, "uuid") and a.uuid == uuid), None)
            if found is not None:
                return found, "Attribute"
        return None, None

    def convert_immutable_relations(self, event: MISPEvent, colander_object: EntityTypes):
        """
        Attach the immutable relations of a Colander object to a MISP event.

        Relations without a relation mapping are ignored. A mapping flagged
        ``use_tag`` tags the source attribute with the tag stub converted from
        the relation target; any other mapping adds a relationship on the source
        MISP object pointing to the target object or attribute.

        Args:
            event (MISPEvent): The event already holding the converted elements
            colander_object (EntityTypes): The Colander object whose immutable relations are converted
        """
        super_type = colander_object.super_type
        for relation in colander_object.get_immutable_relations().values():
            reference_name = relation.name
            relation_mapping = self.mapping.get_relation_mapping(super_type, reference_name)
            if not relation_mapping:
                continue

            # A "reverse" mapping swaps the direction of the relation
            reverse = relation_mapping.get("reverse", False)
            if reverse:
                source_entity, target_entity = relation.obj_to, relation.obj_from
            else:
                source_entity, target_entity = relation.obj_from, relation.obj_to
            source_id = str(source_entity.id)
            target_id = str(target_entity.id)
            relation_name = relation_mapping.get("name", reference_name.replace("_", "-"))

            if relation_mapping.get("use_tag", False):
                # Tags only on MISPAttribute or MISPEvent
                source_object, _ = self.get_element_from_event(event, source_id, types=["attribute"])
                tag = self.convert_colander_object(target_entity)
                if source_object and isinstance(tag, TagStub):
                    event.add_attribute_tag(tag, source_id)
                continue

            # Regular immutable relation between a MISPObject and another MISPObject or MISPAttribute
            source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
            target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
            if source_object and target_object:
                source_object.add_relationship(type_name, target_id, relation_name)

    def convert_relations(self, event: MISPEvent, colander_relations: List[EntityRelation]):
        """
        Attach regular Colander relations to the elements of a MISP event.

        A relationship is added only when the relation source is found among
        the event objects and the target among its objects or attributes.

        Args:
            event (MISPEvent): The event already holding the converted elements
            colander_relations (List[EntityRelation]): The relations to convert
        """
        for relation in colander_relations:
            source_id = str(relation.obj_from.id)
            target_id = str(relation.obj_to.id)
            source_object, _ = self.get_element_from_event(event, source_id, types=["object"])
            target_object, type_name = self.get_element_from_event(event, target_id, types=["object", "attribute"])
            if not (source_object and target_object):
                continue
            source_object.add_relationship(type_name, target_id, relation.name)

    def convert_case(self, case: Case, feed: ColanderFeed) -> Tuple[Optional[MISPEvent], List[EntityTypes]]:
        """
        Build a MISP event from a Colander case and the entities of a feed.

        Entities are converted first, then immutable relations are attached,
        then the regular outgoing relations of each entity.

        Args:
            case (Case): The case providing the event UUID, info and date
            feed (ColanderFeed): The feed whose entities and relations are converted

        Returns:
            Tuple[Optional[MISPEvent], List[EntityTypes]]: The MISP event and the list of
            entities for which the conversion produced nothing
        """
        skipped: List[EntityTypes] = []

        misp_event = MISPEvent()
        misp_event.uuid = str(case.id)
        misp_event.info = case.description
        misp_event.date = case.created_at

        # Entities: only attributes and objects are added to the event here;
        # other conversion results (e.g. tag stubs) are not added at this stage
        for entity in feed.entities.values():
            converted = self.convert_colander_object(entity)
            if not converted:
                skipped.append(entity)
            elif isinstance(converted, MISPAttribute):
                misp_event.add_attribute(**converted.to_dict())
            elif isinstance(converted, MISPObject):
                misp_event.add_object(converted)

        # Immutable relations
        for entity in feed.entities.values():
            self.convert_immutable_relations(misp_event, entity)

        # Regular relations
        for entity in feed.entities.values():
            outgoing = list(feed.get_outgoing_relations(entity).values())
            self.convert_relations(misp_event, outgoing)

        return misp_event, skipped