Source code for colander_data_converter.converters.threatr.converter

  1from datetime import datetime, UTC
  2from typing import Union, List, get_args, cast, Optional
  3from uuid import uuid4, UUID
  4
  5from pydantic import UUID4
  6
  7from colander_data_converter.base.common import ObjectReference
  8from colander_data_converter.base.models import (
  9    ColanderFeed,
 10    EntityTypes,
 11    EntityRelation as ColanderEntityRelation,
 12    Entity as ColanderEntity,
 13    Event,
 14    CommonEntitySuperType,
 15    ColanderRepository,
 16    CommonEntitySuperTypes,
 17    Observable,
 18)
 19from colander_data_converter.base.types.base import CommonEntityType
 20from colander_data_converter.base.types.event import EventTypes
 21from colander_data_converter.base.utils import BaseModelMerger
 22from colander_data_converter.converters.threatr.mapping import ThreatrMapper
 23from colander_data_converter.converters.threatr.models import (
 24    ThreatrFeed,
 25    Entity as ThreatrEntity,
 26    Event as ThreatrEvent,
 27    EntityRelation as ThreatrEntityRelation,
 28)
 29
 30
[docs] 31class ColanderToThreatrMapper(ThreatrMapper): 32 """ 33 Mapper for converting Colander data model to Threatr data model. 34 35 This class handles the conversion of Colander feeds, entities, relations, and events 36 to their corresponding Threatr equivalents. It processes reference fields and creates 37 appropriate relationship mappings between entities. 38 39 Note: 40 The mapper uses the mapping configuration loaded from the parent ThreatrMapper 41 class to determine appropriate field and relation name mappings. 42 """ 43 44 def _get_relation_name_from_field(self, source_type: str, target_type: str, field_name: str) -> str: 45 """ 46 Get the relation name for a field based on the mapping configuration. 47 48 Args: 49 source_type (str): The source entity type name 50 target_type (str): The target entity type name 51 field_name (str): The field name to map 52 53 Returns: 54 str: The mapped relation name or a default based on the field name 55 56 Note: 57 If no mapping is found in the configuration, returns a normalized 58 version of the field name with underscores replaced by spaces. 59 """ 60 assert source_type is not None 61 assert target_type is not None 62 assert field_name is not None 63 64 relation_name = field_name.lower().replace("_", " ") 65 for mapping in self.mapping_loader.mapping_data: 66 if ( 67 mapping["source_type"] == source_type.lower() 68 and mapping["target_type"] == target_type.lower() 69 and field_name in mapping["fields"] 70 ): 71 relation_name = mapping["fields"][field_name] 72 73 return relation_name 74
[docs] 75 def convert(self, colander_feed: ColanderFeed, root_entity: Union[str, UUID4, EntityTypes]) -> ThreatrFeed: 76 """ 77 Convert a Colander data model to a Threatr data model. 78 79 This method transforms a complete Colander feed including all entities, relations, 80 and events into the equivalent Threatr representation. It handles reference field 81 extraction and conversion to explicit relations. 82 83 Args: 84 colander_feed (ColanderFeed): The Colander feed to convert 85 root_entity (Union[str, UUID4, EntityTypes]): The root entity ID, UUID, or entity object to use as the root 86 87 Returns: 88 ThreatrFeed: A ThreatrFeed object containing the converted data 89 90 Raises: 91 ValueError: If the root entity cannot be found or is invalid 92 93 Important: 94 The root entity must exist in the provided Colander feed. If a string ID 95 is provided, it must be a valid UUID format. 96 """ 97 # Get the root entity object if an ID was provided 98 root_entity_obj = None 99 if isinstance(root_entity, str): 100 try: 101 root_entity = UUID(root_entity, version=4) 102 except Exception: 103 raise ValueError(f"Invalid UUID {root_entity}") 104 if isinstance(root_entity, UUID): 105 root_entity_obj = colander_feed.entities.get(str(root_entity)) 106 if not root_entity_obj: 107 raise ValueError(f"Root entity with ID {root_entity} not found in feed") 108 else: 109 root_entity_obj = root_entity 110 111 # Convert the root entity to a Threatr entity 112 threatr_root_entity = self._convert_entity(root_entity_obj) 113 threatr_events = [] 114 115 # Convert all entities 116 threatr_entities = [threatr_root_entity] 117 for entity_id, entity in colander_feed.entities.items(): 118 # Skip the root entity as it's already included 119 if str(entity.id) == str(root_entity_obj.id): 120 continue 121 threatr_entity = self._convert_entity(entity) 122 if isinstance(threatr_entity, ThreatrEvent): 123 threatr_events.append(threatr_entity) 124 else: 125 threatr_entities.append(threatr_entity) 126 127 # Convert all relations 128 threatr_relations = [] 129 for relation_id, relation in colander_feed.relations.items(): 130 threatr_relation = self._convert_relation(relation) 131 threatr_relations.append(threatr_relation) 132 133 # Convert reference fields to relations 134 reference_relations = self._extract_reference_relations(colander_feed) 135 threatr_relations.extend(reference_relations) 136 137 # Create and return the Threatr feed 138 return ThreatrFeed( 139 root_entity=threatr_root_entity, 140 entities=threatr_entities, 141 relations=threatr_relations, 142 events=threatr_events, 143 )
144 145 def _convert_entity(self, entity: ColanderEntity) -> Union[ThreatrEntity, ThreatrEvent]: 146 """ 147 Convert a Colander entity to a Threatr entity or event. 148 149 Args: 150 entity (ColanderEntity): The Colander entity to convert 151 152 Returns: 153 Union[ThreatrEntity, ThreatrEvent]: A Threatr entity or event based on the input type 154 155 Note: 156 Events are detected by checking if the entity is an instance of the Event class 157 and are converted to ThreatrEvent objects accordingly. 158 """ 159 # Create a base entity with common fields 160 model_class = ThreatrEntity 161 if isinstance(entity, Event): 162 model_class = ThreatrEvent 163 threatr_entity = model_class( 164 id=entity.id, 165 created_at=getattr(entity, "created_at", datetime.now(UTC)), 166 updated_at=getattr(entity, "updated_at", datetime.now(UTC)), 167 name=entity.name, 168 type=cast(CommonEntityType, entity.type), 169 super_type=cast(CommonEntitySuperType, entity.super_type), 170 attributes={}, 171 ) 172 173 bm = BaseModelMerger() 174 bm.merge(entity, threatr_entity) 175 176 return threatr_entity 177 178 def _convert_relation(self, relation: ColanderEntityRelation) -> ThreatrEntityRelation: 179 """ 180 Convert a Colander entity relation to a Threatr entity relation. 181 182 Args: 183 relation (ColanderEntityRelation): The Colander entity relation to convert 184 185 Returns: 186 ThreatrEntityRelation: A Threatr entity relation 187 188 Note: 189 Object references are normalized to UUIDs during conversion to maintain 190 consistency in the Threatr model. 191 """ 192 # Create a base relation with common fields 193 threatr_relation = ThreatrEntityRelation( 194 id=relation.id, 195 created_at=getattr(relation, "created_at", datetime.now(UTC)), 196 updated_at=getattr(relation, "updated_at", datetime.now(UTC)), 197 name=relation.name, 198 description=getattr(relation, "description", None), 199 obj_from=relation.obj_from if isinstance(relation.obj_from, UUID) else relation.obj_from.id, 200 obj_to=relation.obj_to if isinstance(relation.obj_to, UUID) else relation.obj_to.id, 201 attributes={}, 202 ) 203 204 bm = BaseModelMerger() 205 bm.merge(relation, threatr_relation) 206 207 return threatr_relation 208 209 def _extract_reference_relations(self, colander_feed: ColanderFeed) -> List[ThreatrEntityRelation]: 210 """ 211 Extract reference fields from Colander entities and convert them to Threatr relations. 212 213 This method processes all entities in the feed to identify ObjectReference fields 214 and converts them into explicit EntityRelation objects in the Threatr model. 215 216 Args: 217 colander_feed (ColanderFeed): The Colander feed containing entities 218 219 Returns: 220 List[ThreatrEntityRelation]: A list of Threatr entity relations extracted from reference fields 221 222 Note: 223 Both single ObjectReference fields and List[ObjectReference] fields are processed 224 to create appropriate relationship mappings. 225 """ 226 relations = [] 227 228 for entity_id, entity in colander_feed.entities.items(): 229 entity_type_name = type(entity).__name__.lower() 230 231 for field_name, field_info in entity.__class__.model_fields.items(): 232 field_annotation = get_args(field_info.annotation) 233 field_value = getattr(entity, field_name, None) 234 235 if not field_value or not field_annotation: 236 continue 237 238 # Handle single ObjectReference 239 if ObjectReference in field_annotation: 240 relation = self._create_relation_from_reference( 241 entity, field_name, field_value, entity_type_name, colander_feed, is_list=False 242 ) 243 if relation: 244 relations.append(relation) 245 246 # Handle List[ObjectReference] 247 elif List[ObjectReference] in field_annotation: 248 for object_reference in field_value: 249 relation = self._create_relation_from_reference( 250 entity, field_name, object_reference, entity_type_name, colander_feed, is_list=True 251 ) 252 if relation: 253 relations.append(relation) 254 255 return relations 256 257 def _create_relation_from_reference( 258 self, entity, field_name, reference_value, entity_type_name, colander_feed, is_list=False 259 ): 260 """ 261 Helper method to create a relation from a reference field. 262 263 Args: 264 entity: The source entity containing the reference 265 field_name (str): The name of the reference field 266 reference_value: The reference value (UUID or object) 267 entity_type_name (str): The source entity type name 268 colander_feed (ColanderFeed): The feed containing target entities 269 is_list (bool, optional): Whether the reference comes from a list field. Defaults to False. 270 271 Returns: 272 ThreatrEntityRelation | None: A new ThreatrEntityRelation or None if target not found 273 """ 274 target_id = reference_value if isinstance(reference_value, UUID) else reference_value.id 275 target_entity = colander_feed.entities.get(str(target_id)) 276 277 if not target_entity: 278 return None 279 280 target_entity_type_name = type(target_entity).__name__.lower() 281 282 # Get relation name based on whether it's a list or single reference 283 if is_list: 284 relation_name = self._get_relation_name_from_field(entity_type_name, target_entity_type_name, field_name) 285 else: 286 relation_name = field_name.replace("_", " ") 287 288 return ThreatrEntityRelation( 289 id=uuid4(), 290 created_at=datetime.now(UTC), 291 updated_at=datetime.now(UTC), 292 name=relation_name, 293 description=f"Relation extracted from {entity_type_name}.{field_name} reference to {target_entity_type_name}", 294 obj_from=entity.id, 295 obj_to=target_entity.id, 296 attributes={}, 297 )
298 299
[docs] 300class ThreatrToColanderMapper(ThreatrMapper): 301 """ 302 Mapper for converting Threatr data model to Colander data model. 303 304 This class handles the conversion of Threatr feeds, entities, events, and relations 305 to their corresponding Colander equivalents. It processes explicit relations and 306 attempts to convert them back to reference fields where appropriate. 307 308 Important: 309 This mapper maintains state during conversion, storing the input ThreatrFeed 310 and building the output ColanderFeed incrementally. 311 312 Attributes: 313 threatr_feed (Optional[ThreatrFeed]): The input Threatr feed being converted 314 colander_feed (ColanderFeed): The output Colander feed being built 315 """ 316
[docs] 317 def __init__(self): 318 """ 319 Initialize the mapper with empty feed containers. 320 321 Note: 322 The mapper creates a new ColanderFeed instance for each conversion process. 323 """ 324 super().__init__() 325 self.threatr_feed: Optional[ThreatrFeed] = None 326 self.colander_feed: ColanderFeed = ColanderFeed()
327 328 def _get_field_from_relation_name(self, source_type: str, target_type: str, relation_name: str) -> Optional[str]: 329 """ 330 Get the field name for a relation based on the mapping configuration. 331 332 Args: 333 source_type (str): The source entity type name 334 target_type (str): The target entity type name 335 relation_name (str): The relation name to reverse-map 336 337 Returns: 338 Optional[str]: The corresponding field name or None if no mapping found 339 340 Note: 341 This method performs reverse lookup in the mapping configuration 342 to find field names that correspond to relation names. 343 """ 344 assert source_type is not None 345 assert target_type is not None 346 assert relation_name is not None 347 relation_name = relation_name.lower().replace("_", " ") 348 for mapping in self.mapping_loader.mapping_data: 349 if mapping["source_type"] == source_type.lower() and mapping["target_type"] == target_type.lower(): 350 for fn, rn in mapping["fields"].items(): 351 if rn == relation_name: 352 return fn 353 return None 354 355 def _create_immutable_relation(self, threatr_relation: ThreatrEntityRelation) -> bool: 356 """ 357 Attempt to convert a Threatr relation back to a reference field. 358 359 This method tries to convert explicit relations back into reference fields 360 on the source entity, which is the preferred representation in the Colander model. 361 362 Args: 363 threatr_relation (ThreatrEntityRelation): The Threatr relation to convert 364 365 Returns: 366 bool: True if the relation was successfully converted to a reference field 367 368 Important: 369 Only relations that map to known reference fields can be converted. 370 Other relations remain as explicit EntityRelation objects. 371 """ 372 relation_name = threatr_relation.name 373 source_entity_id: UUID4 = ( 374 threatr_relation.obj_from if isinstance(threatr_relation.obj_from, UUID) else threatr_relation.obj_from.id 375 ) 376 target_entity_id: UUID4 = ( 377 threatr_relation.obj_to if isinstance(threatr_relation.obj_to, UUID) else threatr_relation.obj_to.id 378 ) 379 380 source_entity = ColanderRepository() >> source_entity_id 381 target_entity = ColanderRepository() >> target_entity_id 382 383 # Ensure both source and target entities are valid Colander entities 384 if not isinstance(source_entity, ColanderEntity) or not isinstance(target_entity, ColanderEntity): 385 return False 386 387 if ( 388 field_name := self._get_field_from_relation_name( 389 source_entity.super_type.short_name, target_entity.super_type.short_name, relation_name 390 ) 391 ) is not None and field_name in source_entity.__class__.model_fields.keys(): 392 setattr(source_entity, field_name, target_entity) 393 return True 394 395 return False 396 397 def _convert_relation(self, relation: ThreatrEntityRelation) -> Optional[ColanderEntityRelation]: 398 """ 399 Convert a Threatr entity relation to a Colander entity relation. 400 401 Args: 402 relation (ThreatrEntityRelation): The Threatr entity relation to convert 403 404 Returns: 405 Optional[ColanderEntityRelation]: A Colander entity relation or None if conversion fails 406 407 Raises: 408 AssertionError: If relation or its object references are None 409 410 Note: 411 This method checks if the relation has already been converted to avoid 412 duplicate processing. 413 """ 414 assert relation is not None 415 assert relation.obj_from is not None 416 assert relation.obj_to is not None 417 418 colander_relation: ColanderEntityRelation = ColanderRepository() >> relation.id 419 if isinstance(colander_relation, ColanderEntityRelation): 420 return colander_relation 421 422 obj_from = ColanderRepository() >> relation.obj_from.id 423 obj_to = ColanderRepository() >> relation.obj_to.id 424 if obj_from and obj_to: 425 colander_relation = ColanderEntityRelation( 426 id=relation.id, 427 name=relation.name, 428 created_at=relation.created_at, 429 updated_at=relation.updated_at, 430 obj_from=obj_from, 431 obj_to=obj_to, 432 ) 433 bm = BaseModelMerger() 434 bm.merge(relation, colander_relation) 435 return colander_relation 436 437 return None 438 439 def _convert_entity(self, entity: ThreatrEntity) -> Optional[EntityTypes]: 440 """ 441 Convert a Threatr entity to a Colander entity. 442 443 This method determines the appropriate Colander entity type based on the 444 Threatr entity's super_type and type, then creates the corresponding instance. 445 446 Args: 447 entity (ThreatrEntity): The Threatr entity to convert 448 449 Returns: 450 Optional[EntityTypes]: A Colander entity or None if conversion is not supported 451 452 Raises: 453 AssertionError: If entity is None or not a ThreatrEntity instance 454 455 Note: 456 Entities that have already been processed are returned from the repository 457 without re-conversion to maintain object identity. 458 """ 459 assert entity is not None 460 assert isinstance(entity, ThreatrEntity) 461 462 # The entity has already been processed 463 if (colander_entity := ColanderRepository() >> entity.id) is not None and isinstance( 464 colander_entity, ColanderEntity 465 ): 466 return colander_entity 467 468 # The super type is not supported 469 if (super_type := CommonEntitySuperTypes.by_short_name(short_name=entity.super_type.short_name)) is None: 470 return None 471 472 # The entity type is not supported 473 if (sub_type := super_type.type_by_short_name(entity.type.short_name)) is None: 474 return None 475 476 colander_entity = super_type.model_class( 477 id=entity.id, 478 name=entity.name, 479 created_at=entity.created_at, 480 updated_at=entity.updated_at, 481 type=sub_type, 482 ) 483 bm = BaseModelMerger() 484 bm.merge(entity, colander_entity, ignored_fields=["super_type"]) 485 return colander_entity 486 487 def _convert_event(self, event: ThreatrEvent) -> Event: 488 """ 489 Convert a Threatr event to a Colander event. 490 491 Args: 492 event (ThreatrEvent): The Threatr event to convert 493 494 Returns: 495 Event: A Colander event 496 497 Raises: 498 AssertionError: If event is None or not a ThreatrEvent instance 499 500 Note: 501 Events that have already been processed are returned from the repository 502 without re-conversion. Involved entities are automatically linked if they 503 are Observable instances. 504 """ 505 assert event is not None 506 assert isinstance(event, ThreatrEvent) 507 508 # The event has already been processed 509 if (colander_event := ColanderRepository() >> event.id) is not None and isinstance(colander_event, Event): 510 return colander_event 511 512 sub_type = EventTypes.by_short_name(event.type.short_name) 513 514 colander_event = Event( 515 id=event.id, 516 name=event.name, 517 created_at=event.created_at, 518 updated_at=event.updated_at, 519 first_seen=event.first_seen, 520 last_seen=event.last_seen, 521 count=event.count, 522 type=sub_type, 523 ) 524 525 if (involved_entity := event.involved_entity) is not None: 526 involved_entity = ColanderRepository() >> involved_entity.id 527 if isinstance(involved_entity, Observable): 528 colander_event.involved_observables.append(involved_entity) 529 530 bm = BaseModelMerger() 531 bm.merge(event, colander_event, ignored_fields=["involved_entity", "super_type"]) 532 return colander_event 533
[docs] 534 def convert(self, threatr_feed: ThreatrFeed) -> ColanderFeed: 535 """ 536 Convert a Threatr data model to a Colander data model. 537 538 This method performs a complete conversion of a ThreatrFeed to a ColanderFeed, 539 handling entities, events, and relations. It attempts to convert explicit 540 relations back to reference fields where possible. 541 542 Args: 543 threatr_feed (ThreatrFeed): The Threatr feed to convert 544 545 Returns: 546 ColanderFeed: A ColanderFeed object containing the converted data 547 548 Raises: 549 AssertionError: If threatr_feed is None or not a ThreatrFeed instance 550 551 Important: 552 The method resolves all references in the input feed before processing 553 to ensure consistent object relationships. 554 """ 555 assert threatr_feed is not None 556 assert isinstance(threatr_feed, ThreatrFeed) 557 self.threatr_feed = threatr_feed 558 self.threatr_feed.resolve_references() 559 self.colander_feed.description = "Feed automatically generated from a Threatr feed." 560 561 if (root_entity := threatr_feed.root_entity) is not None: 562 if (colander_entity := self._convert_entity(root_entity)) is not None: 563 self.colander_feed.entities[str(root_entity.id)] = colander_entity 564 565 for entity in threatr_feed.entities or []: 566 if (colander_entity := self._convert_entity(entity)) is not None: 567 self.colander_feed.entities[str(entity.id)] = colander_entity 568 569 for event in threatr_feed.events or []: 570 if (colander_event := self._convert_event(event)) is not None: 571 self.colander_feed.entities[str(event.id)] = colander_event 572 573 for relation in threatr_feed.relations or []: 574 if not self._create_immutable_relation(relation): 575 if (colander_relation := self._convert_relation(relation)) is not None: 576 self.colander_feed.relations[str(relation.id)] = colander_relation 577 578 return self.colander_feed
579 580
[docs] 581class ThreatrConverter: 582 """ 583 Converter for Threatr data to Colander data and vice versa. 584 Uses the mapping file to convert between formats. 585 """ 586
[docs] 587 @staticmethod 588 def threatr_to_colander(threatr_feed: ThreatrFeed) -> ColanderFeed: 589 """ 590 Converts Threatr data to Colander data using the mapping file. 591 592 Args: 593 threatr_feed (ThreatrFeed): The Threatr data to convert. 594 595 Returns: 596 ColanderFeed: The converted Colander data. 597 """ 598 mapper = ThreatrToColanderMapper() 599 return mapper.convert(threatr_feed)
600
[docs] 601 @staticmethod 602 def colander_to_threatr(colander_feed: ColanderFeed, root_entity: Union[str, UUID4, EntityTypes]) -> ThreatrFeed: 603 """ 604 Converts Colander data to Threatr data using the mapping file. 605 606 Args: 607 colander_feed (ColanderFeed): The Colander data to convert. 608 root_entity (Union[str, UUID4, EntityTypes]): The root entity ID, UUID, or entity object to use as the root 609 610 Returns: 611 ThreatrFeed: The converted Threatr data. 612 """ 613 mapper = ColanderToThreatrMapper() 614 colander_feed.resolve_references() 615 return mapper.convert(colander_feed, root_entity)