1from datetime import datetime, UTC
2from typing import Union, List, get_args, cast, Optional
3from uuid import uuid4, UUID
4
5from pydantic import UUID4
6
7from colander_data_converter.base.common import ObjectReference
8from colander_data_converter.base.models import (
9 ColanderFeed,
10 EntityTypes,
11 EntityRelation as ColanderEntityRelation,
12 Entity as ColanderEntity,
13 Event,
14 CommonEntitySuperType,
15 ColanderRepository,
16 CommonEntitySuperTypes,
17 Observable,
18)
19from colander_data_converter.base.types.base import CommonEntityType
20from colander_data_converter.base.types.event import EventTypes
21from colander_data_converter.base.utils import BaseModelMerger
22from colander_data_converter.converters.threatr.mapping import ThreatrMapper
23from colander_data_converter.converters.threatr.models import (
24 ThreatrFeed,
25 Entity as ThreatrEntity,
26 Event as ThreatrEvent,
27 EntityRelation as ThreatrEntityRelation,
28)
29
30
[docs]
31class ColanderToThreatrMapper(ThreatrMapper):
32 """
33 Mapper for converting Colander data model to Threatr data model.
34
35 This class handles the conversion of Colander feeds, entities, relations, and events
36 to their corresponding Threatr equivalents. It processes reference fields and creates
37 appropriate relationship mappings between entities.
38
39 Note:
40 The mapper uses the mapping configuration loaded from the parent ThreatrMapper
41 class to determine appropriate field and relation name mappings.
42 """
43
44 def _get_relation_name_from_field(self, source_type: str, target_type: str, field_name: str) -> str:
45 """
46 Get the relation name for a field based on the mapping configuration.
47
48 Args:
49 source_type (str): The source entity type name
50 target_type (str): The target entity type name
51 field_name (str): The field name to map
52
53 Returns:
54 str: The mapped relation name or a default based on the field name
55
56 Note:
57 If no mapping is found in the configuration, returns a normalized
58 version of the field name with underscores replaced by spaces.
59 """
60 assert source_type is not None
61 assert target_type is not None
62 assert field_name is not None
63
64 relation_name = field_name.lower().replace("_", " ")
65 for mapping in self.mapping_loader.mapping_data:
66 if (
67 mapping["source_type"] == source_type.lower()
68 and mapping["target_type"] == target_type.lower()
69 and field_name in mapping["fields"]
70 ):
71 relation_name = mapping["fields"][field_name]
72
73 return relation_name
74
[docs]
75 def convert(self, colander_feed: ColanderFeed, root_entity: Union[str, UUID4, EntityTypes]) -> ThreatrFeed:
76 """
77 Convert a Colander data model to a Threatr data model.
78
79 This method transforms a complete Colander feed including all entities, relations,
80 and events into the equivalent Threatr representation. It handles reference field
81 extraction and conversion to explicit relations.
82
83 Args:
84 colander_feed (ColanderFeed): The Colander feed to convert
85 root_entity (Union[str, UUID4, EntityTypes]): The root entity ID, UUID, or entity object to use as the root
86
87 Returns:
88 ThreatrFeed: A ThreatrFeed object containing the converted data
89
90 Raises:
91 ValueError: If the root entity cannot be found or is invalid
92
93 Important:
94 The root entity must exist in the provided Colander feed. If a string ID
95 is provided, it must be a valid UUID format.
96 """
97 # Get the root entity object if an ID was provided
98 root_entity_obj = None
99 if isinstance(root_entity, str):
100 try:
101 root_entity = UUID(root_entity, version=4)
102 except Exception:
103 raise ValueError(f"Invalid UUID {root_entity}")
104 if isinstance(root_entity, UUID):
105 root_entity_obj = colander_feed.entities.get(str(root_entity))
106 if not root_entity_obj:
107 raise ValueError(f"Root entity with ID {root_entity} not found in feed")
108 else:
109 root_entity_obj = root_entity
110
111 # Convert the root entity to a Threatr entity
112 threatr_root_entity = self._convert_entity(root_entity_obj)
113 threatr_events = []
114
115 # Convert all entities
116 threatr_entities = [threatr_root_entity]
117 for entity_id, entity in colander_feed.entities.items():
118 # Skip the root entity as it's already included
119 if str(entity.id) == str(root_entity_obj.id):
120 continue
121 threatr_entity = self._convert_entity(entity)
122 if isinstance(threatr_entity, ThreatrEvent):
123 threatr_events.append(threatr_entity)
124 else:
125 threatr_entities.append(threatr_entity)
126
127 # Convert all relations
128 threatr_relations = []
129 for relation_id, relation in colander_feed.relations.items():
130 threatr_relation = self._convert_relation(relation)
131 threatr_relations.append(threatr_relation)
132
133 # Convert reference fields to relations
134 reference_relations = self._extract_reference_relations(colander_feed)
135 threatr_relations.extend(reference_relations)
136
137 # Create and return the Threatr feed
138 return ThreatrFeed(
139 root_entity=threatr_root_entity,
140 entities=threatr_entities,
141 relations=threatr_relations,
142 events=threatr_events,
143 )
144
145 def _convert_entity(self, entity: ColanderEntity) -> Union[ThreatrEntity, ThreatrEvent]:
146 """
147 Convert a Colander entity to a Threatr entity or event.
148
149 Args:
150 entity (ColanderEntity): The Colander entity to convert
151
152 Returns:
153 Union[ThreatrEntity, ThreatrEvent]: A Threatr entity or event based on the input type
154
155 Note:
156 Events are detected by checking if the entity is an instance of the Event class
157 and are converted to ThreatrEvent objects accordingly.
158 """
159 # Create a base entity with common fields
160 model_class = ThreatrEntity
161 if isinstance(entity, Event):
162 model_class = ThreatrEvent
163 threatr_entity = model_class(
164 id=entity.id,
165 created_at=getattr(entity, "created_at", datetime.now(UTC)),
166 updated_at=getattr(entity, "updated_at", datetime.now(UTC)),
167 name=entity.name,
168 type=cast(CommonEntityType, entity.type),
169 super_type=cast(CommonEntitySuperType, entity.super_type),
170 attributes={},
171 )
172
173 bm = BaseModelMerger()
174 bm.merge(entity, threatr_entity)
175
176 return threatr_entity
177
178 def _convert_relation(self, relation: ColanderEntityRelation) -> ThreatrEntityRelation:
179 """
180 Convert a Colander entity relation to a Threatr entity relation.
181
182 Args:
183 relation (ColanderEntityRelation): The Colander entity relation to convert
184
185 Returns:
186 ThreatrEntityRelation: A Threatr entity relation
187
188 Note:
189 Object references are normalized to UUIDs during conversion to maintain
190 consistency in the Threatr model.
191 """
192 # Create a base relation with common fields
193 threatr_relation = ThreatrEntityRelation(
194 id=relation.id,
195 created_at=getattr(relation, "created_at", datetime.now(UTC)),
196 updated_at=getattr(relation, "updated_at", datetime.now(UTC)),
197 name=relation.name,
198 description=getattr(relation, "description", None),
199 obj_from=relation.obj_from if isinstance(relation.obj_from, UUID) else relation.obj_from.id,
200 obj_to=relation.obj_to if isinstance(relation.obj_to, UUID) else relation.obj_to.id,
201 attributes={},
202 )
203
204 bm = BaseModelMerger()
205 bm.merge(relation, threatr_relation)
206
207 return threatr_relation
208
209 def _extract_reference_relations(self, colander_feed: ColanderFeed) -> List[ThreatrEntityRelation]:
210 """
211 Extract reference fields from Colander entities and convert them to Threatr relations.
212
213 This method processes all entities in the feed to identify ObjectReference fields
214 and converts them into explicit EntityRelation objects in the Threatr model.
215
216 Args:
217 colander_feed (ColanderFeed): The Colander feed containing entities
218
219 Returns:
220 List[ThreatrEntityRelation]: A list of Threatr entity relations extracted from reference fields
221
222 Note:
223 Both single ObjectReference fields and List[ObjectReference] fields are processed
224 to create appropriate relationship mappings.
225 """
226 relations = []
227
228 for entity_id, entity in colander_feed.entities.items():
229 entity_type_name = type(entity).__name__.lower()
230
231 for field_name, field_info in entity.__class__.model_fields.items():
232 field_annotation = get_args(field_info.annotation)
233 field_value = getattr(entity, field_name, None)
234
235 if not field_value or not field_annotation:
236 continue
237
238 # Handle single ObjectReference
239 if ObjectReference in field_annotation:
240 relation = self._create_relation_from_reference(
241 entity, field_name, field_value, entity_type_name, colander_feed, is_list=False
242 )
243 if relation:
244 relations.append(relation)
245
246 # Handle List[ObjectReference]
247 elif List[ObjectReference] in field_annotation:
248 for object_reference in field_value:
249 relation = self._create_relation_from_reference(
250 entity, field_name, object_reference, entity_type_name, colander_feed, is_list=True
251 )
252 if relation:
253 relations.append(relation)
254
255 return relations
256
257 def _create_relation_from_reference(
258 self, entity, field_name, reference_value, entity_type_name, colander_feed, is_list=False
259 ):
260 """
261 Helper method to create a relation from a reference field.
262
263 Args:
264 entity: The source entity containing the reference
265 field_name (str): The name of the reference field
266 reference_value: The reference value (UUID or object)
267 entity_type_name (str): The source entity type name
268 colander_feed (ColanderFeed): The feed containing target entities
269 is_list (bool, optional): Whether the reference comes from a list field. Defaults to False.
270
271 Returns:
272 ThreatrEntityRelation | None: A new ThreatrEntityRelation or None if target not found
273 """
274 target_id = reference_value if isinstance(reference_value, UUID) else reference_value.id
275 target_entity = colander_feed.entities.get(str(target_id))
276
277 if not target_entity:
278 return None
279
280 target_entity_type_name = type(target_entity).__name__.lower()
281
282 # Get relation name based on whether it's a list or single reference
283 if is_list:
284 relation_name = self._get_relation_name_from_field(entity_type_name, target_entity_type_name, field_name)
285 else:
286 relation_name = field_name.replace("_", " ")
287
288 return ThreatrEntityRelation(
289 id=uuid4(),
290 created_at=datetime.now(UTC),
291 updated_at=datetime.now(UTC),
292 name=relation_name,
293 description=f"Relation extracted from {entity_type_name}.{field_name} reference to {target_entity_type_name}",
294 obj_from=entity.id,
295 obj_to=target_entity.id,
296 attributes={},
297 )
298
299
[docs]
300class ThreatrToColanderMapper(ThreatrMapper):
301 """
302 Mapper for converting Threatr data model to Colander data model.
303
304 This class handles the conversion of Threatr feeds, entities, events, and relations
305 to their corresponding Colander equivalents. It processes explicit relations and
306 attempts to convert them back to reference fields where appropriate.
307
308 Important:
309 This mapper maintains state during conversion, storing the input ThreatrFeed
310 and building the output ColanderFeed incrementally.
311
312 Attributes:
313 threatr_feed (Optional[ThreatrFeed]): The input Threatr feed being converted
314 colander_feed (ColanderFeed): The output Colander feed being built
315 """
316
[docs]
317 def __init__(self):
318 """
319 Initialize the mapper with empty feed containers.
320
321 Note:
322 The mapper creates a new ColanderFeed instance for each conversion process.
323 """
324 super().__init__()
325 self.threatr_feed: Optional[ThreatrFeed] = None
326 self.colander_feed: ColanderFeed = ColanderFeed()
327
328 def _get_field_from_relation_name(self, source_type: str, target_type: str, relation_name: str) -> Optional[str]:
329 """
330 Get the field name for a relation based on the mapping configuration.
331
332 Args:
333 source_type (str): The source entity type name
334 target_type (str): The target entity type name
335 relation_name (str): The relation name to reverse-map
336
337 Returns:
338 Optional[str]: The corresponding field name or None if no mapping found
339
340 Note:
341 This method performs reverse lookup in the mapping configuration
342 to find field names that correspond to relation names.
343 """
344 assert source_type is not None
345 assert target_type is not None
346 assert relation_name is not None
347 relation_name = relation_name.lower().replace("_", " ")
348 for mapping in self.mapping_loader.mapping_data:
349 if mapping["source_type"] == source_type.lower() and mapping["target_type"] == target_type.lower():
350 for fn, rn in mapping["fields"].items():
351 if rn == relation_name:
352 return fn
353 return None
354
355 def _create_immutable_relation(self, threatr_relation: ThreatrEntityRelation) -> bool:
356 """
357 Attempt to convert a Threatr relation back to a reference field.
358
359 This method tries to convert explicit relations back into reference fields
360 on the source entity, which is the preferred representation in the Colander model.
361
362 Args:
363 threatr_relation (ThreatrEntityRelation): The Threatr relation to convert
364
365 Returns:
366 bool: True if the relation was successfully converted to a reference field
367
368 Important:
369 Only relations that map to known reference fields can be converted.
370 Other relations remain as explicit EntityRelation objects.
371 """
372 relation_name = threatr_relation.name
373 source_entity_id: UUID4 = (
374 threatr_relation.obj_from if isinstance(threatr_relation.obj_from, UUID) else threatr_relation.obj_from.id
375 )
376 target_entity_id: UUID4 = (
377 threatr_relation.obj_to if isinstance(threatr_relation.obj_to, UUID) else threatr_relation.obj_to.id
378 )
379
380 source_entity = ColanderRepository() >> source_entity_id
381 target_entity = ColanderRepository() >> target_entity_id
382
383 # Ensure both source and target entities are valid Colander entities
384 if not isinstance(source_entity, ColanderEntity) or not isinstance(target_entity, ColanderEntity):
385 return False
386
387 if (
388 field_name := self._get_field_from_relation_name(
389 source_entity.super_type.short_name, target_entity.super_type.short_name, relation_name
390 )
391 ) is not None and field_name in source_entity.__class__.model_fields.keys():
392 setattr(source_entity, field_name, target_entity)
393 return True
394
395 return False
396
397 def _convert_relation(self, relation: ThreatrEntityRelation) -> Optional[ColanderEntityRelation]:
398 """
399 Convert a Threatr entity relation to a Colander entity relation.
400
401 Args:
402 relation (ThreatrEntityRelation): The Threatr entity relation to convert
403
404 Returns:
405 Optional[ColanderEntityRelation]: A Colander entity relation or None if conversion fails
406
407 Raises:
408 AssertionError: If relation or its object references are None
409
410 Note:
411 This method checks if the relation has already been converted to avoid
412 duplicate processing.
413 """
414 assert relation is not None
415 assert relation.obj_from is not None
416 assert relation.obj_to is not None
417
418 colander_relation: ColanderEntityRelation = ColanderRepository() >> relation.id
419 if isinstance(colander_relation, ColanderEntityRelation):
420 return colander_relation
421
422 obj_from = ColanderRepository() >> relation.obj_from.id
423 obj_to = ColanderRepository() >> relation.obj_to.id
424 if obj_from and obj_to:
425 colander_relation = ColanderEntityRelation(
426 id=relation.id,
427 name=relation.name,
428 created_at=relation.created_at,
429 updated_at=relation.updated_at,
430 obj_from=obj_from,
431 obj_to=obj_to,
432 )
433 bm = BaseModelMerger()
434 bm.merge(relation, colander_relation)
435 return colander_relation
436
437 return None
438
439 def _convert_entity(self, entity: ThreatrEntity) -> Optional[EntityTypes]:
440 """
441 Convert a Threatr entity to a Colander entity.
442
443 This method determines the appropriate Colander entity type based on the
444 Threatr entity's super_type and type, then creates the corresponding instance.
445
446 Args:
447 entity (ThreatrEntity): The Threatr entity to convert
448
449 Returns:
450 Optional[EntityTypes]: A Colander entity or None if conversion is not supported
451
452 Raises:
453 AssertionError: If entity is None or not a ThreatrEntity instance
454
455 Note:
456 Entities that have already been processed are returned from the repository
457 without re-conversion to maintain object identity.
458 """
459 assert entity is not None
460 assert isinstance(entity, ThreatrEntity)
461
462 # The entity has already been processed
463 if (colander_entity := ColanderRepository() >> entity.id) is not None and isinstance(
464 colander_entity, ColanderEntity
465 ):
466 return colander_entity
467
468 # The super type is not supported
469 if (super_type := CommonEntitySuperTypes.by_short_name(short_name=entity.super_type.short_name)) is None:
470 return None
471
472 # The entity type is not supported
473 if (sub_type := super_type.type_by_short_name(entity.type.short_name)) is None:
474 return None
475
476 colander_entity = super_type.model_class(
477 id=entity.id,
478 name=entity.name,
479 created_at=entity.created_at,
480 updated_at=entity.updated_at,
481 type=sub_type,
482 )
483 bm = BaseModelMerger()
484 bm.merge(entity, colander_entity, ignored_fields=["super_type"])
485 return colander_entity
486
487 def _convert_event(self, event: ThreatrEvent) -> Event:
488 """
489 Convert a Threatr event to a Colander event.
490
491 Args:
492 event (ThreatrEvent): The Threatr event to convert
493
494 Returns:
495 Event: A Colander event
496
497 Raises:
498 AssertionError: If event is None or not a ThreatrEvent instance
499
500 Note:
501 Events that have already been processed are returned from the repository
502 without re-conversion. Involved entities are automatically linked if they
503 are Observable instances.
504 """
505 assert event is not None
506 assert isinstance(event, ThreatrEvent)
507
508 # The event has already been processed
509 if (colander_event := ColanderRepository() >> event.id) is not None and isinstance(colander_event, Event):
510 return colander_event
511
512 sub_type = EventTypes.by_short_name(event.type.short_name)
513
514 colander_event = Event(
515 id=event.id,
516 name=event.name,
517 created_at=event.created_at,
518 updated_at=event.updated_at,
519 first_seen=event.first_seen,
520 last_seen=event.last_seen,
521 count=event.count,
522 type=sub_type,
523 )
524
525 if (involved_entity := event.involved_entity) is not None:
526 involved_entity = ColanderRepository() >> involved_entity.id
527 if isinstance(involved_entity, Observable):
528 colander_event.involved_observables.append(involved_entity)
529
530 bm = BaseModelMerger()
531 bm.merge(event, colander_event, ignored_fields=["involved_entity", "super_type"])
532 return colander_event
533
[docs]
534 def convert(self, threatr_feed: ThreatrFeed) -> ColanderFeed:
535 """
536 Convert a Threatr data model to a Colander data model.
537
538 This method performs a complete conversion of a ThreatrFeed to a ColanderFeed,
539 handling entities, events, and relations. It attempts to convert explicit
540 relations back to reference fields where possible.
541
542 Args:
543 threatr_feed (ThreatrFeed): The Threatr feed to convert
544
545 Returns:
546 ColanderFeed: A ColanderFeed object containing the converted data
547
548 Raises:
549 AssertionError: If threatr_feed is None or not a ThreatrFeed instance
550
551 Important:
552 The method resolves all references in the input feed before processing
553 to ensure consistent object relationships.
554 """
555 assert threatr_feed is not None
556 assert isinstance(threatr_feed, ThreatrFeed)
557 self.threatr_feed = threatr_feed
558 self.threatr_feed.resolve_references()
559 self.colander_feed.description = "Feed automatically generated from a Threatr feed."
560
561 if (root_entity := threatr_feed.root_entity) is not None:
562 if (colander_entity := self._convert_entity(root_entity)) is not None:
563 self.colander_feed.entities[str(root_entity.id)] = colander_entity
564
565 for entity in threatr_feed.entities or []:
566 if (colander_entity := self._convert_entity(entity)) is not None:
567 self.colander_feed.entities[str(entity.id)] = colander_entity
568
569 for event in threatr_feed.events or []:
570 if (colander_event := self._convert_event(event)) is not None:
571 self.colander_feed.entities[str(event.id)] = colander_event
572
573 for relation in threatr_feed.relations or []:
574 if not self._create_immutable_relation(relation):
575 if (colander_relation := self._convert_relation(relation)) is not None:
576 self.colander_feed.relations[str(relation.id)] = colander_relation
577
578 return self.colander_feed
579
580
[docs]
581class ThreatrConverter:
582 """
583 Converter for Threatr data to Colander data and vice versa.
584 Uses the mapping file to convert between formats.
585 """
586
[docs]
587 @staticmethod
588 def threatr_to_colander(threatr_feed: ThreatrFeed) -> ColanderFeed:
589 """
590 Converts Threatr data to Colander data using the mapping file.
591
592 Args:
593 threatr_feed (ThreatrFeed): The Threatr data to convert.
594
595 Returns:
596 ColanderFeed: The converted Colander data.
597 """
598 mapper = ThreatrToColanderMapper()
599 return mapper.convert(threatr_feed)
600
[docs]
601 @staticmethod
602 def colander_to_threatr(colander_feed: ColanderFeed, root_entity: Union[str, UUID4, EntityTypes]) -> ThreatrFeed:
603 """
604 Converts Colander data to Threatr data using the mapping file.
605
606 Args:
607 colander_feed (ColanderFeed): The Colander data to convert.
608 root_entity (Union[str, UUID4, EntityTypes]): The root entity ID, UUID, or entity object to use as the root
609
610 Returns:
611 ThreatrFeed: The converted Threatr data.
612 """
613 mapper = ColanderToThreatrMapper()
614 colander_feed.resolve_references()
615 return mapper.convert(colander_feed, root_entity)