Source code for colander_data_converter.converters.misp.models

  1import json
  2from importlib import resources
  3from typing import Optional, Dict, Any, Type, List, Tuple
  4
  5from pydantic import BaseModel, ConfigDict
  6from pymisp import MISPObject, AbstractMISP, MISPAttribute, MISPTag
  7
  8from colander_data_converter.base.models import CommonEntitySuperTypes, CommonEntitySuperType
  9from colander_data_converter.base.types.base import CommonEntityType
 10
# Type of the ``misp_colander_mapping`` field of an entity type mapping definition.
type MispColanderMapping = Dict[str, Any]
# Type of the ``colander_misp_mapping`` field of an entity type mapping definition.
type ColanderMispMapping = Dict[str, Any]
# Relation mapping data as parsed from the ``relation_misp_mapping.json`` resource.
type RelationMapping = Dict[str, Any]
 14
 15
class TagStub(str):
    """Plain ``str`` subclass that adds no behavior of its own.

    NOTE(review): presumably used as a marker type for tag values; confirm
    against the callers, which are not visible in this module.
    """
18 19
class EntityTypeMapping(BaseModel):
    """Mapping definition between one Colander entity type and a MISP model.

    The helper methods expose the content of ``colander_misp_mapping`` as
    lists of ``(source, destination)`` pairs.
    """

    # Colander type identifier this definition applies to.
    colander_type: str
    # "misp-attribute", "misp-tag", or any other value, which is then treated
    # as a MISP object name (see ``get_misp_model_class``).
    misp_object: str
    # Returned alongside the model class for attributes and tags; may be unset.
    misp_type: Optional[str] = None
    # Not read by any method of this class.
    misp_definition: Optional[str] = None
    misp_colander_mapping: MispColanderMapping
    colander_misp_mapping: ColanderMispMapping

    def get_misp_model_class(self) -> Tuple[Type[AbstractMISP], Optional[str]]:
        """Return the pymisp class matching ``misp_object`` and its type name.

        Returns:
            ``(MISPAttribute, misp_type)`` when ``misp_object`` is
            ``"misp-attribute"``, ``(MISPTag, misp_type)`` when it is
            ``"misp-tag"``, and ``(MISPObject, misp_object)`` otherwise.
            ``misp_type`` is optional, so the second element may be ``None``
            in the first two cases.
        """
        if self.misp_object == "misp-attribute":
            return MISPAttribute, self.misp_type
        if self.misp_object == "misp-tag":
            return MISPTag, self.misp_type
        return MISPObject, self.misp_object

    def get_colander_misp_field_mapping(self) -> List[Tuple[str, str]]:
        """Return the top-level ``colander_misp_mapping`` pairs whose value is a string.

        Non-string values (for instance the nested ``"literals"`` and
        ``"misp_attributes"`` sections read by the sibling methods) are skipped.
        """
        return [(src, dst) for src, dst in self.colander_misp_mapping.items() if isinstance(dst, str)]

    def get_colander_misp_literals_mapping(self) -> List[Tuple[str, Any]]:
        """Return every pair of the ``"literals"`` section of ``colander_misp_mapping``.

        Values are returned as-is, without any type filtering. An absent
        section yields an empty list.
        """
        return list(self.colander_misp_mapping.get("literals", {}).items())

    def get_colander_misp_attributes_mapping(self) -> List[Tuple[str, str]]:
        """Return the string-valued pairs of the ``"misp_attributes"`` section.

        An absent section yields an empty list.
        """
        return [
            (src, dst)
            for src, dst in self.colander_misp_mapping.get("misp_attributes", {}).items()
            if isinstance(dst, str)
        ]
47 48
class EntitySuperTypeMapping(BaseModel):
    """Group the per-type mapping definitions of one Colander super type."""

    model_config = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True)

    # Short name of the Colander super type.
    colander_super_type: str
    # Object associated with the super type; accepted without validation (``Any``).
    model_class: Any
    # Per-type definitions keyed by Colander type name. The mutable default is
    # acceptable here: pydantic copies non-hashable defaults for each instance.
    types_mapping: Dict[str, EntityTypeMapping] = {}

    def get_supported_colander_types(self) -> List[Optional[str]]:
        """Return the Colander type names that have a mapping definition."""
        return list(self.types_mapping)
60 61
class Mapping:
    """Load and expose the Colander/MISP mapping definitions shipped as JSON resources.

    On construction, the relation mapping and one ``EntitySuperTypeMapping``
    per entry of ``TYPES`` are read from the ``data`` resource directory
    resolved from this module's name.
    """

    # (super type, JSON filename prefix) pairs; each prefix resolves to
    # "<prefix>_misp_mapping.json".
    TYPES = [
        (CommonEntitySuperTypes.ACTOR.value, "actor"),
        (CommonEntitySuperTypes.ARTIFACT.value, "artifact"),
        (CommonEntitySuperTypes.DEVICE.value, "device"),
        (CommonEntitySuperTypes.EVENT.value, "event"),
        (CommonEntitySuperTypes.DATA_FRAGMENT.value, "data_fragment"),
        (CommonEntitySuperTypes.DETECTION_RULE.value, "detection_rule"),
        (CommonEntitySuperTypes.OBSERVABLE.value, "observable"),
        (CommonEntitySuperTypes.THREAT.value, "threat"),
    ]

    def __init__(self):
        """Load the relation mapping and every super type mapping listed in ``TYPES``."""
        self.super_types_mapping: Dict[str, EntitySuperTypeMapping] = {}
        self.relation_mapping = self._load_relation_mapping_definition()
        for type_class, prefix in self.TYPES:
            self.super_types_mapping[type_class.short_name] = self._load_mapping_definition(type_class, prefix)

    @staticmethod
    def _read_json_resource(filename: str) -> Any:
        """Parse and return the JSON document ``data/<filename>``."""
        json_file = resources.files(__name__).joinpath("data").joinpath(filename)
        # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
        with json_file.open(encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _load_relation_mapping_definition() -> RelationMapping:
        """Return the parsed content of ``relation_misp_mapping.json``."""
        return Mapping._read_json_resource("relation_misp_mapping.json")

    @staticmethod
    def _load_mapping_definition(type_class, filename_prefix: str) -> EntitySuperTypeMapping:
        """Build the super type mapping described by ``<filename_prefix>_misp_mapping.json``.

        Args:
            type_class: Super type object; its ``short_name`` identifies the
                mapping and the object itself is stored as ``model_class``.
            filename_prefix: Prefix of the JSON resource to read.

        Returns:
            An ``EntitySuperTypeMapping`` holding one validated
            ``EntityTypeMapping`` per definition of the JSON document, keyed
            by its ``colander_type``.
        """
        super_type_mapping = EntitySuperTypeMapping(colander_super_type=type_class.short_name, model_class=type_class)
        for definition in Mapping._read_json_resource(f"{filename_prefix}_misp_mapping.json"):
            type_mapping = EntityTypeMapping.model_validate(definition)
            super_type_mapping.types_mapping[type_mapping.colander_type] = type_mapping
        return super_type_mapping

    def get_relation_mapping(self, super_type: CommonEntitySuperType, reference_name: str) -> Optional[RelationMapping]:
        """Return the relation mapping of ``reference_name`` for ``super_type``.

        Returns:
            The entry stored under ``super_type.short_name`` then
            ``reference_name`` in the relation mapping, or ``None`` when
            either key is missing.
        """
        mapping = self.relation_mapping.get(super_type.short_name, {})
        return mapping.get(reference_name)

    def get_mapping(
        self, entity_super_type: CommonEntitySuperType, entity_type: CommonEntityType
    ) -> Optional[EntityTypeMapping]:
        """Return the mapping definition for ``entity_type`` within ``entity_super_type``.

        Returns:
            The matching ``EntityTypeMapping``, or ``None`` when the super
            type or the type has no mapping.
        """
        est_mapping = self.super_types_mapping.get(entity_super_type.short_name)
        if est_mapping is None:
            return None
        return est_mapping.types_mapping.get(entity_type.short_name)