Source code for colander_data_converter.converters.threatr.models

  1from datetime import datetime, UTC
  2from typing import Optional, Dict, Any, List, Union, get_args
  3from uuid import uuid4, UUID
  4
  5from pydantic import Field, BaseModel, model_validator, ConfigDict
  6from pydantic.types import UUID4, PositiveInt
  7
  8from colander_data_converter.base.common import (
  9    TlpPapLevel,
 10    ObjectReference,
 11    Singleton,
 12    LRUDict,
 13)
 14from colander_data_converter.base.models import CommonEntitySuperType, CommonEntitySuperTypes
 15from colander_data_converter.base.types.base import CommonEntityType
 16
 17
class ThreatrRepository(object, metaclass=Singleton):
    """Shared store for Threatr ``Entity``, ``Event`` and ``EntityRelation`` objects.

    The class is built with the ``Singleton`` metaclass, so every call to
    ``ThreatrRepository()`` is expected to hand back the same instance. Objects are
    kept in three separate mappings keyed by the string form of their ID, and can be
    inserted with ``<<`` and looked up with ``>>``.

    Warning:
        Because the instance is shared, its content outlives any single feed or
        conversion. Call ``clear()`` to start from an empty state.
    """

    entities: Dict[str, "Entity"]
    """Entity objects, keyed by ``str(entity.id)``."""

    events: Dict[str, "Event"]
    """Event objects, keyed by ``str(event.id)``."""

    relations: Dict[str, "EntityRelation"]
    """EntityRelation objects, keyed by ``str(relation.id)``."""

    def __init__(self):
        """Create the three empty ``LRUDict`` stores (events, entities, relations)."""
        self.events, self.entities, self.relations = LRUDict(), LRUDict(), LRUDict()

    def clear(self):
        """Empty the events, relations and entities stores.

        Caution:
            Nothing is kept aside; the removed objects cannot be restored from the
            repository afterwards.
        """
        for store in (self.events, self.relations, self.entities):
            store.clear()

    def __lshift__(self, other: Union["Entity", "Event", "EntityRelation"]) -> None:
        """Register an object in the store matching its type (``repository << obj``).

        The key is ``str(other.id)``. Objects that are neither an ``Entity``, an
        ``EntityRelation`` nor an ``Event`` are ignored, and their ``id`` attribute
        is never read.

        Args:
            other: The object to register.
        """
        dispatch = (
            (Entity, self.entities),
            (EntityRelation, self.relations),
            (Event, self.events),
        )
        for model_cls, store in dispatch:
            if isinstance(other, model_cls):
                store[str(other.id)] = other
                return

    def __rshift__(self, other: str | UUID4) -> Union["Entity", "Event", "EntityRelation", str, UUID4]:
        """Look an object up by identifier (``repository >> identifier``).

        The stores are probed in the order entities, relations, events; the first
        store containing ``str(other)`` provides the result.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The stored object, or ``other`` itself (unchanged) when no store holds
            that identifier.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.events):
            if key in store:
                return store[key]
        return other

    # NOTE(review): the line numbers embedded in the rendered listing jump from 99
    # to 113 at this point, so a member defined in that range may be missing from
    # this view — confirm against the real module before relying on this class
    # being complete.

    def resolve_references(self, strict=False):
        """Ask every stored relation, then every stored event, to resolve its references.

        Each object's own ``resolve_references`` method is called with the same
        ``strict`` flag.

        Args:
            strict: Forwarded to each object. If True, an unresolvable UUID reference
                raises a ValueError; if False, it is left as a UUID. Defaults to False.

        Raises:
            ValueError: If strict is True and a reference cannot be resolved.
        """
        for store in (self.relations, self.events):
            for _key, stored in store.items():
                stored.resolve_references(strict=strict)
137 138
class ThreatrType(BaseModel):
    """Base model for Threatr objects: repository registration and reference handling.

    Every instance is pushed to the shared ``ThreatrRepository`` right after
    initialization, and fields annotated with ``ObjectReference`` or
    ``List[ObjectReference]`` can be converted between UUIDs and the objects those
    UUIDs designate.

    Important:
        Threatr model classes inherit from this base class to get repository
        registration and reference management.
    """

    model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True)

    def model_post_init(self, __context):
        """Register the freshly created instance in the shared repository.

        Args:
            __context: Context passed in by Pydantic; not used here.

        Note:
            Pydantic calls this hook automatically after model initialization. The
            repository silently ignores instances it has no store for.
        """
        repository = ThreatrRepository()
        repository << self

    def _convert_reference(self, ref, operation, strict=False):
        """Convert a single reference value for the given operation.

        Args:
            ref: The current value: a UUID, an object exposing ``id``, or a falsy value.
            operation: Either ``'unlink'`` (object -> its ``id``) or ``'resolve'``
                (UUID -> object found in the repository).
            strict: For ``'resolve'`` only; raise when the UUID is unknown to the repository.

        Returns:
            A ``(touched, value)`` pair. ``touched`` is False when the operation does
            not apply to ``ref``, in which case ``value`` is ``ref`` itself.

        Raises:
            ValueError: If strict is True and the UUID cannot be resolved.
        """
        if operation == "unlink":
            if ref and not isinstance(ref, UUID):
                return True, ref.id
        elif operation == "resolve":
            if isinstance(ref, UUID):
                # The repository returns the identifier itself when nothing matches.
                resolved = ThreatrRepository() >> ref
                if strict and isinstance(resolved, UUID):
                    raise ValueError(f"Unable to resolve UUID reference {resolved}")
                return True, resolved
        return False, ref

    def _process_reference_fields(self, operation, strict=False):
        """Apply an unlink or resolve pass to every reference field of the model.

        A field is treated as a single reference when ``ObjectReference`` appears in
        the arguments of its annotation, and as a list of references when
        ``List[ObjectReference]`` does.

        Args:
            operation: The operation to perform, either 'unlink' or 'resolve'.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If an object being unlinked has no ``id`` attribute.
        """
        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                touched, value = self._convert_reference(getattr(self, field), operation, strict)
                if touched:
                    setattr(self, field, value)
            elif List[ObjectReference] in annotation_args:
                refs = getattr(self, field)
                if not refs:
                    # None or an empty list: nothing to convert.
                    continue
                converted = []
                changed = False
                for ref in refs:
                    touched, value = self._convert_reference(ref, operation, strict)
                    # Keep elements the operation does not apply to, so that a
                    # partially converted list does not lose members on write-back.
                    converted.append(value)
                    changed = changed or touched
                if changed:
                    setattr(self, field, converted)

    # NOTE(review): the line numbers embedded in the rendered listing jump from 207
    # to 221 at this point, so a member defined in that range (presumably the public
    # counterpart of the 'unlink' operation) may be missing from this view — confirm
    # against the real module.

    def resolve_references(self, strict=False):
        """Replace UUID references with the objects registered in the ThreatrRepository.

        Fields annotated with ``ObjectReference`` or ``List[ObjectReference]`` are
        processed; other fields are left alone.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        self._process_reference_fields("resolve", strict)
239 240
class Entity(ThreatrType):
    """Represents an entity in the Threatr data model.

    Entities are the primary data objects in Threatr, representing observables,
    indicators, or other threat intelligence artifacts with associated metadata
    and classification levels.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity."""

    # default_factory (not default=...) so the timestamp is taken when each instance
    # is created rather than once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    type: CommonEntityType
    """The specific type of the entity."""

    super_type: CommonEntitySuperType
    """The super type classification of the entity."""

    description: str | None = None
    """Optional description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes."""
281 282
class EntityRelation(ThreatrType):
    """Represents a relation between two entities in the Threatr data model.

    EntityRelations define directed relationships between entities (from
    ``obj_from`` to ``obj_to``). Either end may hold the entity itself or a
    reference to it.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity relation."""

    # default_factory (not default=...) so the timestamp is taken when each instance
    # is created rather than once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    description: str | None = None
    """Optional description of the relation."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: Entity | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: Entity | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
313 314
class Event(ThreatrType):
    """Represents an event in the Threatr data model.

    Events capture temporal occurrences related to threat intelligence,
    tracking when specific activities or observations took place.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the event."""

    # All timestamp defaults use default_factory (not default=...) so they are taken
    # when each instance is created rather than once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the event was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the event."""

    description: str | None = None
    """Optional description of the event."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the event."""

    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    type: CommonEntityType
    """The type of the event."""

    super_type: CommonEntitySuperType = CommonEntitySuperTypes.EVENT
    """The super type classification of the event."""

    involved_entity: Optional[Entity] | Optional[ObjectReference] = None
    """Optional entity or reference involved in this event."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Validates that first_seen is not later than last_seen.

        Equal timestamps are accepted; only ``first_seen > last_seen`` is rejected.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If first_seen is after last_seen.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
374 375
class ThreatrFeed(ThreatrType):
    """Represents a feed of Threatr data, including entities, relations, and events.

    ThreatrFeed is a container grouping a root entity with the entities,
    relations and events delivered alongside it.
    """

    root_entity: Entity
    """The root entity of the feed, corresponding to the primary requested entity."""

    entities: Optional[List[Entity]] = Field(default_factory=list)
    """List of entity objects in the feed."""

    relations: Optional[List[EntityRelation]] = Field(default_factory=list)
    """List of entity relation objects in the feed."""

    events: Optional[List[Event]] = Field(default_factory=list)
    """List of event objects in the feed."""

    @staticmethod
    def load(
        raw_object: Dict[str, Union[Entity, Event, EntityRelation]],
        strict: bool = False,
    ) -> "ThreatrFeed":
        """Loads a ThreatrFeed from a raw object dictionary, resolving references.

        The shared ``ThreatrRepository`` is cleared first, so objects registered
        before this call are no longer available for lookup afterwards.

        Args:
            raw_object: The raw data to validate and load.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Returns:
            The loaded and reference-resolved feed.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        ThreatrRepository().clear()
        feed = ThreatrFeed.model_validate(raw_object)
        feed.resolve_references(strict=strict)
        return feed

    def resolve_references(self, strict=False):
        """Resolves references within entities, events, and relations.

        Calls ``resolve_references`` on each entity, then each event, then each
        relation of the feed. A collection set to ``None`` is skipped.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        # The three collections are Optional, so guard against None before iterating.
        for entity in self.entities or []:
            entity.resolve_references(strict=strict)
        for event in self.events or []:
            event.resolve_references(strict=strict)
        for relation in self.relations or []:
            relation.resolve_references(strict=strict)
438