1import json
2from importlib import resources
3from typing import Optional, Dict, Any, Type, List, Tuple
4
5from pydantic import BaseModel, ConfigDict
6from pymisp import MISPObject, AbstractMISP, MISPAttribute, MISPTag
7
8from colander_data_converter.base.models import CommonEntitySuperTypes, CommonEntitySuperType
9from colander_data_converter.base.types.base import CommonEntityType
10
# Alias for the free-form dict stored in EntityTypeMapping.misp_colander_mapping.
type MispColanderMapping = Dict[str, Any]
# Alias for the free-form dict stored in EntityTypeMapping.colander_misp_mapping.
type ColanderMispMapping = Dict[str, Any]
# Alias for the relation mapping data returned by Mapping._load_relation_mapping_definition().
type RelationMapping = Dict[str, Any]
14
15
18
19
[docs]
class EntityTypeMapping(BaseModel):
    """Mapping definition between one Colander entity type and a MISP model.

    Instances are built by ``Mapping._load_mapping_definition`` from the
    entries of a ``*_misp_mapping.json`` resource file.
    """

    # Key under which this mapping is stored in EntitySuperTypeMapping.types_mapping.
    colander_type: str
    # "misp-attribute", "misp-tag", or any other value (treated as a MISP object name).
    misp_object: str
    # Returned next to MISPAttribute / MISPTag by get_misp_model_class(); may be None.
    misp_type: Optional[str] = None
    misp_definition: Optional[str] = None
    misp_colander_mapping: MispColanderMapping
    colander_misp_mapping: ColanderMispMapping

    def get_misp_model_class(self) -> Tuple[Type[AbstractMISP], Optional[str]]:
        """Return the pymisp class to use together with its type/name.

        Returns:
            ``(MISPAttribute, misp_type)`` when ``misp_object`` is
            ``"misp-attribute"``, ``(MISPTag, misp_type)`` when it is
            ``"misp-tag"``, and ``(MISPObject, misp_object)`` otherwise.
            The second element is ``None`` when ``misp_type`` is unset.
        """
        if self.misp_object == "misp-attribute":
            return MISPAttribute, self.misp_type
        if self.misp_object == "misp-tag":
            return MISPTag, self.misp_type
        return MISPObject, self.misp_object

    def get_colander_misp_field_mapping(self) -> List[Tuple[str, str]]:
        """Return the top-level ``(key, value)`` pairs whose value is a string.

        Non-string values of ``colander_misp_mapping`` (for instance the
        nested ``"literals"`` and ``"misp_attributes"`` sub-mappings read by
        the sibling methods) are skipped.
        """
        return [(src, dst) for src, dst in self.colander_misp_mapping.items() if isinstance(dst, str)]

    def get_colander_misp_literals_mapping(self) -> List[Tuple[str, Any]]:
        """Return every ``(key, value)`` pair of the ``"literals"`` sub-mapping.

        Values are not filtered by type. An empty list is returned when the
        ``"literals"`` key is absent.
        """
        return list(self.colander_misp_mapping.get("literals", {}).items())

    def get_colander_misp_attributes_mapping(self) -> List[Tuple[str, str]]:
        """Return the string-valued pairs of the ``"misp_attributes"`` sub-mapping.

        An empty list is returned when the ``"misp_attributes"`` key is absent.
        """
        return [
            (src, dst)
            for src, dst in self.colander_misp_mapping.get("misp_attributes", {}).items()
            if isinstance(dst, str)
        ]
47
48
[docs]
class EntitySuperTypeMapping(BaseModel):
    """Collection of per-type MISP mappings that belong to one Colander super type."""

    model_config = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True)

    # Short name of the super type this container describes.
    colander_super_type: str
    # Object supplied by the loader as the super type; deliberately untyped (Any).
    # NOTE(review): the "model_" prefix may trigger pydantic's protected-namespace
    # warning on some 2.x releases - confirm against the pinned pydantic version.
    model_class: Any
    # Entity type mappings, keyed by EntityTypeMapping.colander_type.
    types_mapping: Dict[str, EntityTypeMapping] = {}

    def get_supported_colander_types(self) -> List[Optional[str]]:
        """Return the keys of ``types_mapping`` as a new list."""
        return [*self.types_mapping]
60
61
[docs]
class Mapping:
    """Registry of the Colander/MISP mapping definitions shipped as JSON resources.

    On construction, the relation mapping and one entity mapping file per
    entry of ``TYPES`` are read from the ``data`` directory located next to
    this module.
    """

    # (super type, file name prefix) pairs; each prefix selects "<prefix>_misp_mapping.json".
    TYPES = [
        (CommonEntitySuperTypes.ACTOR.value, "actor"),
        (CommonEntitySuperTypes.ARTIFACT.value, "artifact"),
        (CommonEntitySuperTypes.DEVICE.value, "device"),
        (CommonEntitySuperTypes.EVENT.value, "event"),
        (CommonEntitySuperTypes.DATA_FRAGMENT.value, "data_fragment"),
        (CommonEntitySuperTypes.DETECTION_RULE.value, "detection_rule"),
        (CommonEntitySuperTypes.OBSERVABLE.value, "observable"),
        (CommonEntitySuperTypes.THREAT.value, "threat"),
    ]

    def __init__(self):
        """Load the relation mapping and every super type mapping listed in ``TYPES``."""
        self.super_types_mapping: Dict[str, EntitySuperTypeMapping] = {}
        self.relation_mapping = self._load_relation_mapping_definition()
        for type_class, prefix in self.TYPES:
            self.super_types_mapping[type_class.short_name] = self._load_mapping_definition(type_class, prefix)

    @staticmethod
    def _read_json_resource(filename: str) -> Any:
        """Parse and return the JSON document ``data/<filename>`` packaged with this module."""
        json_file = resources.files(__name__).joinpath("data").joinpath(filename)
        # Read explicitly as UTF-8 so decoding does not depend on the platform's locale.
        with json_file.open("r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _load_relation_mapping_definition() -> RelationMapping:
        """Return the content of ``data/relation_misp_mapping.json``."""
        return Mapping._read_json_resource("relation_misp_mapping.json")

    @staticmethod
    def _load_mapping_definition(type_class, filename_prefix: str) -> EntitySuperTypeMapping:
        """Build the mapping container for one super type.

        Args:
            type_class: Super type object; its ``short_name`` identifies the container
                and the object itself is stored as ``model_class``.
            filename_prefix: Prefix of the ``<prefix>_misp_mapping.json`` resource to read.

        Returns:
            An ``EntitySuperTypeMapping`` whose ``types_mapping`` holds one validated
            ``EntityTypeMapping`` per definition, keyed by ``colander_type``. When two
            definitions share a ``colander_type`` the later one wins.
        """
        super_type_mapping = EntitySuperTypeMapping(colander_super_type=type_class.short_name, model_class=type_class)
        for definition in Mapping._read_json_resource(f"{filename_prefix}_misp_mapping.json"):
            type_mapping = EntityTypeMapping.model_validate(definition)
            super_type_mapping.types_mapping[type_mapping.colander_type] = type_mapping
        return super_type_mapping

    def get_relation_mapping(self, super_type: CommonEntitySuperType, reference_name: str) -> Optional[RelationMapping]:
        """Return the relation mapping stored for ``reference_name`` under the given super type.

        ``None`` is returned when either the super type's ``short_name`` or the
        reference name has no entry in the loaded relation mapping.
        """
        mapping = self.relation_mapping.get(super_type.short_name, {})
        return mapping.get(reference_name, None)

    def get_mapping(
        self, entity_super_type: CommonEntitySuperType, entity_type: CommonEntityType
    ) -> Optional[EntityTypeMapping]:
        """Return the mapping for ``entity_type`` within ``entity_super_type``.

        Both lookups use the ``short_name`` of the argument; ``None`` is returned
        when the super type or the entity type is unknown.
        """
        est_mapping = self.super_types_mapping.get(entity_super_type.short_name)
        # Explicit None check: presence of the container, not its truthiness, is what matters.
        if est_mapping is None:
            return None
        return est_mapping.types_mapping.get(entity_type.short_name, None)