Source code for colander_data_converter.formats.stix2.mapping

import json
from importlib import resources
from typing import Dict, Any, List, Optional, Set, Tuple


resource_package = __name__


[docs] class Stix2MappingLoader: """ Loads and provides access to the STIX2 to Colander mapping data. """ def __init__(self): """ Initialize the mapping loader. """ # Load the mapping data self.mapping_data = self._load_mapping_data() @staticmethod def _load_mapping_data() -> Dict[str, Any]: """ Load the mapping data from the JSON file. Returns: Dict[str, Any]: The mapping data. """ json_file = resources.files(resource_package).joinpath("data").joinpath("stix2_colander_mapping.json") try: with json_file.open() as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: raise ValueError(f"Failed to load mapping data: {e}")
[docs] def get_entity_type_mapping(self, entity_type: str) -> Dict[str, Any]: """ Get the mapping data for a specific Colander entity type. Args: entity_type (str): The entity type (e.g., "actor", "device"). Returns: Dict[str, Any]: The mapping data for the entity type. """ _entity_type = entity_type.lower() if _entity_type not in self.mapping_data: raise ValueError(f"Unknown entity type: {_entity_type}") return self.mapping_data[_entity_type]
[docs] def get_entity_subtype_mapping(self, entity_type: str, entity_subtype: str) -> Dict[str, Any]: """ Get the mapping data for a specific Colander entity type. Args: entity_type (str): The entity type (e.g., "actor", "device"). entity_subtype (str): The Colander entity subtype (e.g. "ipv4"). Returns: Dict[str, Any]: The mapping data for the entity type. """ _entity_type = entity_type.lower() _entity_subtype = entity_subtype.lower() if _entity_type not in self.mapping_data: raise ValueError(f"Unknown entity type: {_entity_type}") _entity_type_mapping = self.mapping_data[_entity_type] if _entity_subtype not in _entity_type_mapping["types"]: raise ValueError(f"Unknown entity type: {_entity_type}/{_entity_subtype}") return _entity_type_mapping["types"][_entity_subtype]
[docs] def get_stix2_type_for_entity(self, entity_type: str, entity_subtype: str) -> str: """ Get the STIX2 type for a Colander entity type. Args: entity_type (str): The Colander entity type (e.g., "actor", "device"). entity_subtype (str): The Colander entity subtype (e.g. "ipv4"). Returns: str: The corresponding STIX2 type. """ _entity_mapping = self.get_entity_subtype_mapping(entity_type, entity_subtype) return _entity_mapping.get("stix2_type", "")
[docs] def get_supported_colander_types(self) -> List[str]: return self.mapping_data.get("supported_colander_types", [])
[docs] def get_supported_stix2_types(self) -> List[str]: _types: Set[str] = set() for _supported_colander_type in self.get_supported_colander_types(): _type_mapping = self.mapping_data.get(_supported_colander_type, {}) for _subtype_name, _mapping in _type_mapping.get("types", {}).items(): _types.add(_mapping.get("stix2_type", "")) return list(_types)
[docs] def get_entity_type_for_stix2(self, stix2_type: str) -> Tuple[Optional[str], Optional[List[str]]]: """ Get the Colander entity type for a STIX2 type (e.g. "indicator", "threat-actor"). Args: stix2_type (str): The STIX2 type. Returns: Tuple[Optional[str], Optional[List[str]]]: The corresponding Colander type and the list of subtype candidates, or None if not found. """ if stix2_type not in self.get_supported_stix2_types(): return None, None # Create mapping between STIX2 and Colander types (e.g. "treat-actor" -> "actor") _stix2_type_mapping: Dict[str, str] = {} for _supported_colander_type in self.get_supported_colander_types(): for _supported_colander_subtype, _mapping in self.mapping_data[_supported_colander_type]["types"].items(): _stix2_type_mapping[_mapping["stix2_type"]] = _supported_colander_type if stix2_type not in _stix2_type_mapping: return None, None _colander_type_name = _stix2_type_mapping[stix2_type] # e.g. observable _colander_type_mapping = self.get_entity_type_mapping(_colander_type_name) # Iterate over Colander subtypes(e.g. ipv4, domain) _subtype_candidates: Set[str] = set() for _colander_subtype_name, _mapping in _colander_type_mapping.get("types", {}).items(): # List subtype candidates if "stix2_type" in _mapping and _mapping["stix2_type"] == stix2_type: _subtype_candidates.add(_colander_subtype_name) # If not candidates, append the "generic" subtype if len(_subtype_candidates) == 0: _subtype_candidates.add("generic") return _colander_type_name, list(_subtype_candidates)
# Handle the specific case of STIX2 indicators # if stix2_type == "indicator" and "pattern" in stix2_entity and "name" in stix2_entity: # _pattern_name = extract_stix2_pattern_name(stix2_entity["pattern"]) # for _candidate_type, _candidate_mapping in _subtype_candidates.items(): # if _pattern_name in _candidate_mapping["pattern"]: # return _colander_type_name, _candidate_type # # Return the generic subtype as it was not possible to narrow down the type selection # return "observable", "generic" # if stix2_type == "threat-actor": # return "actor", "threat_actor" # elif len(_subtype_candidates) == 1: # return _colander_type_name, list(_subtype_candidates.keys())[0] # else: # return _colander_type_name, "generic"
[docs] def get_stix2_to_colander_field_mapping(self, entity_type: str) -> Dict[str, str]: """ Get the field mapping from STIX2 to Colander for a specific entity type. Args: entity_type (str): The entity type. Returns: Dict[str, str]: The field mapping from STIX2 to Colander. """ entity_mapping = self.get_entity_type_mapping(entity_type) return entity_mapping.get("stix2_to_colander", {})
[docs] def get_colander_to_stix2_field_mapping(self, entity_type: str) -> Dict[str, str]: """ Get the field mapping from Colander to STIX2 for a specific entity type. Args: entity_type (str): The entity type. Returns: Dict[str, str]: The field mapping from Colander to STIX2. """ entity_mapping = self.get_entity_type_mapping(entity_type) return entity_mapping.get("colander_to_stix2", {})
[docs] def get_relation_mapping(self, relation_type: str) -> Dict[str, Any]: """ Get the mapping data for a specific relation type. Args: relation_type (str): The relation type (e.g., "uses", "targets"). Returns: Dict[str, Any]: The mapping data for the relation type. """ relation_types = self.mapping_data.get("relation_types", {}) if relation_type not in relation_types: raise ValueError(f"Unknown relation type: {relation_type}") return relation_types[relation_type]
[docs] def get_source_types_for_relation(self, relation_type: str) -> List[str]: """ Get the valid source entity types for a relation type. Args: relation_type (str): The relation type. Returns: List[str]: The valid source entity types. """ relation_mapping = self.get_relation_mapping(relation_type) return relation_mapping.get("source_types", [])
[docs] def get_target_types_for_relation(self, relation_type: str) -> List[str]: """ Get the valid target entity types for a relation type. Args: relation_type (str): The relation type. Returns: List[str]: The valid target entity types. """ relation_mapping = self.get_relation_mapping(relation_type) return relation_mapping.get("target_types", [])
[docs] def get_observable_pattern(self, observable_type: str) -> Dict[str, Any]: """ Get the pattern data for a specific observable type. Args: observable_type (str): The observable type (e.g., "ipv4", "domain"). Returns: Dict[str, Any]: The pattern data for the observable type. """ observable_patterns = self.mapping_data.get("observable_patterns", {}) if observable_type not in observable_patterns: raise ValueError(f"Unknown observable type: {observable_type}") return observable_patterns[observable_type]
[docs] def get_pattern_template(self, observable_type: str) -> str: """ Get the pattern template for a specific observable type. Args: observable_type (str): The observable type. Returns: str: The pattern template. """ pattern_data = self.get_observable_pattern(observable_type) return pattern_data.get("pattern_template", "")
[docs] def get_pattern_type(self, observable_type: str) -> str: """ Get the pattern type for a specific observable type. Args: observable_type (str): The observable type. Returns: str: The pattern type. """ pattern_data = self.get_observable_pattern(observable_type) return pattern_data.get("pattern_type", "")
[docs] def get_threat_mapping(self, threat_type: str) -> Dict[str, Any]: """ Get the mapping data for a specific threat type. Args: threat_type (str): The threat type (e.g., "ransomware", "trojan"). Returns: Dict[str, Any]: The mapping data for the threat type. """ threat_types = self.mapping_data.get("threat_types", {}) if threat_type not in threat_types: raise ValueError(f"Unknown threat type: {threat_type}") return threat_types[threat_type]
[docs] def get_stix2_type_for_threat(self, threat_type: str) -> str: """ Get the STIX2 type for a specific threat type. Args: threat_type (str): The threat type. Returns: str: The STIX2 type. """ threat_mapping = self.get_threat_mapping(threat_type) return threat_mapping.get("stix2_type", "")
[docs] def get_malware_types_for_threat(self, threat_type: str) -> List[str]: """ Get the malware types for a specific threat type. Args: threat_type (str): The threat type. Returns: List[str]: The malware types. """ threat_mapping = self.get_threat_mapping(threat_type) return threat_mapping.get("malware_types", [])
[docs] def get_field_relationship_type(self, field_name: str) -> str: """ Get the STIX2 relationship type for a field name. Args: field_name (str): The field name. Returns: str: The STIX2 relationship type, or "related-to" if not found. """ field_relationship_map = self.mapping_data.get("field_relationship_map", {}) return field_relationship_map.get(field_name, "related-to")