Source code for colander_data_converter.formats.threatr.converter

from datetime import datetime, UTC
from typing import Union, List, get_args, cast, Optional
from uuid import uuid4, UUID

from pydantic import UUID4

from colander_data_converter.base.common import ObjectReference
from colander_data_converter.base.models import (
    ColanderFeed,
    EntityTypes,
    EntityRelation as ColanderEntityRelation,
    Entity as ColanderEntity,
    Event,
    CommonEntitySuperType,
    CommonEntityType,
    ColanderRepository,
    CommonEntitySuperTypes,
    Observable,
    EventTypes,
)
from colander_data_converter.base.utils import BaseModelMerger
from colander_data_converter.formats.threatr.mapping import ThreatrMapper
from colander_data_converter.formats.threatr.models import (
    ThreatrFeed,
    Entity as ThreatrEntity,
    Event as ThreatrEvent,
    EntityRelation as ThreatrEntityRelation,
)


[docs] class ColanderToThreatrMapper(ThreatrMapper): """ Mapper for converting Colander data model to Threatr data model. This class handles the conversion of Colander feeds, entities, relations, and events to their corresponding Threatr equivalents. It processes reference fields and creates appropriate relationship mappings between entities. .. note:: The mapper uses the mapping configuration loaded from the parent ThreatrMapper class to determine appropriate field and relation name mappings. """ def _get_relation_name_from_field(self, source_type: str, target_type: str, field_name: str) -> str: """ Get the relation name for a field based on the mapping configuration. :param source_type: The source entity type name :type source_type: str :param target_type: The target entity type name :type target_type: str :param field_name: The field name to map :type field_name: str :return: The mapped relation name or a default based on the field name :rtype: str .. note:: If no mapping is found in the configuration, returns a normalized version of the field name with underscores replaced by spaces. """ assert source_type is not None assert target_type is not None assert field_name is not None relation_name = field_name.lower().replace("_", " ") for mapping in self.mapping_loader.mapping_data: if ( mapping["source_type"] == source_type.lower() and mapping["target_type"] == target_type.lower() and field_name in mapping["fields"] ): relation_name = mapping["fields"][field_name] return relation_name
[docs] def convert(self, colander_feed: ColanderFeed, root_entity: Union[str, UUID, EntityTypes]) -> ThreatrFeed: """ Convert a Colander data model to a Threatr data model. This method transforms a complete Colander feed including all entities, relations, and events into the equivalent Threatr representation. It handles reference field extraction and conversion to explicit relations. :param colander_feed: The Colander feed to convert :type colander_feed: ColanderFeed :param root_entity: The root entity ID, UUID, or entity object to use as the root :type root_entity: Union[str, UUID, EntityTypes] :return: A ThreatrFeed object containing the converted data :rtype: ThreatrFeed :raises ValueError: If the root entity cannot be found or is invalid .. important:: The root entity must exist in the provided Colander feed. If a string ID is provided, it must be a valid UUID format. """ # Get the root entity object if an ID was provided root_entity_obj = None if isinstance(root_entity, str): try: root_entity = UUID(root_entity) except Exception: raise ValueError(f"Invalid UUID {root_entity}") if isinstance(root_entity, UUID): root_entity_obj = colander_feed.entities.get(str(root_entity)) if not root_entity_obj: raise ValueError(f"Root entity with ID {root_entity} not found in feed") else: root_entity_obj = root_entity # Convert the root entity to a Threatr entity threatr_root_entity = self._convert_entity(root_entity_obj) threatr_events = [] # Convert all entities threatr_entities = [threatr_root_entity] for entity_id, entity in colander_feed.entities.items(): # Skip the root entity as it's already included if str(entity.id) == str(root_entity_obj.id): continue threatr_entity = self._convert_entity(entity) if isinstance(threatr_entity, ThreatrEvent): threatr_events.append(threatr_entity) else: threatr_entities.append(threatr_entity) # Convert all relations threatr_relations = [] for relation_id, relation in colander_feed.relations.items(): threatr_relation = self._convert_relation(relation) threatr_relations.append(threatr_relation) # Convert reference fields to relations reference_relations = self._extract_reference_relations(colander_feed) threatr_relations.extend(reference_relations) # Create and return the Threatr feed return ThreatrFeed( root_entity=threatr_root_entity, entities=threatr_entities, relations=threatr_relations, events=threatr_events, )
def _convert_entity(self, entity: ColanderEntity) -> Union[ThreatrEntity, ThreatrEvent]: """ Convert a Colander entity to a Threatr entity or event. :param entity: The Colander entity to convert :type entity: ColanderEntity :return: A Threatr entity or event based on the input type :rtype: Union[ThreatrEntity, ThreatrEvent] .. note:: Events are detected by checking if the entity is an instance of the Event class and are converted to ThreatrEvent objects accordingly. """ # Create a base entity with common fields model_class = ThreatrEntity if isinstance(entity, Event): model_class = ThreatrEvent threatr_entity = model_class( id=entity.id, created_at=getattr(entity, "created_at", datetime.now(UTC)), updated_at=getattr(entity, "updated_at", datetime.now(UTC)), name=entity.name, type=cast(CommonEntityType, entity.type), super_type=cast(CommonEntitySuperType, entity.super_type), attributes={}, ) bm = BaseModelMerger() bm.merge(entity, threatr_entity) return threatr_entity def _convert_relation(self, relation: ColanderEntityRelation) -> ThreatrEntityRelation: """ Convert a Colander entity relation to a Threatr entity relation. :param relation: The Colander entity relation to convert :type relation: ColanderEntityRelation :return: A Threatr entity relation :rtype: ThreatrEntityRelation .. note:: Object references are normalized to UUIDs during conversion to maintain consistency in the Threatr model. """ # Create a base relation with common fields threatr_relation = ThreatrEntityRelation( id=relation.id, created_at=getattr(relation, "created_at", datetime.now(UTC)), updated_at=getattr(relation, "updated_at", datetime.now(UTC)), name=relation.name, description=getattr(relation, "description", None), obj_from=relation.obj_from if isinstance(relation.obj_from, UUID) else relation.obj_from.id, obj_to=relation.obj_to if isinstance(relation.obj_to, UUID) else relation.obj_to.id, attributes={}, ) bm = BaseModelMerger() bm.merge(relation, threatr_relation) return threatr_relation def _extract_reference_relations(self, colander_feed: ColanderFeed) -> List[ThreatrEntityRelation]: """ Extract reference fields from Colander entities and convert them to Threatr relations. This method processes all entities in the feed to identify ObjectReference fields and converts them into explicit EntityRelation objects in the Threatr model. :param colander_feed: The Colander feed containing entities :type colander_feed: ColanderFeed :return: A list of Threatr entity relations extracted from reference fields :rtype: List[ThreatrEntityRelation] .. note:: Both single ObjectReference fields and List[ObjectReference] fields are processed to create appropriate relationship mappings. """ relations = [] for entity_id, entity in colander_feed.entities.items(): entity_type_name = type(entity).__name__.lower() for field_name, field_info in entity.__class__.model_fields.items(): field_annotation = get_args(field_info.annotation) field_value = getattr(entity, field_name, None) if not field_value or not field_annotation: continue # Handle single ObjectReference if ObjectReference in field_annotation: relation = self._create_relation_from_reference( entity, field_name, field_value, entity_type_name, colander_feed, is_list=False ) if relation: relations.append(relation) # Handle List[ObjectReference] elif List[ObjectReference] in field_annotation: for object_reference in field_value: relation = self._create_relation_from_reference( entity, field_name, object_reference, entity_type_name, colander_feed, is_list=True ) if relation: relations.append(relation) return relations def _create_relation_from_reference( self, entity, field_name, reference_value, entity_type_name, colander_feed, is_list=False ): """ Helper method to create a relation from a reference field. :param entity: The source entity containing the reference :param field_name: The name of the reference field :type field_name: str :param reference_value: The reference value (UUID or object) :param entity_type_name: The source entity type name :type entity_type_name: str :param colander_feed: The feed containing target entities :type colander_feed: ColanderFeed :param is_list: Whether the reference comes from a list field :type is_list: bool :return: A new ThreatrEntityRelation or None if target not found :rtype: ThreatrEntityRelation | None """ target_id = reference_value if isinstance(reference_value, UUID) else reference_value.id target_entity = colander_feed.entities.get(str(target_id)) if not target_entity: return None target_entity_type_name = type(target_entity).__name__.lower() # Get relation name based on whether it's a list or single reference if is_list: relation_name = self._get_relation_name_from_field(entity_type_name, target_entity_type_name, field_name) else: relation_name = field_name.replace("_", " ") return ThreatrEntityRelation( id=uuid4(), created_at=datetime.now(UTC), updated_at=datetime.now(UTC), name=relation_name, description=f"Relation extracted from {entity_type_name}.{field_name} reference to {target_entity_type_name}", obj_from=entity.id, obj_to=target_entity.id, attributes={}, )
[docs] class ThreatrToColanderMapper(ThreatrMapper): """ Mapper for converting Threatr data model to Colander data model. This class handles the conversion of Threatr feeds, entities, events, and relations to their corresponding Colander equivalents. It processes explicit relations and attempts to convert them back to reference fields where appropriate. .. important:: This mapper maintains state during conversion, storing the input ThreatrFeed and building the output ColanderFeed incrementally. :ivar threatr_feed: The input Threatr feed being converted :type threatr_feed: Optional[ThreatrFeed] :ivar colander_feed: The output Colander feed being built :type colander_feed: ColanderFeed """ def __init__(self): """ Initialize the mapper with empty feed containers. .. note:: The mapper creates a new ColanderFeed instance for each conversion process. """ super().__init__() self.threatr_feed: Optional[ThreatrFeed] = None self.colander_feed: ColanderFeed = ColanderFeed() def _get_field_from_relation_name(self, source_type: str, target_type: str, relation_name: str) -> Optional[str]: """ Get the field name for a relation based on the mapping configuration. :param source_type: The source entity type name :type source_type: str :param target_type: The target entity type name :type target_type: str :param relation_name: The relation name to reverse-map :type relation_name: str :return: The corresponding field name or None if no mapping found :rtype: Optional[str] .. note:: This method performs reverse lookup in the mapping configuration to find field names that correspond to relation names. """ assert source_type is not None assert target_type is not None assert relation_name is not None relation_name = relation_name.lower().replace("_", " ") for mapping in self.mapping_loader.mapping_data: if mapping["source_type"] == source_type.lower() and mapping["target_type"] == target_type.lower(): for fn, rn in mapping["fields"].items(): if rn == relation_name: return fn return None def _create_immutable_relation(self, threatr_relation: ThreatrEntityRelation) -> bool: """ Attempt to convert a Threatr relation back to a reference field. This method tries to convert explicit relations back into reference fields on the source entity, which is the preferred representation in the Colander model. :param threatr_relation: The Threatr relation to convert :type threatr_relation: ThreatrEntityRelation :return: True if the relation was successfully converted to a reference field :rtype: bool .. important:: Only relations that map to known reference fields can be converted. Other relations remain as explicit EntityRelation objects. """ relation_name = threatr_relation.name source_entity_id: UUID4 = ( threatr_relation.obj_from if isinstance(threatr_relation.obj_from, UUID) else threatr_relation.obj_from.id ) target_entity_id: UUID4 = ( threatr_relation.obj_to if isinstance(threatr_relation.obj_to, UUID) else threatr_relation.obj_to.id ) source_entity = ColanderRepository() >> source_entity_id target_entity = ColanderRepository() >> target_entity_id # Ensure both source and target entities are valid Colander entities if not isinstance(source_entity, ColanderEntity) or not isinstance(target_entity, ColanderEntity): return False if ( field_name := self._get_field_from_relation_name( source_entity.super_type.short_name, target_entity.super_type.short_name, relation_name ) ) is not None and field_name in source_entity.__class__.model_fields.keys(): setattr(source_entity, field_name, target_entity) return True return False def _convert_relation(self, relation: ThreatrEntityRelation) -> Optional[ColanderEntityRelation]: """ Convert a Threatr entity relation to a Colander entity relation. :param relation: The Threatr entity relation to convert :type relation: ThreatrEntityRelation :return: A Colander entity relation or None if conversion fails :rtype: Optional[ColanderEntityRelation] :raises AssertionError: If relation or its object references are None .. note:: This method checks if the relation has already been converted to avoid duplicate processing. """ assert relation is not None assert relation.obj_from is not None assert relation.obj_to is not None colander_relation: ColanderEntityRelation = ColanderRepository() >> relation.id if isinstance(colander_relation, ColanderEntityRelation): return colander_relation obj_from = ColanderRepository() >> relation.obj_from.id obj_to = ColanderRepository() >> relation.obj_to.id if obj_from and obj_to: colander_relation = ColanderEntityRelation( id=relation.id, name=relation.name, created_at=relation.created_at, updated_at=relation.updated_at, obj_from=obj_from, obj_to=obj_to, ) bm = BaseModelMerger() bm.merge(relation, colander_relation) return colander_relation return None def _convert_entity(self, entity: ThreatrEntity) -> Optional[EntityTypes]: """ Convert a Threatr entity to a Colander entity. This method determines the appropriate Colander entity type based on the Threatr entity's super_type and type, then creates the corresponding instance. :param entity: The Threatr entity to convert :type entity: ThreatrEntity :return: A Colander entity or None if conversion is not supported :rtype: Optional[EntityTypes] :raises AssertionError: If entity is None or not a ThreatrEntity instance .. note:: Entities that have already been processed are returned from the repository without re-conversion to maintain object identity. """ assert entity is not None assert isinstance(entity, ThreatrEntity) # The entity has already been processed if (colander_entity := ColanderRepository() >> entity.id) is not None and isinstance( colander_entity, ColanderEntity ): return colander_entity # The super type is not supported if (super_type := CommonEntitySuperTypes.by_short_name(short_name=entity.super_type.short_name)) is None: return None # The entity type is not supported if (sub_type := super_type.type_by_short_name(entity.type.short_name)) is None: return None colander_entity = super_type.model_class( id=entity.id, name=entity.name, created_at=entity.created_at, updated_at=entity.updated_at, type=sub_type, ) bm = BaseModelMerger() bm.merge(entity, colander_entity, ignored_fields=["super_type"]) return colander_entity def _convert_event(self, event: ThreatrEvent) -> Event: """ Convert a Threatr event to a Colander event. :param event: The Threatr event to convert :type event: ThreatrEvent :return: A Colander event :rtype: Event :raises AssertionError: If event is None or not a ThreatrEvent instance .. note:: Events that have already been processed are returned from the repository without re-conversion. Involved entities are automatically linked if they are Observable instances. """ assert event is not None assert isinstance(event, ThreatrEvent) # The event has already been processed if (colander_event := ColanderRepository() >> event.id) is not None and isinstance(colander_event, Event): return colander_event sub_type = EventTypes.by_short_name(event.type.short_name) colander_event = Event( id=event.id, name=event.name, created_at=event.created_at, updated_at=event.updated_at, first_seen=event.first_seen, last_seen=event.last_seen, count=event.count, type=sub_type, ) if (involved_entity := event.involved_entity) is not None: involved_entity = ColanderRepository() >> involved_entity.id if isinstance(involved_entity, Observable): colander_event.involved_observables.append(involved_entity) bm = BaseModelMerger() bm.merge(event, colander_event, ignored_fields=["involved_entity", "super_type"]) return colander_event
[docs] def convert(self, threatr_feed: ThreatrFeed) -> ColanderFeed: """ Convert a Threatr data model to a Colander data model. This method performs a complete conversion of a ThreatrFeed to a ColanderFeed, handling entities, events, and relations. It attempts to convert explicit relations back to reference fields where possible. :param threatr_feed: The Threatr feed to convert :type threatr_feed: ThreatrFeed :return: A ColanderFeed object containing the converted data :rtype: ColanderFeed :raises AssertionError: If threatr_feed is None or not a ThreatrFeed instance .. important:: The method resolves all references in the input feed before processing to ensure consistent object relationships. """ assert threatr_feed is not None assert isinstance(threatr_feed, ThreatrFeed) self.threatr_feed = threatr_feed self.threatr_feed.resolve_references() self.colander_feed.description = "Feed automatically generated from a Threatr feed." if (root_entity := threatr_feed.root_entity) is not None: if (colander_entity := self._convert_entity(root_entity)) is not None: self.colander_feed.entities[str(root_entity.id)] = colander_entity for entity in threatr_feed.entities or []: if (colander_entity := self._convert_entity(entity)) is not None: self.colander_feed.entities[str(entity.id)] = colander_entity for event in threatr_feed.events or []: if (colander_event := self._convert_event(event)) is not None: self.colander_feed.entities[str(event.id)] = colander_event for relation in threatr_feed.relations or []: if not self._create_immutable_relation(relation): if (colander_relation := self._convert_relation(relation)) is not None: self.colander_feed.relations[str(relation.id)] = colander_relation return self.colander_feed