Source code for colander_data_converter.converters.threatr.models

  1from datetime import datetime, UTC
  2from typing import Optional, Dict, Any, List, Union, get_args
  3from uuid import uuid4, UUID
  4
  5from pydantic import Field, BaseModel, model_validator, ConfigDict
  6from pydantic.types import UUID4, PositiveInt
  7
  8from colander_data_converter.base.common import (
  9    TlpPapLevel,
 10    ObjectReference,
 11    Singleton,
 12)
 13from colander_data_converter.base.models import CommonEntitySuperType, CommonEntitySuperTypes
 14from colander_data_converter.base.types.base import CommonEntityType
 15
 16
class ThreatrRepository(metaclass=Singleton):
    """Central store for Threatr ``Entity``, ``Event`` and ``EntityRelation`` objects.

    Objects are kept in three dictionaries keyed by the string form of their ID.
    The repository supports insertion (``<<``), lookup (``>>``) and bulk
    reference resolution. The class is built with the ``Singleton`` metaclass so
    that a single shared instance is used throughout the application.

    Warning:
        Because the instance is shared, its content outlives any single
        conversion. Call ``clear()`` to reset the state when needed.
    """

    entities: Dict[str, "Entity"]
    """Entity objects indexed by their string ID."""

    events: Dict[str, "Event"]
    """Event objects indexed by their string ID."""

    relations: Dict[str, "EntityRelation"]
    """EntityRelation objects indexed by their string ID."""

    def __init__(self):
        """Creates the three empty stores (events, entities and relations).

        Note:
            With the Singleton pattern this initializer is expected to run only
            once per application run.
        """
        self.events, self.entities, self.relations = {}, {}, {}

    def clear(self):
        """Empties the event, relation and entity stores in place.

        Caution:
            The removal is irreversible; every registered object is forgotten.
        """
        for store in (self.events, self.relations, self.entities):
            store.clear()

    def __lshift__(self, other: Union["Entity", "Event", "EntityRelation"]) -> None:
        """Registers an object in the matching store (``repository << obj``).

        The store is selected from the object's class and the object is filed
        under ``str(obj.id)``. Objects of any other class are silently ignored.

        Args:
            other: The Entity, EntityRelation or Event to register.
        """
        if isinstance(other, Entity):
            store = self.entities
        elif isinstance(other, EntityRelation):
            store = self.relations
        elif isinstance(other, Event):
            store = self.events
        else:
            # Not a type managed by this repository: nothing to register.
            return
        store[str(other.id)] = other

    def __rshift__(self, other: str | UUID4) -> Union["Entity", "Event", "EntityRelation", str, UUID4]:
        """Looks an object up by identifier (``repository >> identifier``).

        Entities are searched first, then relations, then events; the first hit
        is returned.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The matching Entity, EntityRelation or Event, or ``other`` itself
            (unchanged) when no store contains the identifier.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.events):
            if key in store:
                return store[key]
        return other

    # NOTE(review): the listing this class was taken from skips original lines
    # 99-111 at this point; whatever was defined there is not visible here and
    # is intentionally not reproduced.

    def resolve_references(self, strict=False):
        """Asks every stored relation, then every stored event, to resolve its references.

        Each object's own ``resolve_references`` method is called so that UUID
        references are replaced by the objects registered in this repository.
        This is typically done after deserialization.

        Args:
            strict: Forwarded to each object. If True, a ValueError is raised
                for a reference that cannot be resolved; if False, unresolved
                references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be
                resolved to an object of the repository.
        """
        for relation in self.relations.values():
            relation.resolve_references(strict=strict)
        for event in self.events.values():
            event.resolve_references(strict=strict)
136 137
class ThreatrType(BaseModel):
    """Base model for Threatr objects, providing repository registration and reference management.

    Every instance registers itself in the ``ThreatrRepository`` right after
    initialization, and fields annotated with ``ObjectReference`` (or
    ``List[ObjectReference]``) can be switched between object form and UUID
    form for serialization and deserialization workflows.

    Important:
        All Threatr model classes must inherit from this base class to ensure
        proper repository integration and reference management.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        arbitrary_types_allowed=True,
    )

    def model_post_init(self, __context):
        """Registers the freshly created instance in the ``ThreatrRepository``.

        Args:
            __context: Additional context provided by Pydantic; unused here.

        Note:
            This method is called automatically by Pydantic after model
            initialization. The repository ignores objects that are not an
            Entity, EntityRelation or Event.
        """
        repository = ThreatrRepository()
        repository << self

    def _convert_reference(self, ref, operation, strict=False):
        """Applies ``operation`` to a single reference value.

        Args:
            ref: The current value: a UUID, a referenced object, or None.
            operation: Either 'unlink' (object -> its ``id``) or 'resolve'
                (UUID -> object looked up in the repository).
            strict: For 'resolve' only, raise when the lookup fails.

        Returns:
            A ``(value, handled)`` tuple. ``handled`` is True when the operation
            applied to ``ref``; otherwise ``value`` is ``ref`` itself, untouched.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        if operation == "unlink" and ref and not isinstance(ref, UUID):
            return ref.id, True
        if operation == "resolve" and isinstance(ref, UUID):
            # The repository hands the identifier back when nothing matches.
            resolved = ThreatrRepository() >> ref
            if strict and isinstance(resolved, UUID):
                raise ValueError(f"Unable to resolve UUID reference {resolved}")
            return resolved, True
        return ref, False

    def _process_reference_fields(self, operation, strict=False):
        """Helper method to process reference fields for both unlinking and resolving operations.

        Fields whose annotation contains ``ObjectReference`` are converted in
        place. Fields whose annotation contains ``List[ObjectReference]`` are
        rebuilt element by element: elements the operation does not apply to
        (for example already-resolved objects during 'resolve') are kept as they
        are instead of being dropped, and a ``None`` or empty list is left
        untouched.

        Args:
            operation: The operation to perform, either 'unlink' or 'resolve'.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        for field, info in type(self).model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                value, handled = self._convert_reference(getattr(self, field), operation, strict)
                if handled:
                    setattr(self, field, value)
            elif List[ObjectReference] in annotation_args:
                refs = getattr(self, field)
                if not refs:
                    # None or empty list: nothing to convert.
                    continue
                converted = [self._convert_reference(ref, operation, strict) for ref in refs]
                # Only reassign the field when at least one element was processed.
                if any(handled for _, handled in converted):
                    setattr(self, field, [value for value, _ in converted])

    # NOTE(review): the listing this class was taken from skips original lines
    # 210-222 at this point; whatever was defined there is not visible here and
    # is intentionally not reproduced.

    def resolve_references(self, strict=False):
        """Resolves UUID references to their corresponding objects using the ThreatrRepository.

        Fields annotated with ``ObjectReference`` or ``List[ObjectReference]`` are processed
        to fetch and replace their UUID references with actual object instances.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.

        Important:
            Use ``strict=True`` to ensure all references are valid and resolvable.
        """
        self._process_reference_fields("resolve", strict)
241 242
class Entity(ThreatrType):
    """Represents an entity in the Threatr data model.

    Entities are the primary data objects in Threatr, representing observables,
    indicators, or other threat intelligence artifacts with associated metadata
    and classification levels.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity."""

    # Timestamps use ``default_factory`` so that "now" is computed for each new
    # instance; a plain ``default=datetime.now(UTC)`` is evaluated only once,
    # when the class is defined, and would be shared by every instance.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    type: CommonEntityType
    """The specific type of the entity."""

    super_type: CommonEntitySuperType
    """The super type classification of the entity."""

    description: str | None = None
    """Optional description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes."""
283 284
class EntityRelation(ThreatrType):
    """Represents a relation between two entities in the Threatr data model.

    EntityRelations define directed relationships between entities, supporting
    complex threat intelligence graphs and entity associations.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity relation."""

    # Timestamps use ``default_factory`` so that "now" is computed for each new
    # instance; a plain ``default=datetime.now(UTC)`` is evaluated only once,
    # when the class is defined, and would be shared by every instance.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    description: str | None = None
    """Optional description of the relation."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: Entity | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: Entity | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
315 316
class Event(ThreatrType):
    """Represents an event in the Threatr data model.

    Events capture temporal occurrences related to threat intelligence,
    tracking when specific activities or observations took place.

    Note:
        All timestamp defaults are computed when the instance is created. When
        only one of ``first_seen`` / ``last_seen`` is supplied, the other one
        defaults to the current time and the pair must still satisfy
        ``first_seen <= last_seen``.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the event."""

    # Timestamps use ``default_factory`` so that "now" is computed for each new
    # instance; a plain ``default=datetime.now(UTC)`` is evaluated only once,
    # when the class is defined, and would be shared by every instance.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the event was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the event."""

    description: str | None = None
    """Optional description of the event."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the event."""

    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    type: CommonEntityType
    """The type of the event."""

    super_type: CommonEntitySuperType = CommonEntitySuperTypes.EVENT
    """The super type classification of the event."""

    # The annotation is kept in this exact form: reference processing inspects
    # the union members of the annotation to detect ``ObjectReference`` fields.
    involved_entity: Optional[Entity] | Optional[ObjectReference] = None
    """Optional entity or reference involved in this event."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Validates that the first_seen date is not after the last_seen date.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If first_seen is after last_seen.

        Important:
            This validation ensures temporal consistency for event data.
            Equal timestamps are accepted.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
376 377
class ThreatrFeed(ThreatrType):
    """Represents a feed of Threatr data, including entities, relations, and events.

    ThreatrFeed serves as a container for complete threat intelligence datasets,
    organizing related entities, their relationships, and associated events into
    a cohesive data structure.
    """

    root_entity: Entity
    """The root entity of the feed, corresponding to the primary requested entity."""

    # The three collections accept ``None`` as well as a list; code that walks
    # them must therefore treat ``None`` as "no items".
    entities: Optional[List[Entity]] = []
    """List of entity objects in the feed."""

    relations: Optional[List[EntityRelation]] = []
    """List of entity relation objects in the feed."""

    events: Optional[List[Event]] = []
    """List of event objects in the feed."""

    @staticmethod
    def load(
        raw_object: Dict[str, Union[Entity, Event, EntityRelation]],
        strict: bool = False,
    ) -> "ThreatrFeed":
        """Loads a ThreatrFeed from a raw object dictionary, resolving references.

        The raw data is validated with ``ThreatrFeed.model_validate`` and the
        references of the resulting feed are then resolved.

        Args:
            raw_object: The raw data to validate and load.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Returns:
            The loaded and reference-resolved feed.

        Important:
            Use ``strict=True`` to ensure all references in the feed are valid and resolvable.
        """
        feed = ThreatrFeed.model_validate(raw_object)
        feed.resolve_references(strict=strict)
        return feed

    def resolve_references(self, strict=False):
        """Resolves references within entities, events, and relations.

        Walks the entities, then the events, then the relations of the feed and
        calls ``resolve_references`` on each item. A collection set to ``None``
        is treated as empty instead of raising a ``TypeError``.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        for collection in (self.entities, self.events, self.relations):
            for item in collection or ():
                item.resolve_references(strict=strict)
439