Source code for colander_data_converter.converters.stix2.mapping

  1import json
  2from importlib import resources
  3from typing import Dict, Any, List, Optional, Set, Tuple
  4
  5from colander_data_converter.base.models import Entity
  6
  7resource_package = __name__
  8
  9
[docs] 10class Stix2MappingLoader: 11 """ 12 Loads and provides access to the STIX2 to Colander mapping data. 13 """ 14
[docs] 15 def __init__(self): 16 """ 17 Initialize the mapping loader. 18 """ 19 # Load the mapping data 20 self.mapping_data = self._load_mapping_data()
21 22 @staticmethod 23 def _load_mapping_data() -> Dict[str, Any]: 24 json_file = resources.files(resource_package).joinpath("data").joinpath("stix2_colander_mapping.json") 25 try: 26 with json_file.open() as f: 27 return json.load(f) 28 except (FileNotFoundError, json.JSONDecodeError) as e: 29 raise ValueError(f"Failed to load mapping data: {e}") 30
[docs] 31 def get_entity_type_mapping(self, entity_type: str) -> Dict[str, Any]: 32 """ 33 Get the mapping data for a specific Colander entity type. 34 35 Args: 36 entity_type (str): The entity type (e.g., "actor", "device"). 37 38 Returns: 39 Dict[str, Any]: The mapping data for the entity type. 40 """ 41 _entity_type = entity_type.lower() 42 if _entity_type not in self.mapping_data: 43 return {} 44 return self.mapping_data[_entity_type]
45
[docs] 46 def get_entity_subtype_mapping(self, entity_type: str, entity_subtype: str) -> Dict[str, Any]: 47 """ 48 Get the mapping data for a specific Colander entity type. 49 50 Args: 51 entity_type (str): The entity type (e.g., "actor", "device"). 52 entity_subtype (str): The Colander entity subtype (e.g. "ipv4"). 53 54 Returns: 55 Dict[str, Any]: The mapping data for the entity type. 56 """ 57 _entity_type = entity_type.lower() 58 _entity_subtype = entity_subtype.lower() 59 if _entity_type not in self.mapping_data: 60 return {} 61 _entity_type_mapping = self.mapping_data[_entity_type] 62 if _entity_subtype not in _entity_type_mapping["types"]: 63 return {} 64 return _entity_type_mapping["types"][_entity_subtype]
65
[docs] 66 def get_stix2_type_for_entity(self, entity: Entity) -> str: 67 _entity_mapping = self.get_entity_subtype_mapping( 68 entity.get_super_type().short_name, entity.get_type().short_name 69 ) 70 return _entity_mapping.get("stix2_type", "")
71
[docs] 72 def get_supported_colander_types(self) -> List[str]: 73 return self.mapping_data.get("supported_colander_types", [])
74
[docs] 75 def get_supported_stix2_types(self) -> List[str]: 76 _types: Set[str] = set() 77 for _supported_colander_type in self.get_supported_colander_types(): 78 _type_mapping = self.mapping_data.get(_supported_colander_type, {}) 79 for _subtype_name, _mapping in _type_mapping.get("types", {}).items(): 80 _types.add(_mapping.get("stix2_type", "")) 81 return list(_types)
82
[docs] 83 def get_entity_type_for_stix2(self, stix2_type: str) -> Tuple[Optional[str], Optional[List[str]]]: 84 """ 85 Get the Colander entity type for a STIX2 type (e.g. "indicator", "threat-actor"). 86 87 Args: 88 stix2_type (str): The STIX2 type. 89 90 Returns: 91 Tuple[Optional[str], Optional[List[str]]]: The corresponding Colander type and the list of 92 subtype candidates, or None if not found. 93 """ 94 if stix2_type not in self.get_supported_stix2_types(): 95 return None, None 96 97 # Create mapping between STIX2 and Colander types (e.g. "treat-actor" -> "actor") 98 _stix2_type_mapping: Dict[str, str] = {} 99 for _supported_colander_type in self.get_supported_colander_types(): 100 for _supported_colander_subtype, _mapping in self.mapping_data[_supported_colander_type]["types"].items(): 101 _stix2_type_mapping[_mapping["stix2_type"]] = _supported_colander_type 102 if stix2_type not in _stix2_type_mapping: 103 return None, None 104 105 _colander_type_name = _stix2_type_mapping[stix2_type] # e.g. observable 106 _colander_type_mapping = self.get_entity_type_mapping(_colander_type_name) 107 108 # Iterate over Colander subtypes(e.g. ipv4, domain) 109 _subtype_candidates: Set[str] = set() 110 for _colander_subtype_name, _mapping in _colander_type_mapping.get("types", {}).items(): 111 # List subtype candidates 112 if "stix2_type" in _mapping and _mapping["stix2_type"] == stix2_type: 113 _subtype_candidates.add(_colander_subtype_name) 114 115 # If not candidates, append the "generic" subtype 116 if len(_subtype_candidates) == 0: 117 _subtype_candidates.add("generic") 118 119 return _colander_type_name, list(_subtype_candidates)
120
[docs] 121 def get_stix2_to_colander_field_mapping(self, entity_type: str) -> Dict[str, str]: 122 """ 123 Get the field mapping from STIX2 to Colander for a specific entity type. 124 125 Args: 126 entity_type (str): The entity type. 127 128 Returns: 129 Dict[str, str]: The field mapping from STIX2 to Colander. 130 """ 131 entity_mapping = self.get_entity_type_mapping(entity_type) 132 return entity_mapping.get("stix2_to_colander", {})
133
[docs] 134 def get_colander_to_stix2_field_mapping(self, entity_type: str) -> Dict[str, str]: 135 entity_mapping = self.get_entity_type_mapping(entity_type) 136 return entity_mapping.get("colander_to_stix2", {})
137
[docs] 138 def get_field_relationship_mapping(self) -> Dict[str, str]: 139 return self.mapping_data.get("field_relationship_map", {})
140
[docs] 141 def get_observable_mapping(self, observable_type: str) -> Dict[str, Any]: 142 return self.get_entity_subtype_mapping("observable", observable_type)
143
[docs] 144 def get_observable_pattern(self, observable_type: str) -> str: 145 mapping = self.get_observable_mapping(observable_type) 146 if mapping: 147 return mapping["pattern"] 148 return "[unknown:value = '{value}']"
149
[docs] 150 def get_threat_mapping(self, threat_type: str) -> Dict[str, Any]: 151 return self.get_entity_subtype_mapping("threat", threat_type)
152
[docs] 153 def get_malware_types_for_threat(self, threat_type: str) -> List[str]: 154 threat_mapping = self.get_threat_mapping(threat_type) 155 return threat_mapping.get("malware_types", [])
156
[docs] 157 def get_actor_mapping(self, actor_type: str) -> Dict[str, Any]: 158 return self.get_entity_subtype_mapping("actor", actor_type)
159
[docs] 160 def get_device_mapping(self, device_type: str) -> Dict[str, Any]: 161 return self.get_entity_subtype_mapping("device", device_type)
162
[docs] 163 def get_entity_extra_values(self, entity_type: str, entity_subtype: str) -> Dict[str, Any]: 164 mapping = self.get_entity_subtype_mapping(entity_type, entity_subtype).copy() 165 if "stix2_type" in mapping: 166 mapping.pop("stix2_type") 167 return mapping