Source code for colander_data_converter.converters.misp.models

  1import json
  2from importlib import resources
  3from typing import Optional, Dict, Any, Type, List, Tuple, Union
  4
  5from pydantic import BaseModel, ConfigDict
  6from pymisp import MISPObject, AbstractMISP, MISPAttribute, MISPTag
  7
  8from colander_data_converter.base.models import CommonEntitySuperTypes, CommonEntitySuperType
  9from colander_data_converter.base.types.base import CommonEntityType
 10from colander_data_converter.converters.misp.utils import get_attribute_by_name
 11
 12type ColanderMISPMapping = Dict[str, Any]
 13type RelationMapping = Dict[str, Any]
 14
 15
[docs] 16class TagStub(str): 17 pass
18 19
[docs] 20class Discriminator(BaseModel): 21 type: str 22 property: str 23 target: str 24 value: Optional[str] = None
25 26
[docs] 27class MISPColanderMapping(BaseModel): 28 also: Optional[List[str]] = None 29 discriminator: Optional[Discriminator] = None
30 31
[docs] 32class EntityTypeMapping(BaseModel): 33 colander_type: str 34 misp_object: str 35 misp_type: Optional[str] = None 36 misp_definition: Optional[str] = None 37 misp_colander_mapping: MISPColanderMapping 38 colander_misp_mapping: ColanderMISPMapping 39 colander_super_type: Optional[CommonEntitySuperType] = None 40 41 @property 42 def colander_entity_type(self) -> CommonEntityType: 43 return self.colander_super_type.type_by_short_name(self.colander_type) 44
[docs] 45 def get_misp_model_class(self) -> Tuple[Type[AbstractMISP], str]: 46 if self.misp_object == "misp-attribute": 47 return MISPAttribute, self.misp_type 48 elif self.misp_object == "misp-tag": 49 return MISPTag, self.misp_type 50 return MISPObject, self.misp_object
51
[docs] 52 def get_colander_misp_field_mapping(self) -> List[Optional[Tuple[str, str]]]: 53 return [(src, dst) for src, dst in self.colander_misp_mapping.items() if isinstance(dst, str)]
54
[docs] 55 def get_colander_misp_literals_mapping(self) -> List[Optional[Tuple[str, str]]]: 56 return [(src, dst) for src, dst in self.colander_misp_mapping.get("literals", {}).items()]
57
[docs] 58 def get_colander_misp_attributes_mapping(self) -> List[Optional[Tuple[str, str]]]: 59 return [ 60 (src, dst) 61 for src, dst in self.colander_misp_mapping.get("misp_attributes", {}).items() 62 if isinstance(dst, str) 63 ]
64 65
[docs] 66class EntitySuperTypeMapping(BaseModel): 67 model_config = ConfigDict( 68 str_strip_whitespace=True, 69 arbitrary_types_allowed=True, 70 ) 71 colander_super_type: str 72 model_class: Any 73 types_mapping: Dict[str, EntityTypeMapping] = {} 74
[docs] 75 def get_supported_colander_types(self) -> List[Optional[str]]: 76 return list(self.types_mapping.keys())
77 78
[docs] 79class Mapping(object): 80 TYPES = [ 81 (CommonEntitySuperTypes.ACTOR.value, "actor"), 82 (CommonEntitySuperTypes.ARTIFACT.value, "artifact"), 83 (CommonEntitySuperTypes.DEVICE.value, "device"), 84 (CommonEntitySuperTypes.EVENT.value, "event"), 85 (CommonEntitySuperTypes.DATA_FRAGMENT.value, "data_fragment"), 86 (CommonEntitySuperTypes.DETECTION_RULE.value, "detection_rule"), 87 (CommonEntitySuperTypes.OBSERVABLE.value, "observable"), 88 (CommonEntitySuperTypes.THREAT.value, "threat"), 89 ] 90 91 def __init__(self): 92 self.misp_objects_mapping: Dict[str, List[EntityTypeMapping]] = {} 93 self.misp_attributes_mapping: Dict[str, List[EntityTypeMapping]] = {} 94 self.misp_tags_mapping: Dict[str, EntityTypeMapping] = {} 95 self.colander_super_types_mapping: Dict[str, EntitySuperTypeMapping] = {} 96 self.colander_relation_mapping = self._load_relation_mapping_definition() 97 for type_class, prefix in self.TYPES: 98 self.colander_super_types_mapping[type_class.short_name] = self._load_mapping_definition(type_class, prefix) 99 self._build_misp_mapping() 100 101 @staticmethod 102 def _load_relation_mapping_definition() -> RelationMapping: 103 resource_package = __name__ 104 json_file = resources.files(resource_package).joinpath("data").joinpath("relation_misp_mapping.json") 105 with json_file.open() as f: 106 return json.load(f) 107 108 @staticmethod 109 def _load_mapping_definition(type_class: CommonEntitySuperType, filename_prefix: str) -> EntitySuperTypeMapping: 110 resource_package = __name__ 111 json_file = resources.files(resource_package).joinpath("data").joinpath(f"{filename_prefix}_misp_mapping.json") 112 super_type_mapping = EntitySuperTypeMapping(colander_super_type=type_class.short_name, model_class=type_class) 113 with json_file.open() as f: 114 raw = json.load(f) 115 for definition in raw: 116 type_mapping = EntityTypeMapping.model_validate(definition) 117 type_mapping.colander_super_type = type_class 118 super_type_mapping.types_mapping[type_mapping.colander_type] = type_mapping 119 return super_type_mapping 120 121 def _build_misp_mapping(self): 122 for _, super_type_mapping in self.colander_super_types_mapping.items(): 123 for _, type_mapping in super_type_mapping.types_mapping.items(): 124 if type_mapping.misp_object == "misp-attribute": 125 if type_mapping.misp_type not in self.misp_attributes_mapping: 126 self.misp_attributes_mapping[type_mapping.misp_type] = [] 127 for s in type_mapping.misp_colander_mapping.also or []: 128 self.misp_attributes_mapping[s] = [type_mapping] 129 self.misp_attributes_mapping[type_mapping.misp_type].append(type_mapping) 130 if type_mapping.misp_object == "misp-tag": 131 tag_name = type_mapping.colander_misp_mapping["literals"]["name"].replace("{value}", "") 132 self.misp_tags_mapping[tag_name] = type_mapping 133 else: 134 if type_mapping.misp_object not in self.misp_objects_mapping: 135 self.misp_objects_mapping[type_mapping.misp_object] = [] 136 self.misp_objects_mapping[type_mapping.misp_object].append(type_mapping) 137
[docs] 138 def get_relation_mapping_to_misp( 139 self, super_type: CommonEntitySuperType, reference_name: str 140 ) -> Optional[RelationMapping]: 141 mapping = self.colander_relation_mapping.get(super_type.short_name, {}) 142 return mapping.get(reference_name, None)
143
[docs] 144 def get_mapping_to_misp( 145 self, entity_super_type: CommonEntitySuperType, entity_type: CommonEntityType 146 ) -> Optional[EntityTypeMapping]: 147 est_mapping = self.colander_super_types_mapping.get(entity_super_type.short_name, None) 148 if est_mapping: 149 return est_mapping.types_mapping.get(entity_type.short_name, None) 150 return None
151
[docs] 152 def get_misp_object_or_attribute_value(self) -> Optional[str]: 153 return None
154
[docs] 155 def match_discriminator(self, misp_object: Union[MISPObject, MISPAttribute], discriminator: Discriminator) -> bool: 156 matched = False 157 if not discriminator or not misp_object: 158 return matched 159 160 target_type = discriminator.target 161 if target_type == "self" and not isinstance(misp_object, MISPAttribute): 162 raise Exception("Discriminator target of type 'self' is only supported for MISPAttribute") 163 if target_type == "attribute-value" and not isinstance(misp_object, MISPObject): 164 raise Exception("Discriminator target of type 'attribute-value' is only supported for MISPObject") 165 166 value = None 167 if target_type == "self": 168 value = getattr(misp_object, discriminator.property) 169 elif target_type == "attribute-value": 170 attribute = get_attribute_by_name(misp_object, discriminator.property) 171 if attribute: 172 value = attribute.value 173 174 if discriminator.type == "match": 175 matched = value == discriminator.value 176 177 return matched
178
[docs] 179 def get_misp_object_mapping(self, misp_object: MISPObject) -> Optional[EntityTypeMapping]: 180 if not misp_object: 181 return None 182 candidates = self.misp_objects_mapping.get(misp_object.name, []) 183 if len(candidates) == 1: 184 return candidates[0] 185 for candidate in candidates: 186 types_class = candidate.colander_super_type.types_class 187 if (discriminator := candidate.misp_colander_mapping.discriminator) is None: 188 continue 189 if discriminator.type == "suggest": 190 value = None 191 if discriminator.target == "self": 192 value = getattr(misp_object, discriminator.property) 193 elif discriminator.target == "attribute-value": 194 if (attribute := get_attribute_by_name(misp_object, discriminator.property)) is None: 195 return None 196 value = attribute.value 197 if not value: 198 return None 199 if (suggested_type := types_class.suggest(value)) is None: 200 return None 201 mapping = self.get_mapping_to_misp(candidate.colander_super_type, suggested_type) 202 return mapping 203 elif self.match_discriminator(misp_object, discriminator): 204 return candidate 205 return None
206
[docs] 207 def get_misp_attribute_mapping(self, misp_attribute: MISPAttribute) -> Optional[EntityTypeMapping]: 208 if not misp_attribute: 209 return None 210 candidates = self.misp_attributes_mapping.get(misp_attribute.type, []) 211 if len(candidates) == 1: 212 return candidates[0] 213 for candidate in candidates: 214 types_class = candidate.colander_super_type.types_class 215 if (discriminator := candidate.misp_colander_mapping.discriminator) is None: 216 continue 217 if discriminator.type == "suggest": 218 if (suggested_type := types_class.suggest(getattr(misp_attribute, discriminator.property))) is None: 219 return None 220 mapping = self.get_mapping_to_misp(candidate.colander_super_type, suggested_type) 221 return mapping 222 elif self.match_discriminator(misp_attribute, discriminator): 223 return candidate 224 return None
225
[docs] 226 def get_misp_tag_mapping(self, misp_tag: MISPTag) -> Optional[EntityTypeMapping]: 227 if not misp_tag: 228 return None 229 for tag_name, mapping in self.misp_tags_mapping.items(): 230 if tag_name in misp_tag.name: 231 return mapping 232 return None